Go SDK
Installation, configuration, and API reference for the Ledger Go SDK.
Installation
The Go module is not yet published to a registry. Use a replace directive in your
go.mod during development:
```
require github.com/ontopix/ledger/go v0.0.0

replace github.com/ontopix/ledger/go => ../ledger/go
```
The module path (github.com/ontopix/ledger/go) is provisional and may change in
v0.2.0 when the module is published.
Environment Variables
| Variable | Default | Description |
|---|---|---|
| `LEDGER_QUEUE_URL` | (required outside sandbox) | SQS FIFO queue URL |
| `LEDGER_ENV` | `dev` | Environment tag added to every event |
| `LEDGER_AWS_REGION` | `eu-central-1` | AWS region for the SQS client |
| `LEDGER_SANDBOX_MODE` | `false` | `true` writes events to stdout instead of SQS |
| `LEDGER_LOG_LEVEL` | `INFO` | Log level |
`LEDGER_QUEUE_URL` is validated in `init()`. If it is missing and sandbox mode is off, the process exits via `log.Fatal`.
Sandbox Mode
Set LEDGER_SANDBOX_MODE=true for local development. Events are printed to stdout
as JSON — no SQS or AWS credentials required.
```sh
LEDGER_SANDBOX_MODE=true LEDGER_ENV=dev go run ./cmd/myservice
```
API Reference
`ledger.Track()` — synchronous, fire-and-forget
```go
package main

import (
	"context"

	ledger "github.com/ontopix/ledger/go"
)

func main() {
	ctx := context.Background()
	ledger.Track(ctx, ledger.TrackParams{
		Service:   "audit-service",
		Operation: "transcript",
		Units:     1,
		UnitType:  "requests",
		Dimensions: map[string]string{
			"vendor":    "elevenlabs",
			"model":     "scribe_v2",
			"tenant_id": tenantID,
			"job_id":    jobID,
		},
	})
}
```
Signature:
```go
type TrackParams struct {
	Service        string
	Operation      string
	Units          float64
	UnitType       string
	Timestamp      time.Time         // zero value → time.Now().UTC()
	IdempotencyKey string            // empty → random UUID
	Dimensions     map[string]string // caller dimensions
}

func Track(ctx context.Context, params TrackParams) // never panics, never returns error
```
- `Service`, `Operation`, and `UnitType` are required.
- `Units` defaults to its zero value (0), so set it explicitly.
- `Timestamp` defaults to `time.Now().UTC()` if zero.
- `IdempotencyKey` defaults to a random UUID if empty.
- `Dimensions` — any additional key-value pairs. Values are already strings (Go does not need coercion).
- Never panics. Panics are recovered internally and logged. Errors from the SQS writer are logged, not returned.
`ledger.NewRecording()` — deferred tracking
Use when units are only known after the work completes. The typical pattern uses
defer:
```go
rec := ledger.NewRecording(ctx, ledger.TrackParams{
	Service:   "audit-service",
	Operation: "enrich",
	UnitType:  "tokens",
	Dimensions: map[string]string{
		"vendor":    "openai",
		"tenant_id": tenantID,
	},
})

result, err := openaiClient.CreateChatCompletion(ctx, req)
if err != nil {
	return err // rec.Track() is never called — no event recorded
}

rec.Units = float64(result.Usage.TotalTokens)
rec.Track()
```
Signature:
```go
type Recording struct {
	Units float64 // read/write — set before calling Track()
	// unexported fields
}

func NewRecording(ctx context.Context, params TrackParams) *Recording
func (r *Recording) Track() // sends the event via ledger.Track()
```
- `rec.Units` is a public field — set it before calling `rec.Track()`.
- Call `rec.Track()` only on success. If the operation fails, simply don't call it.
- You can use `defer rec.Track()` if you want the event sent regardless of outcome, but the typical pattern is to call it explicitly after success so that failed operations are not recorded.
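The success-only control flow can be illustrated with a stub standing in for `*ledger.Recording` (all names here are hypothetical; the stub only records whether the event would have been sent):

```go
package main

import "errors"

// stubRecording stands in for *ledger.Recording to show the control flow.
type stubRecording struct {
	Units float64
	sent  bool
}

func (r *stubRecording) Track() { r.sent = true }

// doWork calls Track only on success, so a failed operation emits no event.
func doWork(rec *stubRecording, fail bool) error {
	if fail {
		return errors.New("upstream error") // rec.Track() never called
	}
	rec.Units = 42 // units become known only after the work completes
	rec.Track()
	return nil
}
```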
End-to-End Example: Audit Pipeline
A pipeline that transcribes audio, enriches the transcript, audits it, and stores
results — all tracked per workspace_id.
```go
package pipeline

import (
	"context"
	"fmt"

	ledger "github.com/ontopix/ledger/go"
)

const service = "audit-service"

func RunPipeline(ctx context.Context, audioURL, workspaceID, jobID string) (*AuditResult, error) {
	dims := map[string]string{
		"workspace_id": workspaceID,
		"job_id":       jobID,
	}

	// ── Step 1: Transcribe audio (ElevenLabs) ────────────────────────
	// Cost is known upfront: 1 request. Use Track().
	transcript, err := elevenlabsClient.Transcribe(ctx, audioURL, "scribe_v1")
	if err != nil {
		return nil, fmt.Errorf("transcribe: %w", err)
	}
	ledger.Track(ctx, ledger.TrackParams{
		Service:   service,
		Operation: "transcribe",
		Units:     1,
		UnitType:  "requests",
		Dimensions: mergeDims(dims, map[string]string{
			"vendor": "elevenlabs",
			"model":  "scribe_v1",
		}),
	})

	// ── Step 2: Enrich transcript (OpenAI gpt-5-mini) ────────────────
	// Each token type is a separate resource consumed at a different cost.
	// Emit one event per UnitType so each can be queried and aggregated
	// independently in Timestream (e.g. SUM(units) WHERE unit_type = 'input_tokens').
	enrichment, err := openaiClient.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
		Model:    "gpt-5-mini",
		Messages: []openai.ChatCompletionMessage{{Role: "user", Content: "Enrich: " + transcript.Text}},
	})
	if err != nil {
		return nil, fmt.Errorf("enrich: %w", err)
	}
	enrichDims := mergeDims(dims, map[string]string{"vendor": "openai", "model": "gpt-5-mini"})
	trackTokens(ctx, "enrich", enrichDims, enrichment.Usage)

	// ── Step 3: Audit the enriched transcript (OpenAI gpt-5) ─────────
	// Same pattern — one event per token type.
	audit, err := openaiClient.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
		Model:    "gpt-5",
		Messages: []openai.ChatCompletionMessage{{Role: "user", Content: "Audit: " + enrichment.Choices[0].Message.Content}},
	})
	if err != nil {
		return nil, fmt.Errorf("audit: %w", err)
	}
	auditDims := mergeDims(dims, map[string]string{"vendor": "openai", "model": "gpt-5"})
	trackTokens(ctx, "audit", auditDims, audit.Usage)

	// ── Step 4: Store results ────────────────────────────────────────
	// Cost is fixed: 1 write. Track after a successful DB write.
	result, err := storeAuditResults(ctx, workspaceID, audit)
	if err != nil {
		return nil, fmt.Errorf("aggregate: %w", err)
	}
	ledger.Track(ctx, ledger.TrackParams{
		Service:    service,
		Operation:  "aggregate",
		Units:      1,
		UnitType:   "writes",
		Dimensions: dims,
	})

	return result, nil
}

// trackTokens emits one Ledger event per token type so each can be
// queried and aggregated independently in Timestream.
func trackTokens(ctx context.Context, operation string, dims map[string]string, usage openai.Usage) {
	base := ledger.TrackParams{Service: service, Operation: operation, Dimensions: dims}

	base.UnitType = "input_tokens"
	base.Units = float64(usage.PromptTokens)
	ledger.Track(ctx, base)

	base.UnitType = "output_tokens"
	base.Units = float64(usage.CompletionTokens)
	ledger.Track(ctx, base)

	base.UnitType = "input_cached_tokens"
	base.Units = float64(usage.PromptTokensDetails.CachedTokens)
	ledger.Track(ctx, base)
}

func mergeDims(base, extra map[string]string) map[string]string {
	m := make(map[string]string, len(base)+len(extra))
	for k, v := range base {
		m[k] = v
	}
	for k, v := range extra {
		m[k] = v
	}
	return m
}
```
This pipeline produces 6 Ledger events for the two LLM steps (3 token types each), plus 1 for transcription and 1 for aggregation, for 8 events total per run. All share the same workspace_id and job_id dimensions for attribution and traceability.
Idempotency
Every event carries an IdempotencyKey used as the SQS MessageDeduplicationId.
By default, a random UUID is generated per call.
For deterministic deduplication (e.g. retries), provide your own key:
```go
ledger.Track(ctx, ledger.TrackParams{
	Service:        "audit-service",
	Operation:      "transcript",
	Units:          1,
	UnitType:       "requests",
	IdempotencyKey: fmt.Sprintf("%s:transcript", jobID),
	Dimensions: map[string]string{
		"vendor": "elevenlabs",
		"job_id": jobID,
	},
})
```
SQS FIFO deduplicates messages with the same key within a 5-minute window.
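When building deterministic keys, note that a step which emits several events for the same operation (as `trackTokens` does, one per token type) needs the unit type in the key as well; otherwise the events would deduplicate each other within the window. A sketch of such a key builder (`idempotencyKey` is a hypothetical helper, not part of the SDK):

```go
package main

import "fmt"

// idempotencyKey builds a stable key from job ID, operation, and unit type,
// so a retried call deduplicates against the original event while distinct
// events from the same operation (e.g. input_tokens vs output_tokens) do not
// collide with each other.
func idempotencyKey(jobID, operation, unitType string) string {
	return fmt.Sprintf("%s:%s:%s", jobID, operation, unitType)
}
```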
Reserved Dimension Names
The following keys cannot be used in the `Dimensions` map:

`service`, `operation`, `units`, `unit_type`, `timestamp`, `environment`, `idempotency_key`, `schema_version`

Passing a reserved name causes the event to be dropped; the error is logged, not returned to the caller.
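Because a dropped event surfaces only in the logs, callers may want to guard against reserved keys up front. A sketch of such a guard (`validateDims` is a hypothetical helper, not part of the SDK):

```go
package main

import "fmt"

// reservedDims lists the dimension keys the Ledger schema claims for itself.
var reservedDims = map[string]bool{
	"service": true, "operation": true, "units": true, "unit_type": true,
	"timestamp": true, "environment": true, "idempotency_key": true,
	"schema_version": true,
}

// validateDims returns an error naming the first reserved key found,
// so callers can fail loudly instead of having the event silently dropped.
func validateDims(dims map[string]string) error {
	for k := range dims {
		if reservedDims[k] {
			return fmt.Errorf("reserved dimension key: %q", k)
		}
	}
	return nil
}
```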