Python SDK
Official Python client for the Hotdata HTTP API — typed, Pydantic-validated, and generated from the OpenAPI spec.
Install
pip install hotdata
For Apache Arrow result support (faster, more memory-efficient for large result sets):
pip install 'hotdata[arrow]'
Authentication
import hotdata
configuration = hotdata.Configuration(
api_key="YOUR_API_KEY",
workspace_id="YOUR_WORKSPACE_ID",
)
`host` defaults to `https://api.hotdata.dev`. Override it to target a different environment.
Quickstart
import hotdata
configuration = hotdata.Configuration(
api_key="YOUR_API_KEY",
workspace_id="YOUR_WORKSPACE_ID",
)
with hotdata.ApiClient(configuration) as api_client:
query_api = hotdata.QueryApi(api_client)
response = query_api.query(
hotdata.QueryRequest(sql="SELECT 1 AS ok")
)
print(response)
Execute SQL
with hotdata.ApiClient(configuration) as api_client:
query_api = hotdata.QueryApi(api_client)
# Synchronous query
response = query_api.query(
hotdata.QueryRequest(sql="SELECT * FROM orders LIMIT 10")
)
# Async query — returns a query run ID for polling
response = query_api.query(
hotdata.QueryRequest(
sql="SELECT * FROM large_table",
var_async=True,
)
)
run_id = response.query_run_id
# Try sync first, fall back to async after 3 s
response = query_api.query(
hotdata.QueryRequest(
sql="SELECT * FROM orders",
var_async=True,
async_after_ms=3000,
)
)
Scope a query to a specific managed database with x_database_id:
response = query_api.query(
hotdata.QueryRequest(sql="SELECT * FROM default.public.orders LIMIT 5"),
x_database_id="db_abc123",
)
Managed databases
with hotdata.ApiClient(configuration) as api_client:
db_api = hotdata.DatabasesApi(api_client)
# Create a database and declare tables
created = db_api.create_database(
hotdata.CreateDatabaseRequest(
description="sales",
expires_at="24h",
schemas=[
hotdata.DatabaseDefaultSchemaDecl(
name="public",
tables=[
hotdata.DatabaseDefaultTableDecl(name="orders"),
hotdata.DatabaseDefaultTableDecl(name="customers"),
],
)
],
)
)
print(created.id) # e.g. "db_abc123"
# List all databases
listing = db_api.list_databases()
for db in listing.databases:
print(db.id, db.description)
# Get a specific database
detail = db_api.get_database("db_abc123")
print(detail.default_connection_id)
# Delete a database
db_api.delete_database("db_abc123")
Load parquet into a managed table
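Upload a Parquet file and load it into a declared table. The example reuses `detail`, the database returned by `get_database` in the previous section, to find the connection to load into: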
with hotdata.ApiClient(configuration) as api_client:
uploads_api = hotdata.UploadsApi(api_client)
connections_api = hotdata.ConnectionsApi(api_client)
# Upload the file
with open("orders.parquet", "rb") as f:
upload = uploads_api.upload_file(
f.read(),
_content_type="application/octet-stream",
)
# Load into the declared table
result = connections_api.load_managed_table(
connection_id=detail.default_connection_id,
schema="public",
table="orders",
load_managed_table_request=hotdata.LoadManagedTableRequest(
mode="replace",
upload_id=upload.id,
),
)
print(result.row_count, result.table_name)
Apache Arrow results
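Fetch results as an Arrow table instead of JSON — faster and more memory-efficient for large result sets. In the example below, `result_id` is the ID of a stored query result (for example, `run.result_id` from a query run):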
from hotdata import ApiClient, Configuration
from hotdata.arrow import ResultsApi
with ApiClient(configuration) as client:
results = ResultsApi(client)
# Buffered — returns a pyarrow.Table
table = results.get_result_arrow(result_id)
# Streaming — yields batches without materializing the full table
with results.stream_result_arrow(result_id) as reader:
for batch in reader:
print(batch.to_pandas())
Both methods accept `offset` and `limit` for pagination. They raise `hotdata.arrow.ResultNotReadyError` if the result is still pending, so poll `results.get_result(result_id)` until `status == "ready"` before fetching.
Workspaces
with hotdata.ApiClient(configuration) as api_client:
workspaces_api = hotdata.WorkspacesApi(api_client)
listing = workspaces_api.list_workspaces()
for ws in listing.workspaces:
print(ws.id, ws.name)
Query run history
with hotdata.ApiClient(configuration) as api_client:
runs_api = hotdata.QueryRunsApi(api_client)
results_api = hotdata.ResultsApi(api_client)
# List recent runs
listing = runs_api.list_query_runs(limit=20)
for run in listing.query_runs:
print(run.id, run.status, run.execution_time_ms)
# Fetch stored result rows
result = results_api.get_result(run.result_id)
Error handling
from hotdata.rest import ApiException
try:
response = query_api.query(hotdata.QueryRequest(sql="SELECT * FROM missing_table"))
except ApiException as e:
print(f"API error {e.status}: {e.reason}")
print(e.body)
API classes
| Class | Description |
|---|---|
| `QueryApi` | Execute SQL queries |
| `DatabasesApi` | Create, list, and delete managed databases |
| `ConnectionsApi` | Manage connections and load managed tables |
| `DatasetsApi` | Create and manage datasets (CSV, JSON, parquet) |
| `WorkspacesApi` | List and create workspaces |
| `InformationSchemaApi` | List tables and columns |
| `QueryRunsApi` | Inspect query run history |
| `ResultsApi` | Retrieve stored query results |
| `UploadsApi` | Upload files for managed table loads |
| `IndexesApi` | Create and list indexes (BM25, vector) |
| `SandboxesApi` | Manage sandboxes |
| `JobsApi` | Monitor background jobs |
| `SecretsApi` | Manage workspace secrets |
See also
- hotdata on PyPI
- sdk-python on GitHub
- API Reference — Full HTTP API documentation
- Quick Start — CLI and workspace setup