Python SDK

Installation, configuration, and API reference for the Sluice Python SDK.

Sluice SDK -- Python

Installation

uv add ontopix-sluice

The package is published to the Ontopix CodeArtifact registry.

Configuration

Constructor

from sluice import SluiceConfig

config = SluiceConfig(
    table_name="ontopix-vendor-buckets-prod",  # DynamoDB table
    region="eu-central-1",                      # AWS region
    endpoint_url=None,                          # Custom DynamoDB endpoint (local dev)
    lease_ttl=60,                               # Lease TTL in seconds
    max_retries=3,                              # Retries on contention
    default_slot_timeout=30.0,                  # Default timeout for slot()
    log_level="INFO",                           # Logging level
)

All parameters are optional and fall back to the defaults shown above.

From environment variables

config = SluiceConfig.from_env()

from_env() reads these environment variables, falling back to the same defaults:

Variable                      Type    Default
SLUICE_TABLE_NAME             str     "ontopix-vendor-buckets-prod"
SLUICE_AWS_REGION             str     "eu-central-1"
SLUICE_ENDPOINT_URL           str     None
SLUICE_LEASE_TTL              int     60
SLUICE_MAX_RETRIES            int     3
SLUICE_DEFAULT_SLOT_TIMEOUT   float   30.0
SLUICE_LOG_LEVEL              str     "INFO"

When a function is called without a config argument, it calls SluiceConfig.from_env() automatically.
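
The environment lookup described above can be pictured with a small stand-in. This is a minimal sketch, not Sluice's implementation: EnvConfigSketch is a hypothetical class that only mirrors the variable names, types, and defaults from the table, and it assumes that an empty SLUICE_ENDPOINT_URL is treated as unset.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass
class EnvConfigSketch:
    """Hypothetical stand-in for SluiceConfig (same fields and defaults)."""

    table_name: str = "ontopix-vendor-buckets-prod"
    region: str = "eu-central-1"
    endpoint_url: Optional[str] = None
    lease_ttl: int = 60
    max_retries: int = 3
    default_slot_timeout: float = 30.0
    log_level: str = "INFO"

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "EnvConfigSketch":
        # Read from os.environ unless a mapping is injected (handy in tests).
        env = os.environ if env is None else env
        d = cls()  # instance holding the defaults
        return cls(
            table_name=env.get("SLUICE_TABLE_NAME", d.table_name),
            region=env.get("SLUICE_AWS_REGION", d.region),
            # Assumption: an empty string means "no custom endpoint".
            endpoint_url=env.get("SLUICE_ENDPOINT_URL") or None,
            # Environment values are strings, so numeric fields are coerced.
            lease_ttl=int(env.get("SLUICE_LEASE_TTL", d.lease_ttl)),
            max_retries=int(env.get("SLUICE_MAX_RETRIES", d.max_retries)),
            default_slot_timeout=float(
                env.get("SLUICE_DEFAULT_SLOT_TIMEOUT", d.default_slot_timeout)
            ),
            log_level=env.get("SLUICE_LOG_LEVEL", d.log_level),
        )
```

Passing a plain dict instead of relying on os.environ keeps the sketch easy to exercise, for example EnvConfigSketch.from_env({"SLUICE_LEASE_TTL": "120"}).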


slot -- context manager (preferred API)

slot is an async context manager that retries acquire internally until a slot is granted or the timeout is reached. The slot is automatically released when the block exits.

from sluice import slot

async with slot("openai#gpt-4o#requests", timeout=10.0) as result:
    # You have the slot -- make your API call here
    response = await call_openai(prompt)

Signature

async def slot(
    dimension: str,
    timeout: float | None = None,       # defaults to config.default_slot_timeout
    config: SluiceConfig | None = None,
) -> AsyncIterator[AcquireResult]

The yielded AcquireResult has:

  • outcome -- always AcquireOutcome.GRANTED inside the block
  • lease_key -- the lease identifier
  • dimension -- the dimension that was acquired
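
To make the retry-until-timeout and release-on-exit behaviour concrete, here is a rough sketch of how such a context manager could be built. It does not import Sluice: FakeLimiter, FakeResult, SlotTimeoutSketch, and slot_sketch are hypothetical names, and the limiter is an in-memory counter rather than a DynamoDB table.

```python
import asyncio
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass


class SlotTimeoutSketch(Exception):
    """Hypothetical stand-in for sluice.SlotTimeout."""


@dataclass
class FakeResult:
    granted: bool
    wait_seconds: float = 0.0


class FakeLimiter:
    """In-memory stand-in: a fixed number of concurrent slots."""

    def __init__(self, capacity: int) -> None:
        self.free = capacity

    async def acquire(self, dimension: str) -> FakeResult:
        if self.free > 0:
            self.free -= 1
            return FakeResult(granted=True)
        return FakeResult(granted=False, wait_seconds=0.01)

    async def release(self, dimension: str) -> None:
        self.free += 1


@asynccontextmanager
async def slot_sketch(limiter: FakeLimiter, dimension: str, timeout: float):
    deadline = time.monotonic() + timeout
    while True:
        result = await limiter.acquire(dimension)  # single attempt
        if result.granted:
            break
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise SlotTimeoutSketch(dimension)
        # Wait for the suggested interval, but never past the deadline.
        await asyncio.sleep(min(result.wait_seconds, remaining))
    try:
        yield result
    finally:
        # Runs on normal exit and on exceptions raised inside the block.
        await limiter.release(dimension)


async def demo():
    limiter = FakeLimiter(capacity=1)
    async with slot_sketch(limiter, "openai#gpt-4o#requests", timeout=1.0):
        held = limiter.free  # the only slot is taken while we are inside
        try:
            async with slot_sketch(limiter, "openai#gpt-4o#requests", timeout=0.05):
                timed_out = False
        except SlotTimeoutSketch:
            timed_out = True
    return held, timed_out, limiter.free
```

Running asyncio.run(demo()) exercises both paths: the nested request times out because the only slot is held, and the slot is returned once the outer block exits.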

Multiple dimensions

Use slot_many to acquire slots across multiple dimensions atomically (all-or-nothing):

from sluice import slot_many

async with slot_many(
    ["openai#gpt-4o#requests", "openai#gpt-4o#tokens"],
    timeout=15.0,
) as result:
    response = await call_openai(prompt)

acquire / release -- low-level API

acquire performs a single attempt to obtain a slot. It returns immediately with either GRANTED or RETRY_IN. The caller is responsible for retry logic and calling release().

from sluice import acquire, AcquireOutcome

result = await acquire("openai#gpt-4o#requests")

if result.outcome == AcquireOutcome.GRANTED:
    try:
        response = await call_openai(prompt)
    finally:
        await result.release()
elif result.outcome == AcquireOutcome.RETRY_IN:
    print(f"Retry after {result.wait_seconds:.1f}s")

Signatures

async def acquire(
    dimension: str,
    config: SluiceConfig | None = None,
) -> AcquireResult

async def acquire_many(
    dimensions: list[str],
    config: SluiceConfig | None = None,
) -> AcquireResult

acquire_many is all-or-nothing: if any dimension has insufficient tokens, none are acquired, and a single RETRY_IN result is returned whose wait_seconds is the maximum wait time across the dimensions.
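
The all-or-nothing rule can be illustrated with an in-memory model. This is only a sketch under stated assumptions: Bucket and acquire_many_sketch are hypothetical, each bucket is assumed to know its own wait time, and the maximum is taken over the dimensions that are short of tokens. The real SDK works against DynamoDB, not local state.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Bucket:
    tokens: int
    seconds_until_refill: float  # assumed per-dimension wait when empty


def acquire_many_sketch(
    buckets: Dict[str, Bucket], dimensions: List[str]
) -> Tuple[str, float]:
    """Take one token from every dimension, or from none of them."""
    short = [buckets[d] for d in dimensions if buckets[d].tokens < 1]
    if short:
        # Nothing is consumed; report the longest wait among the
        # dimensions that could not be satisfied.
        return "retry_in", max(b.seconds_until_refill for b in short)
    for d in dimensions:
        buckets[d].tokens -= 1
    return "granted", 0.0


buckets = {
    "openai#gpt-4o#requests": Bucket(tokens=2, seconds_until_refill=1.0),
    "openai#gpt-4o#tokens": Bucket(tokens=0, seconds_until_refill=4.0),
}
first_attempt = acquire_many_sketch(buckets, list(buckets))
```

In this run first_attempt reports a retry, and the requests bucket still holds both of its tokens because the tokens bucket was empty.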

AcquireResult

@dataclass
class AcquireResult:
    outcome: AcquireOutcome       # GRANTED or RETRY_IN
    wait_seconds: float = 0.0    # seconds to wait before retrying (RETRY_IN only)
    lease_key: str | None = None
    dimension: str | None = None

    async def release(self) -> None: ...

AcquireOutcome

class AcquireOutcome(Enum):
    GRANTED = "granted"
    RETRY_IN = "retry_in"

penalize

Multiplies the current token count for a dimension by factor, so the default of 0.8 leaves 80% of the tokens. Use this when a vendor returns 429 despite the slot being granted -- it corrects drift between Sluice's model and the vendor's actual state.

Best-effort: not transactional, no version check.

from sluice import penalize

# Reduce tokens to 80% of current value (default factor)
await penalize("openai#gpt-4o#requests")

# Custom factor
await penalize("openai#gpt-4o#requests", factor=0.5)

Signature

async def penalize(
    dimension: str,
    factor: float = 0.8,               # 0.0 to 1.0
    config: SluiceConfig | None = None,
) -> None
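
As a worked example of the arithmetic, the helper below applies a factor to a token count. penalized_tokens is a hypothetical function for illustration; whether Sluice validates the range or rounds the result is not stated here, so both are assumptions of the sketch.

```python
def penalized_tokens(current_tokens: float, factor: float = 0.8) -> float:
    """Return the token count after a penalty of the given factor."""
    # Assumption: factors outside 0.0 to 1.0 are rejected.
    if not 0.0 <= factor <= 1.0:
        raise ValueError("factor must be between 0.0 and 1.0")
    return current_tokens * factor


after_default = penalized_tokens(100)             # keeps 80% of 100 tokens
after_custom = penalized_tokens(100, factor=0.5)  # keeps half
```

A smaller factor is a harsher penalty: 0.5 halves the bucket, while 1.0 leaves it unchanged.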

Error handling

SlotTimeout

Raised by slot() and slot_many() when the timeout expires without acquiring a slot.

from sluice import slot, SlotTimeout

try:
    async with slot("openai#gpt-4o#requests", timeout=5.0) as result:
        response = await call_openai(prompt)
except SlotTimeout:
    # Handle timeout -- queue the request, return a 503, etc.
    pass

Missing dimensions

If a dimension does not exist in the DynamoDB table, acquire raises a standard exception. Ensure dimensions are provisioned before calling the SDK.


Logging

Sluice logs structured events through the standard logging module:

import logging

logging.getLogger("sluice").setLevel(logging.DEBUG)

Log events include:

  • acquire.granted -- slot acquired successfully
  • acquire.contention_retry -- retrying due to DynamoDB transaction contention
  • release.concurrent -- released a concurrent-type slot
  • release.time_refill -- released a time-refill-type slot
  • penalize -- penalty applied

The SLUICE_LOG_LEVEL environment variable sets the initial level.
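
If the application has no logging configuration of its own, the "sluice" logger can be given a dedicated handler with the standard library alone. A minimal sketch follows; configure_sluice_logging is a hypothetical helper, and the format string is just one reasonable choice.

```python
import logging
import sys


def configure_sluice_logging(level: int = logging.DEBUG) -> logging.Logger:
    """Attach a stderr handler to the "sluice" logger exactly once."""
    logger = logging.getLogger("sluice")
    logger.setLevel(level)
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(handler)
        # Avoid duplicate lines if the root logger also has handlers.
        logger.propagate = False
    return logger
```

Calling the helper more than once is safe: the handler is added only on the first call, and later calls just update the level.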


SluiceClient -- connection reuse

Module-level functions (slot, acquire, etc.) create a new DynamoDB connection on each call. For multiple calls within the same process, use SluiceClient to hold the connection:

from sluice import SluiceClient

async with SluiceClient() as client:
    # All calls reuse the same DynamoDB connection
    async with client.slot("openai#gpt-4o#requests", timeout=10.0) as result:
        response = await call_openai(prompt)

    # Penalize via the same connection
    await client.penalize("openai#gpt-4o#requests", factor=0.5)

SluiceClient accepts an optional SluiceConfig:

async with SluiceClient(config=SluiceConfig(max_retries=5)) as client:
    ...

The client provides the same methods as the module-level functions: acquire, acquire_many, slot, slot_many, penalize.

When to use SluiceClient:

  • Long-running services making repeated Sluice calls
  • Lambda handlers that call acquire() multiple times per invocation
  • Any scenario where connection setup latency matters

When module-level functions are fine:

  • Lambda handlers with a single slot() call per invocation
  • Scripts or CLI tools with one-off calls

Full example

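import asyncio
from sluice import slot, penalize, SlotTimeout

# Assumption: call_openai wraps the official openai client (v1 or later), which
# raises openai.RateLimitError on HTTP 429. Substitute your vendor's exception otherwise.
from openai import RateLimitError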

async def call_with_rate_limit(prompt: str) -> str:
    try:
        async with slot("openai#gpt-4o#requests", timeout=10.0):
            response = await call_openai(prompt)
            return response
    except SlotTimeout:
        raise RuntimeError("Rate limit slot unavailable")

async def call_with_backpressure(prompt: str) -> str:
    try:
        async with slot("openai#gpt-4o#requests", timeout=10.0):
            try:
                response = await call_openai(prompt)
                return response
            except RateLimitError:
                await penalize("openai#gpt-4o#requests", factor=0.5)
                raise
    except SlotTimeout:
        raise RuntimeError("Rate limit slot unavailable")