Go SDK

Installation, configuration, and API reference for the Ledger Go SDK.

Installation

The Go module is not yet published to a registry. Use a replace directive in your go.mod during development:

require github.com/ontopix/ledger/go v0.0.0

replace github.com/ontopix/ledger/go => ../ledger/go

The module path (github.com/ontopix/ledger/go) is provisional and may change in v0.2.0 when the module is published.

Environment Variables

| Variable | Default | Description |
| --- | --- | --- |
| LEDGER_QUEUE_URL | (required outside sandbox) | SQS FIFO queue URL |
| LEDGER_ENV | dev | Environment tag added to every event |
| LEDGER_AWS_REGION | eu-central-1 | AWS region for SQS client |
| LEDGER_SANDBOX_MODE | false | true writes to stdout instead of SQS |
| LEDGER_LOG_LEVEL | INFO | Log level |

LEDGER_QUEUE_URL is validated in init(). If missing and sandbox mode is off, the process exits with log.Fatal.
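The rules above can be expressed as a small pure function, which may be handy for a deployment check or a unit test in your own service. This is only a sketch: the config type, the loadConfig name, and the exact parsing of LEDGER_SANDBOX_MODE (a literal "true") are assumptions, not the SDK's internals.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// config mirrors the environment variables in the table above.
// The type and its field names are hypothetical, not part of the SDK.
type config struct {
	QueueURL string
	Env      string
	Region   string
	Sandbox  bool
	LogLevel string
}

// loadConfig applies the documented defaults. getenv is injected so the
// function can be exercised without touching the real environment.
func loadConfig(getenv func(string) string) (config, error) {
	orDefault := func(key, fallback string) string {
		if v := getenv(key); v != "" {
			return v
		}
		return fallback
	}
	cfg := config{
		QueueURL: getenv("LEDGER_QUEUE_URL"),
		Env:      orDefault("LEDGER_ENV", "dev"),
		Region:   orDefault("LEDGER_AWS_REGION", "eu-central-1"),
		Sandbox:  getenv("LEDGER_SANDBOX_MODE") == "true", // assumed parsing
		LogLevel: orDefault("LEDGER_LOG_LEVEL", "INFO"),
	}
	// The queue URL is only optional in sandbox mode.
	if cfg.QueueURL == "" && !cfg.Sandbox {
		return config{}, errors.New("LEDGER_QUEUE_URL is required when LEDGER_SANDBOX_MODE is not true")
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```

Unlike the SDK, which exits with log.Fatal, the sketch returns an error so the caller decides how to react.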

Sandbox Mode

Set LEDGER_SANDBOX_MODE=true for local development. Events are printed to stdout as JSON — no SQS or AWS credentials required.

LEDGER_SANDBOX_MODE=true LEDGER_ENV=dev go run ./cmd/myservice

API Reference

ledger.Track() — synchronous, fire-and-forget

package main

import (
    "context"
    ledger "github.com/ontopix/ledger/go"
)

func main() {
    ctx := context.Background()
    tenantID, jobID := "tenant-123", "job-456" // placeholder values for this example

    ledger.Track(ctx, ledger.TrackParams{
        Service:   "audit-service",
        Operation: "transcript",
        Units:     1,
        UnitType:  "requests",
        Dimensions: map[string]string{
            "vendor":    "elevenlabs",
            "model":     "scribe_v2",
            "tenant_id": tenantID,
            "job_id":    jobID,
        },
    })
}

Signature:

type TrackParams struct {
    Service        string
    Operation      string
    Units          float64
    UnitType       string
    Timestamp      time.Time         // zero value → time.Now().UTC()
    IdempotencyKey string            // empty → random UUID
    Dimensions     map[string]string // caller dimensions
}

func Track(ctx context.Context, params TrackParams) // never panics, never returns error

  • Service, Operation, and UnitType are required. Units defaults to its zero value (0), so set it explicitly.
  • Timestamp defaults to time.Now().UTC() if zero.
  • IdempotencyKey defaults to a random UUID if empty.
  • Dimensions — any additional key-value pairs. Values are already strings (Go does not need coercion).
  • Never panics. Panics are recovered internally and logged. Errors from the SQS writer are logged, not returned.
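To make the defaulting and never-panics behaviour listed above concrete, here is a minimal sketch of how such a wrapper could be written. It is not the SDK's implementation: the event type, newUUID, withDefaults, and safeSend are hypothetical names, and safeSend returns a bool only so the sketch can be tested (the real Track returns nothing).

```go
package main

import (
	"crypto/rand"
	"fmt"
	"log"
	"time"
)

// event is a hypothetical stand-in for the two defaulted TrackParams fields.
type event struct {
	Timestamp      time.Time
	IdempotencyKey string
}

// newUUID returns a random (version 4) UUID built from crypto/rand.
func newUUID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set version 4
	b[8] = (b[8] & 0x3f) | 0x80 // set RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// withDefaults fills the zero-valued fields as described in the list above.
func withDefaults(e event) (event, error) {
	if e.Timestamp.IsZero() {
		e.Timestamp = time.Now().UTC()
	}
	if e.IdempotencyKey == "" {
		id, err := newUUID()
		if err != nil {
			return e, err
		}
		e.IdempotencyKey = id
	}
	return e, nil
}

// safeSend calls send and turns any panic or error into a log line,
// so neither reaches the caller. It reports whether send succeeded.
func safeSend(e event, send func(event) error) (ok bool) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("ledger sketch: recovered panic: %v", r)
			ok = false
		}
	}()
	e, err := withDefaults(e)
	if err != nil {
		log.Printf("ledger sketch: %v", err)
		return false
	}
	if err := send(e); err != nil {
		log.Printf("ledger sketch: send failed: %v", err)
		return false
	}
	return true
}

func main() {
	ok := safeSend(event{}, func(e event) error {
		fmt.Println("sending with key", e.IdempotencyKey)
		return nil
	})
	fmt.Println("sent:", ok)

	// A panicking writer is contained and reported as a failed send.
	ok = safeSend(event{}, func(event) error { panic("writer bug") })
	fmt.Println("sent:", ok)
}
```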

ledger.NewRecording() — deferred tracking

Use when units are only known after the work completes. The typical pattern calls rec.Track() explicitly once the operation has succeeded:

rec := ledger.NewRecording(ctx, ledger.TrackParams{
    Service:   "audit-service",
    Operation: "enrich",
    UnitType:  "tokens",
    Dimensions: map[string]string{
        "vendor":    "openai",
        "tenant_id": tenantID,
    },
})

result, err := openaiClient.CreateChatCompletion(ctx, req)
if err != nil {
    return err // rec.Track() is never called — no event recorded
}

rec.Units = float64(result.Usage.TotalTokens)
rec.Track()

Signature:

type Recording struct {
    Units float64 // read/write — set before calling Track()
    // unexported fields
}

func NewRecording(ctx context.Context, params TrackParams) *Recording
func (r *Recording) Track() // sends the event via ledger.Track()

  • rec.Units is a public field; set it before calling rec.Track().
  • Call rec.Track() only on success. If the operation fails, simply don't call it.
  • You can use defer rec.Track() if you want the event sent regardless of outcome, but the typical pattern is to call it explicitly after success so that failed operations are not recorded.
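If you prefer defer but still want failed operations to go unrecorded, one option is a deferred closure that checks a named error return. The sketch below assumes nothing about the SDK beyond the Units field and Track() method shown above. The recording type is a local stand-in for *ledger.Recording so the example runs on its own, and callVendor and errVendor are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errVendor is a sample failure used by the demo below.
var errVendor = errors.New("vendor down")

// recording is a local stand-in for *ledger.Recording. Instead of sending
// an event, Track() appends Units to sent so the effect is observable.
type recording struct {
	Units float64
	sent  *[]float64
}

func (r *recording) Track() { *r.sent = append(*r.sent, r.Units) }

// callVendor wraps a billable call. The deferred closure reads the named
// return value err, so Track() runs only when the function succeeds.
func callVendor(sent *[]float64, work func() (int, error)) (err error) {
	rec := &recording{sent: sent}
	defer func() {
		if err == nil {
			rec.Track()
		}
	}()

	tokens, err := work()
	if err != nil {
		return err // deferred closure sees err != nil and skips Track()
	}
	rec.Units = float64(tokens)
	return nil
}

func main() {
	var sent []float64
	_ = callVendor(&sent, func() (int, error) { return 42, nil })
	_ = callVendor(&sent, func() (int, error) { return 0, errVendor })
	fmt.Println(sent) // only the successful call was recorded
}
```

The advantage over an explicit call is that every early return is covered automatically. The cost is that the behaviour depends on the named return value, which is easy to break by shadowing err in an inner scope.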

End-to-End Example: Audit Pipeline

A pipeline that transcribes audio, enriches the transcript, audits it, and stores results — all tracked per workspace_id.

package pipeline

import (
    "context"
    "fmt"

    ledger "github.com/ontopix/ledger/go"
)

const service = "audit-service"

func RunPipeline(ctx context.Context, audioURL, workspaceID, jobID string) (*AuditResult, error) {
    dims := map[string]string{
        "workspace_id": workspaceID,
        "job_id":       jobID,
    }

    // ── Step 1: Transcribe audio (ElevenLabs) ────────────────────────
    // Cost is known upfront: 1 request. Use Track().
    transcript, err := elevenlabsClient.Transcribe(ctx, audioURL, "scribe_v1")
    if err != nil {
        return nil, fmt.Errorf("transcribe: %w", err)
    }

    ledger.Track(ctx, ledger.TrackParams{
        Service:   service,
        Operation: "transcribe",
        Units:     1,
        UnitType:  "requests",
        Dimensions: mergeDims(dims, map[string]string{
            "vendor": "elevenlabs",
            "model":  "scribe_v1",
        }),
    })

    // ── Step 2: Enrich transcript (OpenAI gpt-5-mini) ────────────────
    // Each token type is a separate resource consumed at a different cost.
    // Emit one event per UnitType so each can be queried and aggregated
    // independently in Timestream (e.g. SUM(units) WHERE unit_type = 'input_tokens').
    enrichment, err := openaiClient.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
        Model:    "gpt-5-mini",
        Messages: []openai.ChatCompletionMessage{{Role: "user", Content: "Enrich: " + transcript.Text}},
    })
    if err != nil {
        return nil, fmt.Errorf("enrich: %w", err)
    }

    enrichDims := mergeDims(dims, map[string]string{"vendor": "openai", "model": "gpt-5-mini"})
    trackTokens(ctx, "enrich", enrichDims, enrichment.Usage)

    // ── Step 3: Audit the enriched transcript (OpenAI gpt-5) ─────────
    // Same pattern — one event per token type.
    audit, err := openaiClient.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
        Model:    "gpt-5",
        Messages: []openai.ChatCompletionMessage{{Role: "user", Content: "Audit: " + enrichment.Choices[0].Message.Content}},
    })
    if err != nil {
        return nil, fmt.Errorf("audit: %w", err)
    }

    auditDims := mergeDims(dims, map[string]string{"vendor": "openai", "model": "gpt-5"})
    trackTokens(ctx, "audit", auditDims, audit.Usage)

    // ── Step 4: Store results ─────────────────────────────────────────
    // Cost is fixed: 1 write. Track after successful DB write.
    result, err := storeAuditResults(ctx, workspaceID, audit)
    if err != nil {
        return nil, fmt.Errorf("aggregate: %w", err)
    }

    ledger.Track(ctx, ledger.TrackParams{
        Service:    service,
        Operation:  "aggregate",
        Units:      1,
        UnitType:   "writes",
        Dimensions: dims,
    })

    return result, nil
}

// trackTokens emits one Ledger event per token type so each can be
// queried and aggregated independently in Timestream.
func trackTokens(ctx context.Context, operation string, dims map[string]string, usage openai.Usage) {
    base := ledger.TrackParams{Service: service, Operation: operation, Dimensions: dims}

    base.UnitType = "input_tokens"
    base.Units = float64(usage.PromptTokens)
    ledger.Track(ctx, base)

    base.UnitType = "output_tokens"
    base.Units = float64(usage.CompletionTokens)
    ledger.Track(ctx, base)

    base.UnitType = "input_cached_tokens"
    base.Units = float64(usage.PromptTokensDetails.CachedTokens)
    ledger.Track(ctx, base)
}

func mergeDims(base, extra map[string]string) map[string]string {
    m := make(map[string]string, len(base)+len(extra))
    for k, v := range base {
        m[k] = v
    }
    for k, v := range extra {
        m[k] = v
    }
    return m
}

This pipeline produces 6 Ledger events for the two LLM steps (3 token types each), plus 1 for transcription and 1 for aggregation: 8 total per run. All share the same workspace_id and job_id dimensions for attribution and traceability.

Idempotency

Every event carries an IdempotencyKey used as the SQS MessageDeduplicationId. By default, a random UUID is generated per call.

For deterministic deduplication (e.g. retries), provide your own key:

ledger.Track(ctx, ledger.TrackParams{
    Service:        "audit-service",
    Operation:      "transcript",
    Units:          1,
    UnitType:       "requests",
    IdempotencyKey: fmt.Sprintf("%s:transcript", jobID),
    Dimensions: map[string]string{
        "vendor": "elevenlabs",
        "job_id": jobID,
    },
})

SQS FIFO deduplicates messages with the same key within a 5-minute window.
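When one job emits several events for the same operation, as the pipeline example does with one event per token type, a key built only from the job ID and operation would make SQS treat them as duplicates. A possible approach is to derive the key from every field that distinguishes the event. The dedupKey helper below is a hypothetical sketch, not part of the SDK. It hashes the parts so the result is always 64 hex characters, inside the 128-character limit SQS places on MessageDeduplicationId.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// dedupKey derives a deterministic key from the parts that make an event
// unique. Joining with a NUL byte keeps ("a", "bc") distinct from ("ab", "c").
func dedupKey(parts ...string) string {
	sum := sha256.Sum256([]byte(strings.Join(parts, "\x00")))
	return hex.EncodeToString(sum[:])
}

func main() {
	jobID := "job-456" // placeholder value

	// Same job and operation, different unit types: the keys differ,
	// so neither event is dropped as a duplicate of the other.
	fmt.Println(dedupKey(jobID, "enrich", "input_tokens"))
	fmt.Println(dedupKey(jobID, "enrich", "output_tokens"))

	// A retry with identical parts reproduces the same key.
	fmt.Println(dedupKey(jobID, "enrich", "input_tokens") ==
		dedupKey(jobID, "enrich", "input_tokens"))
}
```

The returned string would be passed as IdempotencyKey in TrackParams.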

Reserved Dimension Names

The following keys cannot be used in the Dimensions map: service, operation, units, unit_type, timestamp, environment, idempotency_key, schema_version.

Passing a reserved name causes the event to be dropped. The error is logged, not returned to the caller, so the only sign of the problem is the log line.
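Because a dropped event is easy to miss, it can help to check dimension maps in your own tests before they reach the SDK. The following is a minimal sketch under stated assumptions: reservedKeys is a hypothetical helper, the reserved list is copied from this section, and matching is assumed to be exact and case-sensitive.

```go
package main

import (
	"fmt"
	"sort"
)

// reserved holds the dimension names listed in this section.
var reserved = map[string]struct{}{
	"service":         {},
	"operation":       {},
	"units":           {},
	"unit_type":       {},
	"timestamp":       {},
	"environment":     {},
	"idempotency_key": {},
	"schema_version":  {},
}

// reservedKeys returns the keys of dims that collide with a reserved name,
// sorted so the result is stable across runs.
func reservedKeys(dims map[string]string) []string {
	var bad []string
	for k := range dims {
		if _, ok := reserved[k]; ok {
			bad = append(bad, k)
		}
	}
	sort.Strings(bad)
	return bad
}

func main() {
	dims := map[string]string{
		"vendor":  "openai",
		"service": "audit-service", // collides with a reserved name
	}
	if bad := reservedKeys(dims); len(bad) > 0 {
		fmt.Println("reserved dimension names used:", bad)
	}
}
```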