Integrations / Python SDK

Python SDK

Official Python client for the Hotdata HTTP API — typed, Pydantic-validated, and generated from the OpenAPI spec.

Install

pip install hotdata

For Apache Arrow result support (faster, more memory-efficient for large result sets):

pip install 'hotdata[arrow]'

Authentication

import hotdata

configuration = hotdata.Configuration(
    api_key="YOUR_API_KEY",
    workspace_id="YOUR_WORKSPACE_ID",
)

The `host` setting defaults to `https://api.hotdata.dev`. Override it if you target another environment.

Quickstart

import hotdata

configuration = hotdata.Configuration(
    api_key="YOUR_API_KEY",
    workspace_id="YOUR_WORKSPACE_ID",
)

with hotdata.ApiClient(configuration) as api_client:
    query_api = hotdata.QueryApi(api_client)
    response = query_api.query(
        hotdata.QueryRequest(sql="SELECT 1 AS ok")
    )
    print(response)

Execute SQL

with hotdata.ApiClient(configuration) as api_client:
    query_api = hotdata.QueryApi(api_client)

    # Synchronous query
    response = query_api.query(
        hotdata.QueryRequest(sql="SELECT * FROM orders LIMIT 10")
    )

    # Async query — returns a query run ID for polling
    response = query_api.query(
        hotdata.QueryRequest(
            sql="SELECT * FROM large_table",
            var_async=True,
        )
    )
    run_id = response.query_run_id

    # Try sync first, fall back to async after 3 s
    response = query_api.query(
        hotdata.QueryRequest(
            sql="SELECT * FROM orders",
            var_async=True,
            async_after_ms=3000,
        )
    )

Scope a query to a specific managed database with x_database_id:

response = query_api.query(
    hotdata.QueryRequest(sql="SELECT * FROM default.public.orders LIMIT 5"),
    x_database_id="db_abc123",
)

Managed databases

with hotdata.ApiClient(configuration) as api_client:
    db_api = hotdata.DatabasesApi(api_client)

    # Create a database and declare tables
    created = db_api.create_database(
        hotdata.CreateDatabaseRequest(
            description="sales",
            expires_at="24h",
            schemas=[
                hotdata.DatabaseDefaultSchemaDecl(
                    name="public",
                    tables=[
                        hotdata.DatabaseDefaultTableDecl(name="orders"),
                        hotdata.DatabaseDefaultTableDecl(name="customers"),
                    ],
                )
            ],
        )
    )
    print(created.id)  # e.g. "db_abc123"

    # List all databases
    listing = db_api.list_databases()
    for db in listing.databases:
        print(db.id, db.description)

    # Get a specific database
    detail = db_api.get_database("db_abc123")
    print(detail.default_connection_id)

    # Delete a database
    db_api.delete_database("db_abc123")

Load parquet into a managed table

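Upload a Parquet file and load it into a declared table. The example reuses `detail` — the database returned by `db_api.get_database(...)` in the Managed databases section — to obtain its `default_connection_id`: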

with hotdata.ApiClient(configuration) as api_client:
    uploads_api = hotdata.UploadsApi(api_client)
    connections_api = hotdata.ConnectionsApi(api_client)

    # Upload the file
    with open("orders.parquet", "rb") as f:
        upload = uploads_api.upload_file(
            f.read(),
            _content_type="application/octet-stream",
        )

    # Load into the declared table
    result = connections_api.load_managed_table(
        connection_id=detail.default_connection_id,
        schema="public",
        table="orders",
        load_managed_table_request=hotdata.LoadManagedTableRequest(
            mode="replace",
            upload_id=upload.id,
        ),
    )
    print(result.row_count, result.table_name)

Apache Arrow results

Fetch results as an Arrow table instead of JSON — faster and more memory-efficient for large result sets:

from hotdata import ApiClient, Configuration
from hotdata.arrow import ResultsApi

with ApiClient(configuration) as client:
    results = ResultsApi(client)

    # Buffered — returns a pyarrow.Table
    table = results.get_result_arrow(result_id)

    # Streaming — yields batches without materializing the full table
    with results.stream_result_arrow(result_id) as reader:
        for batch in reader:
            print(batch.to_pandas())

Both methods accept `offset` and `limit` for pagination. They raise `hotdata.arrow.ResultNotReadyError` if the result is still pending, so poll `results.get_result(result_id)` until `status == "ready"` before fetching.

Workspaces

with hotdata.ApiClient(configuration) as api_client:
    workspaces_api = hotdata.WorkspacesApi(api_client)

    listing = workspaces_api.list_workspaces()
    for ws in listing.workspaces:
        print(ws.id, ws.name)

Query run history

with hotdata.ApiClient(configuration) as api_client:
    runs_api = hotdata.QueryRunsApi(api_client)
    results_api = hotdata.ResultsApi(api_client)

    # List recent runs
    listing = runs_api.list_query_runs(limit=20)
    for run in listing.query_runs:
        print(run.id, run.status, run.execution_time_ms)

    # Fetch stored result rows
    result = results_api.get_result(run.result_id)

Error handling

from hotdata.rest import ApiException

try:
    response = query_api.query(hotdata.QueryRequest(sql="SELECT * FROM missing_table"))
except ApiException as e:
    print(f"API error {e.status}: {e.reason}")
    print(e.body)

API classes

| Class | Description |
| --- | --- |
| `QueryApi` | Execute SQL queries |
| `DatabasesApi` | Create, list, and delete managed databases |
| `ConnectionsApi` | Manage connections and load managed tables |
| `DatasetsApi` | Create and manage datasets (CSV, JSON, parquet) |
| `WorkspacesApi` | List and create workspaces |
| `InformationSchemaApi` | List tables and columns |
| `QueryRunsApi` | Inspect query run history |
| `ResultsApi` | Retrieve stored query results |
| `UploadsApi` | Upload files for managed table loads |
| `IndexesApi` | Create and list indexes (BM25, vector) |
| `SandboxesApi` | Manage sandboxes |
| `JobsApi` | Monitor background jobs |
| `SecretsApi` | Manage workspace secrets |

See also