dlt
Use dlt (data load tool) to build pipelines that load data from any source into Hotdata managed databases — with automatic schema inference, incremental loading, and parquet-based delivery.
Install
pip install hotdata-dlt
Authentication
Set HOTDATA_API_KEY in your environment. Optionally set HOTDATA_WORKSPACE to pin a specific workspace.
export HOTDATA_API_KEY="your_api_key"
export HOTDATA_WORKSPACE="ws_..."
Or pass credentials explicitly when configuring the destination.
Quickstart
import dlt
from hotdata_dlt import hotdata
pipeline = dlt.pipeline(
pipeline_name="my_pipeline",
destination=hotdata(),
dataset_name="public",
)
data = [
{"id": 1, "name": "Alice", "amount": 99.99},
{"id": 2, "name": "Bob", "amount": 49.50},
]
info = pipeline.run(data, table_name="customers")
print(info)
dlt infers the schema from your data, creates a managed database, and loads the records as parquet.
Configure the destination
from hotdata_dlt import hotdata
destination = hotdata(
api_key="your_api_key", # defaults to HOTDATA_API_KEY env var
workspace_id="ws_...", # defaults to HOTDATA_WORKSPACE env var
database_name="sales", # managed database name (default: pipeline name)
schema="public", # schema (default: dataset_name)
)
Load from a source
Use any dlt-verified source or a custom generator:
import dlt
from dlt.sources.sql_database import sql_database
from hotdata_dlt import hotdata
source = sql_database(
credentials="postgresql://user:pass@host/db",
schema="public",
table_names=["orders", "customers"],
)
pipeline = dlt.pipeline(
pipeline_name="postgres_to_hotdata",
destination=hotdata(),
dataset_name="public",
)
info = pipeline.run(source)
print(f"Loaded {info.loads_ids} into Hotdata")
Incremental loading
dlt tracks state between runs — only new or updated rows are loaded on subsequent executions:
import dlt
from hotdata_dlt import hotdata
@dlt.resource(primary_key="id", write_disposition="merge")
def events(
updated_at=dlt.sources.incremental("updated_at")
):
# fetch rows newer than updated_at.last_value
yield fetch_events(since=updated_at.last_value)
pipeline = dlt.pipeline(
pipeline_name="events_pipeline",
destination=hotdata(),
dataset_name="public",
)
pipeline.run(events())
Write dispositions:
| Disposition | Behaviour |
|---|---|
append | Add new rows to the table |
replace | Replace the full table on each run |
merge | Upsert rows matched by primary_key |
Query loaded data
After the pipeline runs, query the table through Hotdata:
hotdata query \
"SELECT name, amount FROM default.public.customers ORDER BY amount DESC" \
--database sales
Or with the Python SDK:
from hotdata_runtime import from_env
client = from_env()
result = client.execute_sql(
"SELECT * FROM default.public.customers LIMIT 10",
database="sales",
)
print(result.rows)
See also
- hotdata-dlt on GitHub
- dlt documentation
- Python SDK — core
HotdataClientreference - Quick Start — CLI and workspace setup