Python SDK

Installation, configuration, and API reference for the Ledger Python SDK.

Installation

uv add ontopix-ledger

The package is published to Ontopix's private CodeArtifact registry. Configure your registry before installing — see your team's CodeArtifact setup guide.

Environment Variables

Variable              Default                      Description
LEDGER_QUEUE_URL      (required outside sandbox)   SQS FIFO queue URL
LEDGER_ENV            dev                          Environment tag added to every event
LEDGER_AWS_REGION     eu-central-1                 AWS region for the SQS client
LEDGER_SANDBOX_MODE   false                        If true, events are written to stdout instead of SQS
LEDGER_LOG_LEVEL      INFO                         Log level

LEDGER_QUEUE_URL is validated at import time. If it is missing and sandbox mode is off, the import raises ImportError immediately.
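The fail-fast behavior can be sketched as follows. This is an illustrative sketch of the validation rule described above, not the SDK's actual implementation; the function name `validate_config` is made up for the example.

```python
# Sketch of the import-time check: missing config surfaces immediately,
# not at the first track() call.
def validate_config(env: dict[str, str]) -> None:
    sandbox = env.get("LEDGER_SANDBOX_MODE", "false").lower() == "true"
    if not sandbox and not env.get("LEDGER_QUEUE_URL"):
        raise ImportError("LEDGER_QUEUE_URL is required when sandbox mode is off")

validate_config({"LEDGER_SANDBOX_MODE": "true"})  # sandbox: queue URL not needed
try:
    validate_config({})
except ImportError as exc:
    print(exc)  # LEDGER_QUEUE_URL is required when sandbox mode is off
```

Failing at import time means a misconfigured service crashes on startup rather than silently dropping billing events at runtime.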

Sandbox Mode

Set LEDGER_SANDBOX_MODE=true for local development. Events are printed to stdout as JSON — no SQS or AWS credentials required.

export LEDGER_SANDBOX_MODE=true
export LEDGER_ENV=dev
python my_script.py

API Reference

track() — synchronous

import ledger

ledger.track(
    service="audit-service",
    operation="transcript",
    units=1,
    unit_type="requests",
    vendor="elevenlabs",
    model="scribe_v2",
    tenant_id=tenant_id,
    job_id=job_id,
)

Signature:

def track(
    *,
    service: str,
    operation: str,
    units: float = 1.0,
    unit_type: str,
    timestamp: datetime | None = None,
    idempotency_key: str | None = None,
    **dimensions: object,
) -> None
  • All parameters are keyword-only.
  • service, operation, and unit_type are required. units defaults to 1.0.
  • timestamp defaults to datetime.now(UTC).
  • idempotency_key defaults to a random UUID.
  • Any additional **kwargs become event dimensions (e.g. vendor, tenant_id, model). Dimension values are coerced to strings automatically.
  • Never raises on delivery failure. Exceptions during the SQS write are caught and logged — a billing write failure will never propagate to your business logic. (The one exception is reserved dimension names, which raise ValueError eagerly; see Reserved Dimension Names.)
  • Returns None.
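The string coercion of dimensions can be illustrated with a small sketch. This is not the SDK's source; `coerce_dimensions` is a hypothetical name for the behavior the bullet above describes.

```python
# Sketch: extra **kwargs become dimensions, with every value coerced to str.
def coerce_dimensions(**dimensions: object) -> dict[str, str]:
    """Coerce each dimension value to a string, mirroring the documented behavior."""
    return {key: str(value) for key, value in dimensions.items()}

print(coerce_dimensions(vendor="elevenlabs", units_used=3, flagged=True))
# {'vendor': 'elevenlabs', 'units_used': '3', 'flagged': 'True'}
```

Because values are stringified, pass IDs and labels as dimensions, and keep numeric quantities in `units` where they remain aggregatable.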

atrack() — asynchronous

Identical signature to track(), but async. Uses aioboto3 under the hood.

import ledger

await ledger.atrack(
    service="audit-service",
    operation="transcript",
    units=1,
    unit_type="requests",
    vendor="elevenlabs",
    tenant_id=tenant_id,
)

Never raises, same as track().

@record — decorator

For functions where the entire execution is one billable unit.

import ledger

@ledger.record(
    service="stats-service",
    operation="aggregate",
    unit_type="api_call",
    dimensions_from=["tenant_id"],
)
async def aggregate_results(tenant_id: str, data: list) -> dict:
    ...

Signature:

def record(
    service: str,
    operation: str,
    unit_type: str,
    *,
    units: float = 1.0,
    dimensions_from: list[str] | None = None,
    **static_dimensions: object,
) -> Callable
  • service, operation, unit_type are positional-or-keyword, required.
  • units defaults to 1.0.
  • dimensions_from — a list of parameter names to extract from the decorated function's arguments and include as dimensions.
  • Any additional **kwargs are passed as static dimensions on every call.
  • Auto-detects sync vs async: wraps sync functions with a sync wrapper and async functions with an async wrapper.
  • track() / atrack() is called after the function returns successfully.
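The `dimensions_from` extraction can be sketched with `inspect.signature`. This is an illustrative approximation of the mechanism, not the SDK's code; `extract_dimensions` is a hypothetical helper.

```python
import inspect

# Sketch: pull named parameters out of the wrapped function's call arguments,
# whether they were passed positionally or by keyword.
def extract_dimensions(func, names, args, kwargs):
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    return {name: bound.arguments[name] for name in names}

def aggregate_results(tenant_id: str, data: list) -> dict: ...

dims = extract_dimensions(aggregate_results, ["tenant_id"], ("acme",), {"data": []})
print(dims)  # {'tenant_id': 'acme'}
```

Binding against the signature is what lets `dimensions_from=["tenant_id"]` work regardless of how the caller supplies `tenant_id`.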

recording() — sync context manager

Use when units are only known after the work completes.

import ledger

with ledger.recording(
    service="audit-service",
    operation="enrich",
    unit_type="tokens",
    vendor="openai",
    tenant_id=tenant_id,
) as entry:
    result = openai_client.chat.completions.create(...)
    entry.units = result.usage.total_tokens

Signature:

def recording(
    *,
    service: str,
    operation: str,
    unit_type: str,
    units: float = 0.0,
    **dimensions: Any,
) -> RecordingContext
  • entry.units is a read/write property — set it before the with block exits.
  • track() is called on successful exit only. If an exception propagates out of the block, no event is recorded.
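Because `entry.units` is read/write, it can also be accumulated across a loop; only the final value is recorded. The sketch below uses a stand-in context manager (`recording_sketch`, not the SDK) to show both the accumulation pattern and the no-event-on-exception rule.

```python
from contextlib import contextmanager

class Entry:
    def __init__(self) -> None:
        self.units = 0.0

recorded = []  # stands in for events sent via track()

@contextmanager
def recording_sketch(**dims):
    entry = Entry()
    yield entry
    recorded.append({"units": entry.units, **dims})  # only reached on success

with recording_sketch(unit_type="tokens") as entry:
    for chunk_tokens in (120, 80, 40):  # e.g. one LLM call per chunk
        entry.units += chunk_tokens

print(recorded)  # [{'units': 240.0, 'unit_type': 'tokens'}]

try:
    with recording_sketch(unit_type="tokens") as entry:
        raise RuntimeError("boom")
except RuntimeError:
    pass
print(len(recorded))  # still 1 — the failed block recorded nothing
```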

arecording() — async context manager

Async equivalent of recording().

import ledger

async with ledger.arecording(
    service="audit-service",
    operation="enrich",
    unit_type="tokens",
    vendor="openai",
    tenant_id=tenant_id,
) as entry:
    result = await openai_client.chat.completions.create(...)
    entry.units = result.usage.total_tokens

Same behaviour as recording() — calls atrack() on successful exit only.

End-to-End Example: Audit Pipeline

A pipeline that transcribes audio, enriches the transcript, audits it, and stores results — all tracked per workspace_id.

import ledger


SERVICE = "audit-service"


async def run_pipeline(audio_url: str, workspace_id: str, job_id: str) -> dict:
    # ── Step 1: Transcribe audio (ElevenLabs) ────────────────────────
    # Cost is known upfront: 1 request. Use track().
    transcript = await elevenlabs_client.transcribe(audio_url, model="scribe_v1")

    await ledger.atrack(
        service=SERVICE,
        operation="transcribe",
        units=1,
        unit_type="requests",
        vendor="elevenlabs",
        model="scribe_v1",
        workspace_id=workspace_id,
        job_id=job_id,
    )

    # ── Step 2: Enrich transcript (OpenAI gpt-5-mini) ────────────────
    # Each token type is a separate resource consumed at a different cost.
    # Emit one event per unit_type so each can be queried and aggregated
    # independently in Timestream (e.g. SUM(units) WHERE unit_type = 'input_tokens').
    enrichment = await openai_client.chat.completions.create(
        model="gpt-5-mini",
        messages=[{"role": "user", "content": f"Enrich: {transcript.text}"}],
    )

    enrich_dims = dict(
        service=SERVICE, operation="enrich", vendor="openai",
        model="gpt-5-mini", workspace_id=workspace_id, job_id=job_id,
    )
    usage = enrichment.usage
    await ledger.atrack(units=usage.prompt_tokens, unit_type="input_tokens", **enrich_dims)
    await ledger.atrack(units=usage.completion_tokens, unit_type="output_tokens", **enrich_dims)
    cached = getattr(getattr(usage, "prompt_tokens_details", None), "cached_tokens", 0) or 0
    await ledger.atrack(units=cached, unit_type="input_cached_tokens", **enrich_dims)

    # ── Step 3: Audit the enriched transcript (OpenAI gpt-5) ─────────
    # Same pattern — one event per token type.
    audit = await openai_client.chat.completions.create(
        model="gpt-5",
        messages=[{"role": "user", "content": f"Audit: {enrichment.choices[0].message.content}"}],
    )

    audit_dims = dict(
        service=SERVICE, operation="audit", vendor="openai",
        model="gpt-5", workspace_id=workspace_id, job_id=job_id,
    )
    usage = audit.usage
    await ledger.atrack(units=usage.prompt_tokens, unit_type="input_tokens", **audit_dims)
    await ledger.atrack(units=usage.completion_tokens, unit_type="output_tokens", **audit_dims)
    cached = getattr(getattr(usage, "prompt_tokens_details", None), "cached_tokens", 0) or 0
    await ledger.atrack(units=cached, unit_type="input_cached_tokens", **audit_dims)

    # ── Step 4: Store results ─────────────────────────────────────────
    # Cost is fixed: 1 write. The decorator handles tracking on success.
    result = await store_audit_results(workspace_id, audit)
    return result


@ledger.record(
    service=SERVICE,
    operation="aggregate",
    unit_type="writes",
    dimensions_from=["workspace_id"],
)
async def store_audit_results(workspace_id: str, audit_response) -> dict:
    """Write audit results to the database. Tracked automatically by @record."""
    # ... db write logic ...
    return {"status": "stored", "workspace_id": workspace_id}

This pipeline produces 6 Ledger events for the two LLM steps (3 token types each), plus 1 for transcription and 1 for the store step — 8 total per run. All share the same workspace_id and job_id dimensions for attribution and traceability.

Idempotency

Every event carries an idempotency_key used as the SQS MessageDeduplicationId. By default, a random UUID is generated per call — each call is treated as unique.

For deterministic deduplication (e.g. retries), provide your own key:

ledger.track(
    service="audit-service",
    operation="transcript",
    units=1,
    unit_type="requests",
    idempotency_key=f"{job_id}:transcript",
    vendor="elevenlabs",
    job_id=job_id,
)

SQS FIFO deduplicates messages with the same key within a 5-minute window.
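The effect of a deterministic key can be sketched locally. The key format and the `deduplicate` helper below are assumptions for illustration; SQS FIFO performs the actual deduplication server-side within its 5-minute window.

```python
# Sketch: a stable key per logical event means a retried step is dropped
# as a duplicate instead of double-billing.
def idempotency_key(job_id: str, step: str) -> str:
    return f"{job_id}:{step}"

seen: set[str] = set()

def deduplicate(key: str) -> bool:
    """Return True if this is the first occurrence (event is kept)."""
    if key in seen:
        return False
    seen.add(key)
    return True

first = deduplicate(idempotency_key("job-42", "transcript"))
retry = deduplicate(idempotency_key("job-42", "transcript"))
print(first, retry)  # True False
```

Note that deduplication only applies within the 5-minute window; a retry hours later would produce a second event.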

Reserved Dimension Names

The following keys cannot be used as dimension **kwargs: service, operation, units, unit_type, timestamp, environment, idempotency_key, schema_version.

Passing a reserved name raises ValueError. This validation runs eagerly, before the fire-and-forget error handling, so unlike delivery failures it does propagate to the caller.
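A sketch of the validation, using the reserved names listed above (this is illustrative, not the SDK's source; `validate_dimensions` is a hypothetical name):

```python
# Sketch: reject **kwargs that collide with top-level event fields.
RESERVED = {"service", "operation", "units", "unit_type",
            "timestamp", "environment", "idempotency_key", "schema_version"}

def validate_dimensions(**dimensions: object) -> None:
    clashes = RESERVED & dimensions.keys()
    if clashes:
        raise ValueError(f"Reserved dimension name(s): {sorted(clashes)}")

validate_dimensions(vendor="openai", tenant_id="t-1")  # fine
try:
    validate_dimensions(timestamp="2024-01-01")
except ValueError as exc:
    print(exc)  # Reserved dimension name(s): ['timestamp']
```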