# Python SDK
Installation, configuration, and API reference for the Sluice Python SDK.
## Installation

```shell
uv add ontopix-sluice
```
The package is published to the Ontopix CodeArtifact registry.
## Configuration

### Constructor

```python
from sluice import SluiceConfig

config = SluiceConfig(
    table_name="ontopix-vendor-buckets-prod",  # DynamoDB table
    region="eu-central-1",                     # AWS region
    endpoint_url=None,                         # Custom DynamoDB endpoint (local dev)
    lease_ttl=60,                              # Lease TTL in seconds
    max_retries=3,                             # Retries on contention
    default_slot_timeout=30.0,                 # Default timeout for slot()
    log_level="INFO",                          # Logging level
)
```
All parameters are optional and fall back to the defaults shown above.
### From environment variables

```python
config = SluiceConfig.from_env()
```
from_env() reads these environment variables, falling back to the same defaults:
| Variable | Type | Default |
|---|---|---|
| `SLUICE_TABLE_NAME` | str | `"ontopix-vendor-buckets-prod"` |
| `SLUICE_AWS_REGION` | str | `"eu-central-1"` |
| `SLUICE_ENDPOINT_URL` | str | `None` |
| `SLUICE_LEASE_TTL` | int | `60` |
| `SLUICE_MAX_RETRIES` | int | `3` |
| `SLUICE_DEFAULT_SLOT_TIMEOUT` | float | `30.0` |
| `SLUICE_LOG_LEVEL` | str | `"INFO"` |
When no config argument is passed to any function, SluiceConfig.from_env() is called automatically.
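The env-with-defaults pattern is straightforward to picture. A minimal, self-contained sketch of how such a loader typically works -- not the SDK's actual implementation, and the `Config` class here covers only three illustrative fields:

```python
import os
from dataclasses import dataclass


@dataclass
class Config:
    table_name: str = "ontopix-vendor-buckets-prod"
    lease_ttl: int = 60
    default_slot_timeout: float = 30.0

    @classmethod
    def from_env(cls) -> "Config":
        # Each field falls back to its dataclass default when the
        # corresponding environment variable is unset.
        return cls(
            table_name=os.environ.get("SLUICE_TABLE_NAME", cls.table_name),
            lease_ttl=int(os.environ.get("SLUICE_LEASE_TTL", cls.lease_ttl)),
            default_slot_timeout=float(
                os.environ.get("SLUICE_DEFAULT_SLOT_TIMEOUT", cls.default_slot_timeout)
            ),
        )
```

Note the `int(...)` / `float(...)` conversions: environment variables are always strings, so typed fields must be parsed explicitly.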
## slot -- context manager (preferred API)

slot is an async context manager that retries acquire internally until a slot is granted or the timeout is reached. The slot is automatically released when the block exits.

```python
from sluice import slot

async with slot("openai#gpt-4o#requests", timeout=10.0) as result:
    # You have the slot -- make your API call here
    response = await call_openai(prompt)
```
### Signature

```python
async def slot(
    dimension: str,
    timeout: float | None = None,  # defaults to config.default_slot_timeout
    config: SluiceConfig | None = None,
) -> AsyncIterator[AcquireResult]
```
The yielded `AcquireResult` has:

- `outcome` -- always `AcquireOutcome.GRANTED` inside the block
- `lease_key` -- the lease identifier
- `dimension` -- the dimension that was acquired
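The retry-until-granted loop that slot performs internally can be sketched with a plain `asynccontextmanager`. Everything below (`slot_sketch`, `acquire_once`, the `state` dict) is a stand-in for illustration, not SDK code:

```python
import asyncio
import contextlib
import time
from dataclasses import dataclass


@dataclass
class Result:
    granted: bool
    wait_seconds: float = 0.0


class SlotTimeout(Exception):
    pass


async def acquire_once(state: dict) -> Result:
    # Stand-in for a single acquire attempt: grants on the third try.
    state["calls"] += 1
    if state["calls"] < 3:
        return Result(granted=False, wait_seconds=0.01)
    return Result(granted=True)


@contextlib.asynccontextmanager
async def slot_sketch(state: dict, timeout: float):
    # Retry acquire_once until granted or the deadline passes,
    # sleeping for the suggested wait between attempts.
    deadline = time.monotonic() + timeout
    while True:
        result = await acquire_once(state)
        if result.granted:
            break
        if time.monotonic() + result.wait_seconds > deadline:
            raise SlotTimeout()
        await asyncio.sleep(result.wait_seconds)
    try:
        yield result
    finally:
        pass  # a real implementation would release the lease here


async def demo() -> int:
    state = {"calls": 0}
    async with slot_sketch(state, timeout=1.0):
        return state["calls"]
```

The `try/finally` around the `yield` is what guarantees release on block exit, even when the body raises.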
### Multiple dimensions

Use slot_many to acquire slots across multiple dimensions atomically (all-or-nothing):

```python
from sluice import slot_many

async with slot_many(
    ["openai#gpt-4o#requests", "openai#gpt-4o#tokens"],
    timeout=15.0,
) as result:
    response = await call_openai(prompt)
```
## acquire / release -- low-level API

acquire performs a single attempt to obtain a slot. It returns immediately with either GRANTED or RETRY_IN. The caller is responsible for retry logic and for calling release().

```python
from sluice import acquire, AcquireOutcome

result = await acquire("openai#gpt-4o#requests")

if result.outcome == AcquireOutcome.GRANTED:
    try:
        response = await call_openai(prompt)
    finally:
        await result.release()
elif result.outcome == AcquireOutcome.RETRY_IN:
    print(f"Retry after {result.wait_seconds:.1f}s")
```
### Signatures

```python
async def acquire(
    dimension: str,
    config: SluiceConfig | None = None,
) -> AcquireResult

async def acquire_many(
    dimensions: list[str],
    config: SluiceConfig | None = None,
) -> AcquireResult
```
acquire_many is all-or-nothing: if any dimension has insufficient tokens, none are acquired and a single RETRY_IN result is returned with the maximum wait time.
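The all-or-nothing semantics can be illustrated with a synchronous token-bucket sketch (the `Bucket` type and `refill_wait` field are invented for this example; they are not SDK types):

```python
from dataclasses import dataclass


@dataclass
class Bucket:
    tokens: int
    refill_wait: float  # seconds until the next token arrives


def acquire_many(buckets: dict[str, Bucket]) -> tuple[bool, float]:
    # All-or-nothing: consume tokens only if every bucket has one.
    short = [b.refill_wait for b in buckets.values() if b.tokens < 1]
    if short:
        # Report the maximum wait among the depleted dimensions,
        # leaving every bucket untouched.
        return False, max(short)
    for b in buckets.values():
        b.tokens -= 1
    return True, 0.0
```

Returning the *maximum* wait is the conservative choice: retrying any sooner would be guaranteed to fail on the slowest dimension.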
### AcquireResult

```python
@dataclass
class AcquireResult:
    outcome: AcquireOutcome    # GRANTED or RETRY_IN
    wait_seconds: float = 0.0  # seconds to wait before retrying (RETRY_IN only)
    lease_key: str | None = None
    dimension: str | None = None

    async def release(self) -> None: ...
```
### AcquireOutcome

```python
class AcquireOutcome(Enum):
    GRANTED = "granted"
    RETRY_IN = "retry_in"
```
## penalize
Reduces the token count for a dimension by a factor. Use this when a vendor returns 429 despite the slot being granted -- it corrects drift between Sluice's model and the vendor's actual state.
Best-effort: not transactional, no version check.
```python
from sluice import penalize

# Reduce tokens to 80% of current value (default factor)
await penalize("openai#gpt-4o#requests")

# Custom factor
await penalize("openai#gpt-4o#requests", factor=0.5)
```
### Signature

```python
async def penalize(
    dimension: str,
    factor: float = 0.8,  # 0.0 to 1.0
    config: SluiceConfig | None = None,
) -> None
```
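The arithmetic is simply a multiplicative scale-down of the stored token count. A local sketch of the update (the clamping of out-of-range factors is an assumption of this example, not documented SDK behaviour):

```python
def penalize(tokens: float, factor: float = 0.8) -> float:
    # Assumed behaviour: clamp factor into [0.0, 1.0], then scale
    # the remaining token count down by that factor.
    factor = min(max(factor, 0.0), 1.0)
    return tokens * factor
```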
## Error handling

### SlotTimeout
Raised by slot() and slot_many() when the timeout expires without acquiring a slot.
```python
from sluice import slot, SlotTimeout

try:
    async with slot("openai#gpt-4o#requests", timeout=5.0) as result:
        response = await call_openai(prompt)
except SlotTimeout:
    # Handle timeout -- queue the request, return a 503, etc.
    pass
```
### Missing dimensions
If a dimension does not exist in the DynamoDB table, acquire raises a standard exception. Ensure dimensions are provisioned before calling the SDK.
## Logging

Sluice logs structured events through the standard logging module:

```python
import logging

logging.getLogger("sluice").setLevel(logging.DEBUG)
```
Log events include:

- `acquire.granted` -- slot acquired successfully
- `acquire.contention_retry` -- retrying due to DynamoDB transaction contention
- `release.concurrent` -- released a concurrent-type slot
- `release.time_refill` -- released a time-refill-type slot
- `penalize` -- penalty applied
The `SLUICE_LOG_LEVEL` environment variable sets the initial level.
## SluiceClient -- connection reuse
Module-level functions (slot, acquire, etc.) create a new DynamoDB connection on each call. For multiple calls within the same process, use SluiceClient to hold the connection:
```python
from sluice import SluiceClient

async with SluiceClient() as client:
    # All calls reuse the same DynamoDB connection
    async with client.slot("openai#gpt-4o#requests", timeout=10.0) as result:
        response = await call_openai(prompt)

    # Penalize via the same connection
    await client.penalize("openai#gpt-4o#requests", factor=0.5)
```
SluiceClient accepts an optional SluiceConfig:
```python
async with SluiceClient(config=SluiceConfig(max_retries=5)) as client:
    ...
```
The client provides the same methods as the module-level functions: `acquire`, `acquire_many`, `slot`, `slot_many`, `penalize`.
When to use SluiceClient:

- Long-running services making repeated Sluice calls
- Lambda handlers that call `acquire()` multiple times per invocation
- Any scenario where connection setup latency matters

When module-level functions are fine:

- Lambda handlers with a single `slot()` call per invocation
- Scripts or CLI tools with one-off calls
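The connection-reuse pattern itself is just an async context manager that opens the expensive resource once in `__aenter__` and hands it to every subsequent call. A toy sketch (the `Client` class and its counters are invented for illustration; the real client holds a DynamoDB connection instead):

```python
import asyncio


class Client:
    # Holds one "connection" (modelled as a counter) across many calls.
    def __init__(self) -> None:
        self.connections_opened = 0
        self.calls = 0

    async def __aenter__(self) -> "Client":
        self.connections_opened += 1  # open the shared connection once
        return self

    async def __aexit__(self, *exc) -> None:
        pass  # close the shared connection here

    async def acquire(self) -> None:
        self.calls += 1  # reuses the already-open connection


async def demo() -> tuple[int, int]:
    async with Client() as client:
        for _ in range(5):
            await client.acquire()
        return client.connections_opened, client.calls
```

Five calls, one connection setup: that delta is exactly what the module-level functions pay for on every call.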
## Full example

```python
import asyncio

from sluice import slot, penalize, SlotTimeout

async def call_with_rate_limit(prompt: str) -> str:
    try:
        async with slot("openai#gpt-4o#requests", timeout=10.0):
            response = await call_openai(prompt)
            return response
    except SlotTimeout:
        raise RuntimeError("Rate limit slot unavailable")

async def call_with_backpressure(prompt: str) -> str:
    try:
        async with slot("openai#gpt-4o#requests", timeout=10.0):
            try:
                response = await call_openai(prompt)
                return response
            except RateLimitError:
                # Vendor returned 429 despite the granted slot --
                # shrink Sluice's token count to correct the drift
                await penalize("openai#gpt-4o#requests", factor=0.5)
                raise
    except SlotTimeout:
        raise RuntimeError("Rate limit slot unavailable")
```