How to Handle 429 Responses
How to react to vendor 429 errors by penalizing buckets and deciding between inline retry or requeue.
When a vendor returns HTTP 429 (Too Many Requests) despite Sluice granting a slot, the vendor's actual capacity is lower than what Sluice believes. This guide explains how to react: penalize the bucket, then decide whether to retry inline or requeue.
When to call penalize()
Call penalize() immediately after receiving a 429 from the vendor, before retrying or requeuing. This reduces the token count in the bucket so other workers see the corrected capacity.
```python
from sluice import penalize

await penalize("openai#rpm", factor=0.8)
```
What penalize does
- Reads the current `tokens` value from the bucket
- Multiplies it by `factor` (default `0.8`, meaning a 20% reduction)
- Writes the reduced value back
Penalize is best-effort and non-transactional -- it does not check the version counter. This is intentional: a slightly stale penalty is better than no penalty, and the lazy refill mechanism will restore tokens naturally over time.
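Conceptually, the read-multiply-write can be sketched against an in-memory dict standing in for the real bucket store (the structure and field names here are illustrative assumptions, and the no-version-check behavior mirrors the description above):

```python
# Sketch of penalize()'s read-multiply-write, using a plain dict in place
# of the real bucket store (hypothetical structure; no version check).
def penalize_sketch(buckets: dict, key: str, factor: float = 0.8) -> float:
    current = buckets[key]["tokens"]   # read the current tokens value
    reduced = current * factor         # apply the penalty factor
    buckets[key]["tokens"] = reduced   # write back; lazy refill recovers later
    return reduced

buckets = {"openai#rpm": {"tokens": 100.0}}
penalize_sketch(buckets, "openai#rpm")        # 100.0 -> 80.0
penalize_sketch(buckets, "openai#rpm", 0.5)   # 80.0 -> 40.0
```

Note how repeated calls compound: two penalties in a row leave the bucket at 40% of its original level, which refill then restores.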
Choosing a factor
| Situation | Suggested factor |
|---|---|
| First 429 from a vendor | 0.8 (20% reduction) |
| Repeated 429s in quick succession | 0.5 (50% reduction) |
| Vendor returning a `Retry-After` header with a long delay | 0.3 (70% reduction) |
Lower factors are more aggressive. The bucket self-heals via refill, so even an over-aggressive penalty corrects itself within `capacity / refill_rate` seconds.
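As a worked example with hypothetical bucket settings (500-token capacity, 10 tokens/sec refill), the worst-case recovery window is:

```python
# Hypothetical bucket settings -- substitute your own configuration.
capacity = 500     # tokens
refill_rate = 10   # tokens per second

# Even if a penalty emptied the bucket entirely, full capacity is
# restored within capacity / refill_rate seconds.
recovery_seconds = capacity / refill_rate  # 50.0
```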
Decision tree: inline retry vs SQS requeue
After calling penalize(), decide what to do with the current work item:
```text
        429 received
             |
      await penalize()
             |
    wait_seconds <= 5s?
       /           \
     yes            no
      |              |
 sleep inline,   requeue to SQS
 then retry      with visibility
 once            delay
```
Inline retry (wait_seconds <= 5s)
If the wait is short, sleep inside the current Lambda invocation and retry:
```python
import asyncio

from sluice import penalize, acquire, AcquireOutcome

# After a 429...
await penalize("openai#rpm", factor=0.8)

result = await acquire("openai#rpm")
if result.outcome == AcquireOutcome.RETRY_IN and result.wait_seconds <= 5:
    await asyncio.sleep(result.wait_seconds)
    # retry the vendor call
```
This avoids the overhead of SQS round-tripping for brief delays.
SQS requeue (wait_seconds > 5s)
If the wait is longer, return the message to SQS with a visibility timeout so it becomes available after the delay:
```python
import json

await penalize("openai#rpm", factor=0.8)

result = await acquire("openai#rpm")
if result.outcome == AcquireOutcome.RETRY_IN and result.wait_seconds > 5:
    await sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(work_item),
        DelaySeconds=min(int(result.wait_seconds) + 1, 900),
    )
    return  # Lambda exits cleanly
```
The `+ 1` second buffer ensures tokens have refilled by the time the message becomes visible. The SQS `DelaySeconds` maximum is 900 seconds (15 minutes).
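The delay computation can be factored into a small helper (a hypothetical name, applying the same rounding and clamping as the snippet above):

```python
def visibility_delay(wait_seconds: float) -> int:
    """Truncate, add a +1s refill buffer, and clamp to SQS's 900s maximum."""
    return min(int(wait_seconds) + 1, 900)

visibility_delay(12.4)    # -> 13
visibility_delay(2000.0)  # -> 900
```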
Timeout safety
The slot timeout must be shorter than the Lambda timeout to guarantee release() runs:
```text
slot_timeout = lambda_timeout - 5s buffer
```
| Lambda timeout | Recommended slot timeout |
|---|---|
| 30s | 25s |
| 60s | 55s |
| 300s | 295s |
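Rather than hard-coding the values in the table above, the slot timeout can be derived at runtime from the Lambda context. The helper name here is a hypothetical; `get_remaining_time_in_millis()` is the standard method on the Lambda context object, and the 5-second buffer mirrors the formula above:

```python
def slot_timeout_from_context(context, buffer_seconds: float = 5.0) -> float:
    """Compute a slot timeout that expires before the Lambda runtime does."""
    remaining = context.get_remaining_time_in_millis() / 1000.0
    return max(remaining - buffer_seconds, 1.0)  # floor at 1s as a safeguard
```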
If the slot times out before the vendor responds, the `slot()` context manager raises `SlotTimeoutError` and calls `release()` automatically. Without this buffer, the runtime could kill the Lambda function while the slot is still held, leaving it for the reconciler to clean up.
Set `SLUICE_DEFAULT_SLOT_TIMEOUT` or pass `timeout` explicitly:
```python
from sluice import slot

async with slot("openai#rpm", timeout=25):
    response = await call_vendor()
```
Complete example: Lambda handler with 429 handling
```python
import asyncio
import json
import logging

from sluice import penalize, acquire, AcquireOutcome

logger = logging.getLogger(__name__)

INLINE_THRESHOLD = 5  # seconds

async def handle_record(record: dict, sqs_client, queue_url: str) -> None:
    """Process a single SQS record with Sluice rate limiting."""
    dimensions = ["openai#rpm", "openai#tpm"]

    # Attempt to acquire slots
    result = await acquire(*dimensions)

    if result.outcome == AcquireOutcome.RETRY_IN:
        if result.wait_seconds <= INLINE_THRESHOLD:
            logger.info("Short wait, retrying inline", extra={
                "wait_seconds": result.wait_seconds,
            })
            await asyncio.sleep(result.wait_seconds)
            result = await acquire(*dimensions)
            if result.outcome != AcquireOutcome.GRANTED:
                await _requeue(sqs_client, queue_url, record, result.wait_seconds)
                return
        else:
            await _requeue(sqs_client, queue_url, record, result.wait_seconds)
            return

    # Slot granted -- make the vendor call
    # (call_openai, VendorRateLimitError, and save_result are application-defined)
    try:
        response = await call_openai(record["prompt"])
    except VendorRateLimitError:
        # Vendor 429 despite having a slot
        logger.warning("Vendor 429 received, penalizing bucket")
        await penalize("openai#rpm", factor=0.8)
        await _requeue(sqs_client, queue_url, record, wait_seconds=10)
        return
    finally:
        await result.release()

    # Process successful response
    await save_result(response)

async def _requeue(sqs_client, queue_url: str, record: dict, wait_seconds: float) -> None:
    """Send message back to SQS with visibility delay."""
    delay = min(int(wait_seconds) + 1, 900)
    await sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(record),
        DelaySeconds=delay,
    )
    logger.info("Requeued with delay", extra={"delay_seconds": delay})
```
Key points
- Always call `penalize()` before retrying or requeuing -- it prevents other workers from hitting the same 429.
- The 5-second inline threshold matches `SLUICE_INLINE_RETRY_THRESHOLD` (configurable via env var).
- Penalize is idempotent in effect -- calling it multiple times just reduces tokens further, and refill restores them naturally.
- Sluice does not own SQS queues. The requeue pattern is the calling product's responsibility.