# Python SDK

Installation, configuration, and API reference for the Ledger Python SDK.
## Installation

```sh
uv add ontopix-ledger
```

The package is published to Ontopix's private CodeArtifact registry. Configure your registry before installing — see your team's CodeArtifact setup guide.
## Environment Variables

| Variable | Default | Description |
|---|---|---|
| `LEDGER_QUEUE_URL` | (required outside sandbox) | SQS FIFO queue URL |
| `LEDGER_ENV` | `dev` | Environment tag added to every event |
| `LEDGER_AWS_REGION` | `eu-central-1` | AWS region for the SQS client |
| `LEDGER_SANDBOX_MODE` | `false` | `true` writes events to stdout instead of SQS |
| `LEDGER_LOG_LEVEL` | `INFO` | Log level |
`LEDGER_QUEUE_URL` is validated at import time. If it is missing and sandbox mode is off, the import raises `ImportError` immediately.
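The fail-fast check follows a common pattern; a minimal sketch of how it might look, assuming only the documented environment variables (this is illustrative, not the SDK's actual source):

```python
import os


def validate_config(env: dict[str, str]) -> None:
    """Sketch of the import-time check: fail fast when the queue URL is
    missing and sandbox mode is off, mirroring the documented ImportError."""
    sandbox = env.get("LEDGER_SANDBOX_MODE", "false").lower() == "true"
    if not env.get("LEDGER_QUEUE_URL") and not sandbox:
        raise ImportError(
            "LEDGER_QUEUE_URL must be set unless LEDGER_SANDBOX_MODE=true"
        )

# In the real package, an equivalent check runs at module import time,
# e.g. validate_config(dict(os.environ)).
```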
## Sandbox Mode

Set `LEDGER_SANDBOX_MODE=true` for local development. Events are printed to stdout as JSON — no SQS or AWS credentials required.

```sh
export LEDGER_SANDBOX_MODE=true
export LEDGER_ENV=dev
python my_script.py
```
## API Reference

### `track()` — synchronous

```python
import ledger

ledger.track(
    service="audit-service",
    operation="transcript",
    units=1,
    unit_type="requests",
    vendor="elevenlabs",
    model="scribe_v2",
    tenant_id=tenant_id,
    job_id=job_id,
)
```
Signature:

```python
def track(
    *,
    service: str,
    operation: str,
    units: float = 1.0,
    unit_type: str,
    timestamp: datetime | None = None,
    idempotency_key: str | None = None,
    **dimensions: object,
) -> None
```
- All parameters are keyword-only.
- `service`, `operation`, and `unit_type` are required. `units` defaults to `1.0`. `timestamp` defaults to `datetime.now(UTC)`. `idempotency_key` defaults to a random UUID.
- Any additional `**dimensions` become event dimensions (e.g. `vendor`, `tenant_id`, `model`). Dimension values are coerced to strings automatically.
- Never raises. Exceptions are caught and logged — a billing write failure will never propagate to your business logic.
- Returns `None`.
### `atrack()` — asynchronous

Identical signature to `track()`, but async. Uses aioboto3 under the hood.

```python
import ledger

await ledger.atrack(
    service="audit-service",
    operation="transcript",
    units=1,
    unit_type="requests",
    vendor="elevenlabs",
    tenant_id=tenant_id,
)
```

Never raises, same as `track()`.
### `@record` — decorator

For functions where the entire execution is one billable unit.

```python
import ledger

@ledger.record(
    service="stats-service",
    operation="aggregate",
    unit_type="api_call",
    dimensions_from=["tenant_id"],
)
async def aggregate_results(tenant_id: str, data: list) -> dict:
    ...
```
Signature:

```python
def record(
    service: str,
    operation: str,
    unit_type: str,
    *,
    units: float = 1.0,
    dimensions_from: list[str] | None = None,
    **static_dimensions: object,
) -> Callable
```
- `service`, `operation`, and `unit_type` are positional-or-keyword, required. `units` defaults to `1.0`.
- `dimensions_from` — a list of parameter names to extract from the decorated function's arguments and include as dimensions.
- Any additional `**kwargs` are passed as static dimensions on every call.
- Auto-detects sync vs async: wraps sync functions with a sync wrapper and async functions with an async wrapper.
- `track()`/`atrack()` is called after the function returns successfully.
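The sync/async auto-detection described above is a standard decorator pattern built on `inspect.iscoroutinefunction`. A minimal sketch of the idea (illustrative only — not the SDK's implementation; the `print` stands in for the real `track()`/`atrack()` call):

```python
import asyncio
import functools
import inspect


def record_sketch(service: str, operation: str, unit_type: str, units: float = 1.0):
    """Illustrative sketch of @record's sync/async auto-detection."""
    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                result = await func(*args, **kwargs)
                # atrack() would fire here, after a successful return
                print(f"tracked {service}/{operation}: {units} {unit_type}")
                return result
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            # track() would fire here, after a successful return
            print(f"tracked {service}/{operation}: {units} {unit_type}")
            return result
        return sync_wrapper
    return decorator
```

Because the check happens at decoration time, an async function stays awaitable after wrapping and a sync function stays callable — callers never see a difference.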
### `recording()` — sync context manager

Use when units are only known after the work completes.

```python
import ledger

with ledger.recording(
    service="audit-service",
    operation="enrich",
    unit_type="tokens",
    vendor="openai",
    tenant_id=tenant_id,
) as entry:
    result = openai_client.chat.completions.create(...)
    entry.units = result.usage.total_tokens
```
Signature:

```python
def recording(
    *,
    service: str,
    operation: str,
    unit_type: str,
    units: float = 0.0,
    **dimensions: Any,
) -> RecordingContext
```
- `entry.units` is a read/write property — set it before the `with` block exits.
- `track()` is called on successful exit only. If an exception propagates out of the block, no event is recorded.
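The "track on clean exit only" behaviour falls out naturally from a generator-based context manager: any code after the `yield` is skipped when the body raises. A minimal sketch of the pattern, assuming a list `emitted` as a stand-in for the SQS send (illustrative, not the SDK's source):

```python
from contextlib import contextmanager
from dataclasses import dataclass, field

emitted: list[dict] = []  # stand-in for the SQS send


@dataclass
class Entry:
    units: float = 0.0
    dimensions: dict = field(default_factory=dict)


@contextmanager
def recording_sketch(*, service: str, operation: str, unit_type: str,
                     units: float = 0.0, **dimensions):
    """Illustrative sketch: emit only on clean exit, skip on exception."""
    entry = Entry(units=units, dimensions=dimensions)
    yield entry  # the with-body runs here and may set entry.units
    # Reached only if the body did not raise — this is where track() would fire.
    emitted.append({"service": service, "operation": operation,
                    "unit_type": unit_type, "units": entry.units, **dimensions})
```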
### `arecording()` — async context manager

Async equivalent of `recording()`.

```python
import ledger

async with ledger.arecording(
    service="audit-service",
    operation="enrich",
    unit_type="tokens",
    vendor="openai",
    tenant_id=tenant_id,
) as entry:
    result = await openai_client.chat.completions.create(...)
    entry.units = result.usage.total_tokens
```

Same behaviour as `recording()` — calls `atrack()` on successful exit only.
## End-to-End Example: Audit Pipeline

A pipeline that transcribes audio, enriches the transcript, audits it, and stores results — all tracked per `workspace_id`.
```python
import ledger

SERVICE = "audit-service"


async def run_pipeline(audio_url: str, workspace_id: str, job_id: str) -> dict:
    # ── Step 1: Transcribe audio (ElevenLabs) ────────────────────────
    # Cost is known upfront: 1 request. Use track().
    transcript = await elevenlabs_client.transcribe(audio_url, model="scribe_v1")
    await ledger.atrack(
        service=SERVICE,
        operation="transcribe",
        units=1,
        unit_type="requests",
        vendor="elevenlabs",
        model="scribe_v1",
        workspace_id=workspace_id,
        job_id=job_id,
    )

    # ── Step 2: Enrich transcript (OpenAI gpt-5-mini) ────────────────
    # Each token type is a separate resource consumed at a different cost.
    # Emit one event per unit_type so each can be queried and aggregated
    # independently in Timestream (e.g. SUM(units) WHERE unit_type = 'input_tokens').
    enrichment = await openai_client.chat.completions.create(
        model="gpt-5-mini",
        messages=[{"role": "user", "content": f"Enrich: {transcript.text}"}],
    )
    enrich_dims = dict(
        service=SERVICE, operation="enrich", vendor="openai",
        model="gpt-5-mini", workspace_id=workspace_id, job_id=job_id,
    )
    usage = enrichment.usage
    await ledger.atrack(units=usage.prompt_tokens, unit_type="input_tokens", **enrich_dims)
    await ledger.atrack(units=usage.completion_tokens, unit_type="output_tokens", **enrich_dims)
    await ledger.atrack(
        # prompt_tokens_details is an object (and may be absent/None), so read
        # cached_tokens via getattr rather than dict-style .get().
        units=getattr(getattr(usage, "prompt_tokens_details", None), "cached_tokens", 0) or 0,
        unit_type="input_cached_tokens", **enrich_dims,
    )

    # ── Step 3: Audit the enriched transcript (OpenAI gpt-5) ─────────
    # Same pattern — one event per token type.
    audit = await openai_client.chat.completions.create(
        model="gpt-5",
        messages=[{"role": "user", "content": f"Audit: {enrichment.choices[0].message.content}"}],
    )
    audit_dims = dict(
        service=SERVICE, operation="audit", vendor="openai",
        model="gpt-5", workspace_id=workspace_id, job_id=job_id,
    )
    usage = audit.usage
    await ledger.atrack(units=usage.prompt_tokens, unit_type="input_tokens", **audit_dims)
    await ledger.atrack(units=usage.completion_tokens, unit_type="output_tokens", **audit_dims)
    await ledger.atrack(
        units=getattr(getattr(usage, "prompt_tokens_details", None), "cached_tokens", 0) or 0,
        unit_type="input_cached_tokens", **audit_dims,
    )

    # ── Step 4: Store results ─────────────────────────────────────────
    # Cost is fixed: 1 write. The decorator handles tracking on success.
    result = await store_audit_results(workspace_id, audit)
    return result


@ledger.record(
    service=SERVICE,
    operation="aggregate",
    unit_type="writes",
    dimensions_from=["workspace_id"],
)
async def store_audit_results(workspace_id: str, audit_response) -> dict:
    """Write audit results to the database. Tracked automatically by @record."""
    # ... db write logic ...
    return {"status": "stored", "workspace_id": workspace_id}
```
This pipeline produces 6 Ledger events for the two LLM steps (3 token types each), plus 1 for transcription and 1 for aggregation — 8 total per run. All share the same `workspace_id` and `job_id` dimensions for attribution and traceability.
## Idempotency

Every event carries an `idempotency_key` used as the SQS `MessageDeduplicationId`. By default, a random UUID is generated per call — each call is treated as unique. For deterministic deduplication (e.g. retries), provide your own key:

```python
ledger.track(
    service="audit-service",
    operation="transcript",
    units=1,
    unit_type="requests",
    idempotency_key=f"{job_id}:transcript",
    vendor="elevenlabs",
    job_id=job_id,
)
```
SQS FIFO deduplicates messages with the same key within a 5-minute window.
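One way to make retried work safe is to derive the key from identifiers you already have, so every retry of the same logical operation produces the same key. A hedged sketch — the helper below is hypothetical, not part of the SDK:

```python
import hashlib


def idempotency_key(job_id: str, operation: str) -> str:
    """Hypothetical helper: a deterministic key so SQS FIFO dedupes retries.

    The same (job_id, operation) pair always yields the same key, so a
    retried call inside the 5-minute dedup window is dropped rather than
    double-billed. Hashing keeps the key well under the 128-character
    limit SQS imposes on MessageDeduplicationId.
    """
    raw = f"{job_id}:{operation}"
    return hashlib.sha256(raw.encode()).hexdigest()
```

Usage would be `idempotency_key=idempotency_key(job_id, "transcript")` in place of the inline f-string above; hashing only matters once keys can grow long or contain characters SQS rejects.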
## Reserved Dimension Names

The following keys cannot be used as dimension `**kwargs`:

`service`, `operation`, `units`, `unit_type`, `timestamp`, `environment`, `idempotency_key`, `schema_version`

Passing a reserved name raises `ValueError` eagerly — this validation error propagates to the caller rather than being swallowed by the fire-and-forget error handling.
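Combining this rule with the string coercion described under `track()`, the dimension handling can be approximated as follows — a minimal sketch under those two documented behaviours, not the SDK's actual code:

```python
RESERVED = {
    "service", "operation", "units", "unit_type",
    "timestamp", "environment", "idempotency_key", "schema_version",
}


def prepare_dimensions(dimensions: dict) -> dict:
    """Sketch: reject reserved names, then coerce all values to strings."""
    clashes = RESERVED & dimensions.keys()
    if clashes:
        raise ValueError(f"reserved dimension names: {sorted(clashes)}")
    return {k: str(v) for k, v in dimensions.items()}
```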