
dlt

Use dlt (data load tool) to build pipelines that load data from any source into Hotdata managed databases — with automatic schema inference, incremental loading, and parquet-based delivery.

Install

pip install hotdata-dlt

Authentication

Set HOTDATA_API_KEY in your environment. Optionally set HOTDATA_WORKSPACE to pin a specific workspace.

export HOTDATA_API_KEY="your_api_key"
export HOTDATA_WORKSPACE="ws_..."

Or pass credentials explicitly when configuring the destination.

Quickstart

import dlt
from hotdata_dlt import hotdata

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination=hotdata(),
    dataset_name="public",
)

data = [
    {"id": 1, "name": "Alice", "amount": 99.99},
    {"id": 2, "name": "Bob",   "amount": 49.50},
]

info = pipeline.run(data, table_name="customers")
print(info)

dlt infers the table schema from your records, creates a managed database (named after the pipeline by default), and loads the records as Parquet files.
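To make "infers the schema" concrete, here is a toy sketch of how column types can be derived from a list of flat records like the quickstart data. It is not dlt's algorithm: the type names are illustrative, and dlt's real inference also handles nested data, nulls, and type conflicts in its own way. `infer_schema` is a hypothetical helper; here the first non-null value in each column decides that column's type.

```python
from typing import Any

# Illustrative type names only; these are not dlt's data types.
TYPE_NAMES = {bool: "bool", int: "bigint", float: "double", str: "text"}


def infer_schema(rows: list[dict[str, Any]]) -> dict[str, str]:
    """Return a column -> type mapping taken from the first non-null value seen."""
    schema: dict[str, str] = {}
    for row in rows:
        for column, value in row.items():
            if value is None or column in schema:
                continue  # nulls carry no type; the first typed value wins
            schema[column] = TYPE_NAMES.get(type(value), "json")
    return schema


data = [
    {"id": 1, "name": "Alice", "amount": 99.99},
    {"id": 2, "name": "Bob", "amount": 49.50},
]
print(infer_schema(data))  # {'id': 'bigint', 'name': 'text', 'amount': 'double'}
```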

Configure the destination

from hotdata_dlt import hotdata

destination = hotdata(
    api_key="your_api_key",        # defaults to HOTDATA_API_KEY env var
    workspace_id="ws_...",         # defaults to HOTDATA_WORKSPACE env var
    database_name="sales",         # managed database name (default: pipeline name)
    schema="public",               # schema (default: dataset_name)
)
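The comments above describe how each setting falls back to a default. As a rough model of that precedence, the hypothetical `resolve_settings` function below prefers explicit arguments, then the environment variables, then the pipeline and dataset names. It is only an illustration of the documented defaults, not hotdata-dlt's actual code, and it assumes that an unset `HOTDATA_WORKSPACE` simply leaves the workspace unpinned.

```python
import os
from dataclasses import dataclass
from typing import Optional


@dataclass
class DestinationSettings:
    api_key: str
    workspace_id: Optional[str]
    database_name: str
    schema: str


def resolve_settings(
    pipeline_name: str,
    dataset_name: str,
    api_key: Optional[str] = None,
    workspace_id: Optional[str] = None,
    database_name: Optional[str] = None,
    schema: Optional[str] = None,
) -> DestinationSettings:
    """Hypothetical resolver: explicit arguments win, then documented defaults."""
    resolved_key = api_key or os.environ.get("HOTDATA_API_KEY")
    if not resolved_key:
        raise ValueError("set HOTDATA_API_KEY or pass api_key")
    return DestinationSettings(
        api_key=resolved_key,
        # Optional: None is assumed to mean "no workspace pinned".
        workspace_id=workspace_id or os.environ.get("HOTDATA_WORKSPACE"),
        database_name=database_name or pipeline_name,
        schema=schema or dataset_name,
    )


# Example: with HOTDATA_API_KEY exported, only the database name is overridden.
os.environ.setdefault("HOTDATA_API_KEY", "your_api_key")
settings = resolve_settings("my_pipeline", "public", database_name="sales")
print(settings.database_name, settings.schema)  # sales public
```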

Load from a source

Use any dlt-verified source or a custom generator:

import dlt
from dlt.sources.sql_database import sql_database
from hotdata_dlt import hotdata

source = sql_database(
    credentials="postgresql://user:pass@host/db",
    schema="public",
    table_names=["orders", "customers"],
)

pipeline = dlt.pipeline(
    pipeline_name="postgres_to_hotdata",
    destination=hotdata(),
    dataset_name="public",
)

info = pipeline.run(source)
print(f"Loaded {info.loads_ids} into Hotdata")
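The same section mentions custom generators as an alternative to verified sources. A minimal sketch of one is below: it yields records page by page until the source runs dry. `FAKE_PAGES` and `fetch_page` are hypothetical stand-ins for a paginated API; a real generator would make network calls there. A generator like `tickets()` would be passed to `pipeline.run(...)` with a `table_name`, the same way the list is in the quickstart.

```python
from typing import Any, Iterator

# Stand-in for a paginated API; a real generator would make HTTP calls here.
FAKE_PAGES = [
    [{"id": 1, "status": "open"}, {"id": 2, "status": "closed"}],
    [{"id": 3, "status": "open"}],
]


def fetch_page(page: int) -> list[dict[str, Any]]:
    """Hypothetical page fetcher; returns an empty list past the last page."""
    return FAKE_PAGES[page] if page < len(FAKE_PAGES) else []


def tickets() -> Iterator[list[dict[str, Any]]]:
    """Yield one page of records at a time until the source is exhausted."""
    page = 0
    while True:
        rows = fetch_page(page)
        if not rows:
            return
        yield rows
        page += 1


for batch in tickets():
    print(len(batch))  # 2, then 1
```

Yielding whole pages rather than single rows keeps the number of generator steps proportional to API calls, not records.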

Incremental loading

dlt stores incremental state between runs, so subsequent executions load only new or updated rows:

import dlt
from hotdata_dlt import hotdata

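@dlt.resource(primary_key="id", write_disposition="merge")
def events(
    updated_at=dlt.sources.incremental("updated_at")
):
    # fetch_events stands in for your own API or database call. It should
    # return rows whose updated_at is newer than updated_at.last_value, which
    # is None on the first run unless you pass initial_value to incremental().
    yield fetch_events(since=updated_at.last_value)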

pipeline = dlt.pipeline(
    pipeline_name="events_pipeline",
    destination=hotdata(),
    dataset_name="public",
)

pipeline.run(events())
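Conceptually, incremental loading keeps a cursor (the highest `updated_at` seen so far) and, on the next run, keeps only rows past it. The sketch below models that with a plain dict as the persisted state. It is a simplified illustration, not dlt's implementation: dlt persists this state with the pipeline and differs in details, for example in how it treats rows equal to the last cursor value, whereas this sketch uses a strict `>` comparison. `load_incremental` and `State` are hypothetical names.

```python
from typing import Any, Optional

State = dict[str, Any]


def load_incremental(
    rows: list[dict[str, Any]], state: State, cursor: str = "updated_at"
) -> list[dict[str, Any]]:
    """Return rows newer than the stored cursor and advance the cursor."""
    last_value: Optional[Any] = state.get("last_value")
    new_rows = [r for r in rows if last_value is None or r[cursor] > last_value]
    if new_rows:
        state["last_value"] = max(r[cursor] for r in new_rows)
    return new_rows


state: State = {}
source = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 2, "updated_at": "2024-01-02"},
]
print(len(load_incremental(source, state)))  # 2 (first run loads everything)
source.append({"id": 3, "updated_at": "2024-01-03"})
print(len(load_incremental(source, state)))  # 1 (only the new row)
```

ISO-8601 date strings compare correctly as plain strings, which is why the sketch can use `>` and `max` on them directly.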

Write dispositions:

Disposition   Behaviour
append        Add new rows to the table
replace       Replace the full table on each run
merge         Upsert rows matched by primary_key
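The three dispositions can be modelled on an in-memory table to show how they differ on the same batch. This is an illustrative sketch with a hypothetical `apply_disposition` helper, not how dlt or Hotdata executes loads; in particular, merge here replaces a whole row when its key matches.

```python
from typing import Any

Row = dict[str, Any]


def apply_disposition(
    table: list[Row], batch: list[Row], disposition: str, primary_key: str = "id"
) -> list[Row]:
    """Return the table contents after loading `batch` with the given disposition."""
    if disposition == "append":
        return table + batch  # keep everything, duplicates included
    if disposition == "replace":
        return list(batch)  # discard the previous contents
    if disposition == "merge":
        merged = {row[primary_key]: row for row in table}
        for row in batch:
            merged[row[primary_key]] = row  # insert new keys, overwrite matching ones
        return list(merged.values())
    raise ValueError(f"unknown disposition: {disposition}")


table = [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}]
batch = [{"id": 2, "amount": 25}, {"id": 3, "amount": 30}]
for mode in ("append", "replace", "merge"):
    print(mode, apply_disposition(table, batch, mode))
```

With this batch, append yields four rows (id 2 appears twice), replace yields only the batch, and merge yields ids 1, 2, and 3 with id 2 updated to 25.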

Query loaded data

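After the pipeline runs, query the table through Hotdata. This example assumes the destination was configured with database_name="sales"; with the default settings, the database is named after the pipeline.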

hotdata query \
  "SELECT name, amount FROM default.public.customers ORDER BY amount DESC" \
  --database sales

Or with the Python SDK:

from hotdata_runtime import from_env

client = from_env()
result = client.execute_sql(
    "SELECT * FROM default.public.customers LIMIT 10",
    database="sales",
)
print(result.rows)

See also