API Reference

Complete API documentation organized from high-level to low-level usage.

Documentation Structure:

  • Data Models - Detailed model schemas and structure
  • API Reference (this document) - Methods and usage patterns
  • Examples - Real-world usage scenarios

High-Level API

Recommended for most use cases. Complete workflows with minimal code.

CustomerInteraction.enrich()

Enrich a raw interaction with computed metrics and LLM-based analysis.

from audit_utils.models import CustomerInteraction

interaction = CustomerInteraction.model_validate(data)

# Complete enrichment (all phases)
interaction.enrich(model="gpt-4")

# Selective phases
interaction.enrich(
    phases=["structural", "statistical_basic", "llm_semantic"],
    model="gpt-4",
    inplace=True
)

# Return enriched copy
enriched = interaction.enrich(
    phases="all",
    model="gpt-4",
    inplace=False,
    temperature=0.0
)

Parameters:

  • phases (str | list[str]): Which phases to run
    • "all" (default): All phases in optimal order
    • List: ["structural", "statistical_basic", "llm_semantic", "statistical_enhanced", "derived"]
  • model (str | None): LLM model for semantic phase
  • inplace (bool): Modify current object (True) or return copy (False)
  • temperature (float): LLM temperature (optional)
  • **llm_kwargs: Additional LLM parameters

Enrichment Phases:

  1. structural - Counts, durations (no content processing)
  2. statistical_basic - Word counts, response times (pre-LLM)
  3. llm_semantic - Sentiment, topics, traits (requires model)
  4. statistical_enhanced - Question ratio (uses LLM outputs)
  5. derived - Interaction-wide aggregations

Returns: Self (if inplace=True) or enriched copy (if inplace=False)
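The phases above run in a fixed order. As an illustration of that contract (not the library's actual internals), resolving a user-supplied phases argument into canonical order might look like:

```python
# Canonical phase order, from the numbered list above.
PHASE_ORDER = [
    "structural",
    "statistical_basic",
    "llm_semantic",
    "statistical_enhanced",
    "derived",
]

def resolve_phases(phases):
    """Illustrative sketch: expand 'all' and re-sort a user-supplied
    subset into canonical order. Not the library's implementation."""
    if phases == "all":
        return list(PHASE_ORDER)
    requested = set(phases)
    unknown = requested - set(PHASE_ORDER)
    if unknown:
        raise ValueError(f"Unknown phases: {sorted(unknown)}")
    # Preserve canonical order regardless of the order the caller used.
    return [p for p in PHASE_ORDER if p in requested]
```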

AuditCriteria.evaluate()

Complete evaluation pipeline with automatic task management.

from audit_utils.models import AuditCriteria

criteria = AuditCriteria.model_validate(criteria_data)

# Simple evaluation
result = criteria.evaluate(
    interaction,
    strategy="full",
    model="gpt-4"
)

# Parallel evaluation with auto-reduce
result = criteria.evaluate(
    interaction,
    strategy="individual",
    model="gpt-4",
    parallel=True,
    max_workers=10,
    reduce_results=True
)

# Get list of results (no reduce)
results = criteria.evaluate(
    interaction,
    strategy="grouped",
    model="gpt-4",
    parallel=True,
    reduce_results=False
)

Parameters:

  • interaction (CustomerInteraction): Interaction to audit
  • strategy (str): Processing strategy
    • "full": Single task, all criteria (best for <5 criteria)
    • "individual": One task per criterion (best for >10 independent)
    • "grouped": One task per group (best for 5-20 with logical groups)
  • model (str | None): LLM model override
  • parallel (bool): Enable parallel execution (default: False)
  • max_workers (int | None): Max parallel workers (default: CPU count)
  • reduce_results (bool): Combine results into one (default: True)
  • **llm_kwargs: Additional LLM parameters
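The strategy guidance above can be encoded as a small heuristic. suggest_strategy below is a hypothetical helper, not part of the library; tune the thresholds for your workload:

```python
def suggest_strategy(n_criteria, has_groups=False):
    """Heuristic matching the strategy guidance above."""
    if n_criteria < 5:
        return "full"          # single task, all criteria
    if has_groups and n_criteria <= 20:
        return "grouped"       # one task per logical group
    return "individual"        # one task per criterion
```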

Returns:

  • Single AuditResult (if reduce_results=True or single task)
  • List of AuditResult (if reduce_results=False and multiple tasks)
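Because the return type depends on reduce_results and the task count, callers that want uniform handling can normalize the output to a list. A minimal sketch (the helper name is illustrative, not part of the library):

```python
def as_result_list(outcome):
    """Wrap a single AuditResult in a list; pass lists through unchanged."""
    return outcome if isinstance(outcome, list) else [outcome]
```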

Mid-Level API

For custom workflows and task management.

AuditCriteria.map_tasks()

Create evaluation tasks from criteria and interaction.

# Create tasks for evaluation
tasks = criteria.map_tasks(
    interaction,
    strategy="individual"
)

# Each task is ready for evaluation
for task in tasks:
    result = criteria.evaluate_task(task, model="gpt-4")

Parameters:

  • interaction (CustomerInteraction): Interaction to audit
  • strategy (str): Mapping strategy (same as evaluate())

Returns: list[Task] - Tasks ready for evaluation

AuditCriteria.evaluate_task()

Evaluate a single task with LLM.

# Evaluate one task
result = criteria.evaluate_task(
    task,
    model="gpt-4",
    temperature=0.0
)

Parameters:

  • task (Task): Task to evaluate
  • model (str | None): LLM model override
  • **llm_kwargs: Additional LLM parameters

Returns: AuditResult - Single evaluation result


Low-Level API

Primitives for complete custom control.

AuditCriteria.map()

Group criteria by strategy.

# Get criterion groups
groups = criteria.map(strategy="individual")
# Returns: [[criterion1], [criterion2], [criterion3], ...]

groups = criteria.map(strategy="full")
# Returns: [[all criteria]]

groups = criteria.map(strategy="grouped")
# Returns: [[group1 criteria], [group2 criteria], ...]

Parameters:

  • strategy (str): Grouping strategy

Returns: list[list[Criterion]] - Grouped criteria

AuditCriteria.reduce()

Combine multiple results into one.

# Combine results
final_result = criteria.reduce(
    results,
    strategy="individual"
)

Parameters:

  • results (list[AuditResult]): Results to combine
  • strategy (str | None): Strategy hint for metadata

Returns: AuditResult - Combined result
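As a pure-Python sketch of the reduce step's shape, many partial results collapse into one by concatenating their per-criterion results. Field names here are illustrative, not the library's actual schema:

```python
def combine(partials):
    """Illustrative reduce: merge per-criterion results from many
    partial results into a single combined result."""
    merged = {"criteria_results": []}
    for partial in partials:
        merged["criteria_results"].extend(partial["criteria_results"])
    return merged
```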


Processors

Low-level evaluation function (used internally).

evaluate_criteria()

from audit_utils.processors import evaluate_criteria
from audit_utils.models import Task

task = Task(
    interaction=interaction.model_dump(),
    criteria=criteria.model_dump(),
    metadata={"processing_strategy": "full"}
)

result = evaluate_criteria(
    task,
    model="gpt-4",
    include_schemas=False,
    temperature=0.0
)

Parameters:

  • task (Task): Task to evaluate
  • model (str | None): LLM model
  • include_schemas (bool): Include schema docs in prompt (default: False)
  • **llm_kwargs: LLM parameters

Returns: AuditResult


LLM Configuration

Environment Variables

# OpenAI (default)
export LLM_PROVIDER="openai"
export OPENAI_API_KEY="sk-..."
export OPENAI_MODEL="gpt-4"

# Anthropic
export LLM_PROVIDER="anthropic"
export ANTHROPIC_API_KEY="sk-ant-..."
export ANTHROPIC_MODEL="claude-3-5-sonnet-20241022"

# Google
export LLM_PROVIDER="google"
export GOOGLE_API_KEY="..."
export GOOGLE_MODEL="gemini-1.5-pro"

LLM Client Factory

from audit_utils.llm import get_llm_client

client = get_llm_client()  # Uses environment variables
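How the factory consumes the environment variables above is internal to the library. As an assumption-laden sketch (not the library's actual code), provider selection might look like:

```python
import os

# Maps each provider to its API-key and model variables, following the
# export examples above. Hypothetical helper, not the library's code.
_ENV_VARS = {
    "openai": ("OPENAI_API_KEY", "OPENAI_MODEL"),
    "anthropic": ("ANTHROPIC_API_KEY", "ANTHROPIC_MODEL"),
    "google": ("GOOGLE_API_KEY", "GOOGLE_MODEL"),
}

def read_llm_env():
    """Resolve provider, API key, and model from the environment."""
    provider = os.environ.get("LLM_PROVIDER", "openai")
    key_var, model_var = _ENV_VARS[provider]
    return provider, os.environ.get(key_var), os.environ.get(model_var)
```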

LLM Parameters

All evaluation methods accept these parameters:

result = criteria.evaluate(
    interaction,
    model="gpt-4",               # Override default
    temperature=0.0,             # 0.0-2.0
    max_completion_tokens=16000, # Token limit
    top_p=1.0,                   # Nucleus sampling
)

Usage Examples

Complete Workflow

from audit_utils.models import CustomerInteraction, AuditCriteria
import json

# 1. Load data
with open("interaction.json") as f:
    interaction = CustomerInteraction.model_validate(json.load(f))

with open("criteria.json") as f:
    criteria = AuditCriteria.model_validate(json.load(f))

# 2. Enrich interaction (recommended)
interaction.enrich(model="gpt-4")

# 3. Evaluate
result = criteria.evaluate(
    interaction,
    strategy="full",
    model="gpt-4"
)

# 4. Access results
for cr in result.criteria_results:
    print(f"{cr.name}: {cr.score}")
    for ir in cr.indicator_results:
        print(f"  {ir.id}: {ir.value}")

# 5. Save results
with open("result.json", "w") as f:
    json.dump(result.model_dump(mode="json"), f, indent=2)

Parallel Processing

# Maximum speed with parallelization
result = criteria.evaluate(
    interaction,
    strategy="individual",
    model="gpt-4",
    parallel=True,
    max_workers=10
)

Batch Processing

from concurrent.futures import ThreadPoolExecutor

interactions = [load_interaction(id) for id in ids]
criteria = load_criteria("quality_2025")

def process_one(interaction):
    return criteria.evaluate(interaction, strategy="full", model="gpt-4")

with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(process_one, interactions))

Airflow Integration

from airflow.decorators import dag, task
from datetime import datetime
from audit_utils.models import CustomerInteraction, AuditCriteria

@dag(schedule=None, start_date=datetime(2025, 1, 1))  # Airflow 2.4+: schedule replaces schedule_interval
def audit_pipeline():

    @task
    def load_data():
        # NOTE: values passed between tasks go through XCom; with the
        # default backend, prefer returning model_dump() dicts rather
        # than raw model objects, which may not serialize.
        interaction = CustomerInteraction.model_validate(load_from_db())
        criteria = AuditCriteria.model_validate(load_criteria())
        return {"interaction": interaction, "criteria": criteria}

    @task
    def enrich_interaction(data):
        data["interaction"].enrich(model="gpt-4")
        return data

    @task
    def evaluate_criteria(data):
        return data["criteria"].evaluate(
            data["interaction"],
            strategy="full",
            model="gpt-4"
        )

    @task
    def save_results(result):
        save_to_db(result)

    data = load_data()
    enriched = enrich_interaction(data)
    result = evaluate_criteria(enriched)
    save_results(result)

dag = audit_pipeline()

Custom Workflow (Primitives)

# Full control with low-level API

# 1. Map criteria into groups
criterion_groups = criteria.map(strategy="grouped")

# 2. Create tasks
tasks = criteria.map_tasks(interaction, strategy="grouped")

# 3. Evaluate (can parallelize here)
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(criteria.evaluate_task, task, model="gpt-4")
        for task in tasks
    ]
    results = [f.result() for f in futures]

# 4. Reduce results
final_result = criteria.reduce(results, strategy="grouped")

Standalone Script

#!/usr/bin/env python3
import json
import sys
from audit_utils.models import CustomerInteraction, AuditCriteria

def main():
    # Load from files
    with open(sys.argv[1]) as f:
        interaction = CustomerInteraction.model_validate(json.load(f))
    
    with open(sys.argv[2]) as f:
        criteria = AuditCriteria.model_validate(json.load(f))
    
    # Enrich + Evaluate
    interaction.enrich(model="gpt-4")
    result = criteria.evaluate(interaction, strategy="full", model="gpt-4")
    
    # Output
    print(json.dumps(result.model_dump(mode="json"), indent=2))

if __name__ == "__main__":
    main()

Usage: python audit.py interaction.json criteria.json > result.json


Error Handling

from pydantic import ValidationError

# Validation errors
try:
    interaction = CustomerInteraction.model_validate(data)
except ValidationError as e:
    for error in e.errors():
        print(f"Field: {error['loc']}, Error: {error['msg']}")

# LLM errors
try:
    result = criteria.evaluate(interaction, model="gpt-4")
except Exception as e:
    print(f"Evaluation failed: {e}")
    # Implement retry logic

Retry Pattern

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def evaluate_with_retry(criteria, interaction):
    return criteria.evaluate(interaction, model="gpt-4")

Testing

import pytest
from audit_utils.models import CustomerInteraction, AuditCriteria

@pytest.mark.unit
def test_map_full(sample_interaction, sample_criteria):
    tasks = sample_criteria.map_tasks(sample_interaction, strategy="full")
    assert len(tasks) == 1

@pytest.mark.integration
def test_evaluate(mock_llm, sample_task, sample_criteria):
    result = sample_criteria.evaluate_task(sample_task)
    assert result.criteria_results

Run tests:

pytest -m unit          # Fast, no API calls
pytest -m integration   # Mocked dependencies  
pytest -m e2e           # Real API (requires key)

Helper Functions

Sentiment Distribution

from audit_utils.models.customer_interaction import (
    sentiment_dict_to_distribution,
    distribution_to_sentiment_dict
)
from audit_utils.models._generated.customer_interaction import (
    Sentiment,
    SentimentDistribution
)

# Dict to object
dist = sentiment_dict_to_distribution({
    Sentiment.positive: 0.7,
    Sentiment.neutral: 0.3
})

# Object to dict
result = distribution_to_sentiment_dict(dist)