You run one AI agent and your LLM bill is predictable. You switch to asyncio.gather() to run ten agents concurrently and the bill isn't ten times bigger — it's forty times bigger. The retry storm from two agents hitting a rate limit simultaneously, combined with a tool-call loop that no one noticed until it had spawned twelve concurrent subtasks, is what actually shows up on your invoice.
This guide covers the four specific ways that async Python agent patterns turn a manageable cost into an emergency. For each one, we show the exact code change that stops it — with working asyncio-safe implementations you can drop into your codebase today.
Prerequisite: This guide assumes you've already read How to Build a Circuit Breaker for Python AI Agents. The patterns here extend that synchronous circuit breaker into async territory. The CLOSED/OPEN/HALF_OPEN state machine from that guide is the foundation — here we make it safe for asyncio workloads.
Why async makes the cost problem worse
In a synchronous agent loop, failures are linear. One agent, one call, one retry. If it loops, it loops sequentially and the burn rate is bounded by the single-thread execution speed.
In an async agent, failures fan out. Consider asyncio.gather(*(agent_run() for _ in range(10))): if a shared tool starts returning errors, all ten agents receive the error at roughly the same time, all ten enter their retry logic at the same time, and all ten fire their next retry at the same time. What would have been ten retries spread over a 10-second sequential wait becomes a 1-second synchronized burst of 10 simultaneous API calls, each of which also fails and triggers another round.
This is not a hypothetical. It is the most common source of surprise cost spikes for teams graduating from synchronous to concurrent agent architectures.
Problem 1: The synchronized retry storm
The default retry behavior in most LLM SDKs (exponential backoff with jitter) was designed for synchronous, single-client use. When you run it inside asyncio.gather(), the "jitter" each task adds is independently sampled but not globally coordinated. Twenty tasks sleeping for "2 seconds plus random(0, 1)" all wake up within the same 1-second window and immediately refire.
```python
# THIS is the danger pattern: unconstrained concurrent retries
import asyncio
import random

import anthropic

client = anthropic.Anthropic()


async def run_agent(task: str) -> str:
    # Each coroutine has its own retry logic.
    # If the API rate-limits all of them simultaneously,
    # they all retry simultaneously. The burst compounds.
    for attempt in range(5):
        try:
            response = client.messages.create(  # sync call in async context!
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": task}],
            )
            return response.content[0].text
        except anthropic.RateLimitError:
            await asyncio.sleep(2 ** attempt + random.random())
    raise RuntimeError("Max retries exceeded")


async def main(tasks: list[str]) -> list[str]:
    # Running 20 agents simultaneously
    return await asyncio.gather(*[run_agent(t) for t in tasks])
```
Two bugs here beyond the retry storm: the synchronous client.messages.create() call inside an async function blocks the event loop for the duration of the API call, and there's no global semaphore to cap concurrency.
The fix: a shared semaphore + async client
The correct pattern uses the async Anthropic client and a single asyncio.Semaphore shared across all concurrent agents. The semaphore is your concurrency budget — it prevents more than N agents from making API calls simultaneously, which keeps rate-limit pressure manageable and prevents the synchronized-burst failure mode.
```python
import asyncio

import anthropic

# Shared semaphore: tune this to your tier's rate limit.
# Example: Anthropic Tier 2 at ~50 RPM, with 10 agents each making 5 calls;
# 5 concurrent callers is a conservative starting point.
# (On Python < 3.10, create the semaphore inside the running loop instead,
# because older versions bind it to a loop at construction time.)
_API_SEMAPHORE = asyncio.Semaphore(5)


async def run_agent(client: anthropic.AsyncAnthropic, task: str) -> str:
    async with _API_SEMAPHORE:
        response = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": task}],
        )
    return response.content[0].text


async def main(tasks: list[str]) -> list[str]:
    async with anthropic.AsyncAnthropic() as client:
        return await asyncio.gather(*[run_agent(client, t) for t in tasks])
```
The semaphore does not add jitter automatically — it serializes at the gate. If you want jitter after a rate-limit error, add it once at the semaphore level, not inside each agent's retry loop. One coordination point beats twenty independent ones.
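One way to put the jitter "at the semaphore level" is a gate that owns both the concurrency cap and a single fleet-wide cooldown. The sketch below is an illustration under assumptions, not an SDK feature: SharedBackoffGate and call_with_gate are hypothetical names, and the delays are placeholders to tune.

```python
import asyncio
import random


class SharedBackoffGate:
    """Concurrency cap plus one fleet-wide cooldown (hypothetical helper)."""

    def __init__(self, max_concurrency: int, base_delay: float = 1.0,
                 max_delay: float = 30.0) -> None:
        self._semaphore = asyncio.Semaphore(max_concurrency)
        self._base_delay = base_delay
        self._max_delay = max_delay
        self._resume_at = 0.0  # loop time before which no call may start
        self._strikes = 0      # consecutive cooldowns, drives the exponent

    async def __aenter__(self) -> "SharedBackoffGate":
        await self._semaphore.acquire()
        try:
            loop = asyncio.get_running_loop()
            # Re-check after each sleep: the cooldown may have been extended.
            while (wait := self._resume_at - loop.time()) > 0:
                await asyncio.sleep(wait)
        except BaseException:
            self._semaphore.release()
            raise
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        self._semaphore.release()
        return False  # never swallow the caller's exception

    def report_rate_limit(self) -> None:
        now = asyncio.get_running_loop().time()
        if now < self._resume_at:
            return  # another agent already started this cooldown
        self._strikes += 1
        delay = min(self._max_delay, self._base_delay * 2 ** (self._strikes - 1))
        delay += random.uniform(0, delay / 2)  # jitter sampled once for the fleet
        self._resume_at = now + delay

    def report_success(self) -> None:
        self._strikes = 0


async def call_with_gate(gate, make_call, is_rate_limit, attempts: int = 5):
    """Run make_call() under the gate, retrying only on rate-limit errors."""
    for _ in range(attempts):
        async with gate:
            try:
                result = await make_call()
            except Exception as exc:
                if not is_rate_limit(exc):
                    raise
                gate.report_rate_limit()
                continue
            gate.report_success()
            return result
    raise RuntimeError("Max retries exceeded")
```

With the Anthropic client, make_call would be a zero-argument function returning client.messages.create(...) and is_rate_limit would be lambda e: isinstance(e, anthropic.RateLimitError). Because waiters sleep while holding a semaphore slot, at most max_concurrency calls resume when the cooldown ends, however many agents are queued.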
Problem 2: Budget-per-session vs budget-per-concurrent-run accounting
Your per-agent budget cap is $0.50. You run 20 agents concurrently. Your budget exposure is $10.00 — not $0.50. This is obvious in retrospect and invisible in practice, because every budget check in the existing codebase checks one agent's spend, not the fleet's total spend.
```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ConcurrentBudgetTracker:
    """
    Asyncio-safe budget tracker for concurrent agent runs on one event loop.
    Each check and each increment runs under an asyncio.Lock, so neither is
    interleaved with another coroutine. It is not safe across OS threads.
    """

    hard_limit_usd: float
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False, repr=False)
    _total_spent: float = field(default=0.0, init=False)
    _per_agent_spent: dict[str, float] = field(default_factory=dict, init=False)

    async def can_spend(self, agent_id: str, estimated_usd: float) -> bool:
        async with self._lock:
            agent_total = self._per_agent_spent.get(agent_id, 0.0)
            fleet_total = self._total_spent
            # Both the per-agent cap and the fleet cap must pass
            return (
                agent_total + estimated_usd <= self.hard_limit_usd * 0.2  # per-agent: 20% of fleet cap
                and fleet_total + estimated_usd <= self.hard_limit_usd
            )

    async def record_spend(self, agent_id: str, actual_usd: float) -> None:
        async with self._lock:
            self._per_agent_spent[agent_id] = (
                self._per_agent_spent.get(agent_id, 0.0) + actual_usd
            )
            self._total_spent += actual_usd

    @property
    def total_spent(self) -> float:
        return self._total_spent
```
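Pass a single ConcurrentBudgetTracker instance to every coroutine in your asyncio.gather() call. The asyncio.Lock ensures that if agents 3 and 7 both hit can_spend() at the same time, one waits while the other finishes its check. Note the remaining gap: can_spend() and record_spend() are separate critical sections, so both agents can pass the check before either records its spend, and the fleet can overshoot by up to one in-flight call per concurrent agent. To close that gap, reserve the estimate inside the same critical section as the check and reconcile it once the actual cost is known.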
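A check that also reserves the estimated amount removes the window between "may I spend?" and "I spent". The following is a minimal sketch of that idea, assuming one fleet-level cap only; ReservingBudget, try_reserve, settle, and release are hypothetical names, not part of the tracker above or of any library.

```python
import asyncio


class ReservingBudget:
    """Check and reserve in one critical section (hypothetical helper)."""

    def __init__(self, hard_limit_usd: float) -> None:
        self.hard_limit_usd = hard_limit_usd
        self._lock = asyncio.Lock()
        self._spent = 0.0     # settled cost of finished calls
        self._reserved = 0.0  # estimates held by calls still in flight

    async def try_reserve(self, estimated_usd: float) -> bool:
        async with self._lock:
            if self._spent + self._reserved + estimated_usd > self.hard_limit_usd:
                return False
            self._reserved += estimated_usd  # held until settle() or release()
            return True

    async def settle(self, estimated_usd: float, actual_usd: float) -> None:
        """Swap the reservation for what the call really cost."""
        async with self._lock:
            self._reserved -= estimated_usd
            self._spent += actual_usd

    async def release(self, estimated_usd: float) -> None:
        """Return the reservation when the call failed before being billed."""
        await self.settle(estimated_usd, 0.0)


async def demo() -> list[bool]:
    budget = ReservingBudget(hard_limit_usd=1.0)
    # Five agents ask for $0.30 each at the same moment; only three fit.
    return await asyncio.gather(*[budget.try_reserve(0.30) for _ in range(5)])


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The trade-off is that every successful try_reserve() must be paired with settle() or release(), typically in a try/finally, or reservations leak and the fleet stalls below its real budget.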
asyncio.Lock vs threading.Lock: Always use asyncio.Lock (not threading.Lock) inside coroutines. A contended threading.Lock blocks the entire event loop thread while it waits, and if the holder is another coroutine on that same loop, the holder can never run to release it, so the loop deadlocks. An asyncio.Lock yields control to the event loop while waiting, keeping other coroutines running.
Problem 3: Fan-out tool calls without depth limits
This is the async equivalent of the recursive loop. An agent calls a "research" tool, which spins up three sub-agents, each of which calls the same "research" tool, which spins up three more sub-agents each. After two levels of fan-out, you have nine agents running in parallel. After three levels: twenty-seven. This is not a theoretical example — it is the default behavior of any orchestrator-worker pattern that doesn't explicitly cap its branching factor.
```python
import asyncio
from contextvars import ContextVar

# Track recursion depth per-task using Python context variables.
# ContextVar isolates the value per asyncio task, so it is safe for concurrent use.
_AGENT_DEPTH: ContextVar[int] = ContextVar("agent_depth", default=0)

MAX_AGENT_DEPTH = 3      # levels of fan-out permitted
MAX_CONCURRENT_FANS = 4  # max branches at any single level


# _plan_sub_queries() and _combine_results() are your own helpers (not shown).
async def research_agent(
    query: str,
    budget: ConcurrentBudgetTracker,
    semaphore: asyncio.Semaphore,
) -> str:
    depth = _AGENT_DEPTH.get()
    if depth >= MAX_AGENT_DEPTH:
        # Hard stop: return partial result, don't recurse
        return f"[depth limit reached at {depth}; query: {query[:80]}]"

    # Set depth for any tasks spawned by this coroutine
    token = _AGENT_DEPTH.set(depth + 1)
    try:
        sub_queries = await _plan_sub_queries(query, semaphore)
        # Cap fan-out: take at most MAX_CONCURRENT_FANS sub-queries
        capped = sub_queries[:MAX_CONCURRENT_FANS]
        # With return_exceptions=True, failed branches come back as exception objects
        results = await asyncio.gather(
            *[research_agent(q, budget, semaphore) for q in capped],
            return_exceptions=True,
        )
        return _combine_results(results)
    finally:
        _AGENT_DEPTH.reset(token)
```
ContextVar is the right tool here — not a global counter, not a threading.local() — because asyncio tasks share a thread but not a context. Each task created by asyncio.create_task() or asyncio.gather() gets a shallow copy of the current context, so _AGENT_DEPTH.set() in a parent task does not affect sibling tasks. This gives you per-branch depth tracking without cross-task interference.
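The per-branch isolation described above can be observed directly. This is a small illustrative sketch using only the standard library; _DEPTH_DEMO and branch are made-up names for the demo, and each agent simply records the depth it saw.

```python
import asyncio
from contextvars import ContextVar

_DEPTH_DEMO: ContextVar[int] = ContextVar("depth_demo", default=0)


async def branch(name: str, levels: int) -> list[str]:
    depth = _DEPTH_DEMO.get()
    seen = [f"{name}@{depth}"]
    if levels == 0:
        return seen
    token = _DEPTH_DEMO.set(depth + 1)  # inherited only by tasks spawned below
    try:
        # gather() wraps each coroutine in a task that copies the current context
        children = await asyncio.gather(
            branch(f"{name}.a", levels - 1),
            branch(f"{name}.b", levels - 1),
        )
    finally:
        _DEPTH_DEMO.reset(token)
    for child in children:
        seen.extend(child)
    return seen


async def main() -> tuple[list[str], int]:
    seen = await branch("root", 2)
    return seen, _DEPTH_DEMO.get()  # back to 0 after the reset


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Sibling branches such as root.a and root.b both report depth 1, even though root.a raises the value to 2 for its own children before root.b runs.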
Problem 4: Uncancelled tasks after circuit breaker trips
You implement a circuit breaker correctly in the synchronous sense: when the error rate exceeds the threshold, you stop accepting new calls. But in an async context, there are already 15 in-flight tasks at the moment the breaker opens. Each of those tasks will still complete — or fail and retry — independently. The breaker tripped, but the cost is still accruing from work that was already in flight when it tripped.
The fix is to make the circuit breaker trip cancel its in-flight tasks. This requires tracking them:
```python
import asyncio
from enum import Enum


class CircuitBreakerOpenError(Exception):
    pass


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class AsyncCircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        half_open_max_calls: int = 2,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self._state = State.CLOSED
        self._failure_count = 0
        self._last_failure_time: float | None = None
        self._half_open_calls = 0
        self._lock = asyncio.Lock()
        # Track in-flight tasks so we can cancel them on trip
        self._in_flight: set[asyncio.Task] = set()

    async def call(self, coro):
        async with self._lock:
            if self._state == State.OPEN:
                if self._should_attempt_reset():
                    self._state = State.HALF_OPEN
                    self._half_open_calls = 0
                else:
                    coro.close()  # rejected: close it so Python does not warn "never awaited"
                    raise CircuitBreakerOpenError("Circuit is OPEN: call rejected")
            if self._state == State.HALF_OPEN:
                if self._half_open_calls >= self.half_open_max_calls:
                    coro.close()
                    raise CircuitBreakerOpenError("HALF_OPEN probe limit reached")
                self._half_open_calls += 1

        task = asyncio.current_task()
        if task is not None:
            self._in_flight.add(task)
        try:
            result = await coro
            await self._on_success()
            return result
        except Exception:
            # CancelledError is a BaseException, so cancellation is not counted as a failure
            await self._on_failure()
            raise
        finally:
            self._in_flight.discard(task)

    async def _on_success(self) -> None:
        async with self._lock:
            if self._state == State.HALF_OPEN:
                self._state = State.CLOSED
                self._failure_count = 0

    async def _on_failure(self) -> None:
        async with self._lock:
            self._failure_count += 1
            self._last_failure_time = asyncio.get_running_loop().time()
            if self._failure_count >= self.failure_threshold:
                await self._trip()

    async def _trip(self) -> None:
        # Called inside _lock: do not re-acquire
        self._state = State.OPEN
        # Cancel all in-flight tasks except the current one
        current = asyncio.current_task()
        for task in list(self._in_flight):
            if task is not current and not task.done():
                task.cancel()

    def _should_attempt_reset(self) -> bool:
        if self._last_failure_time is None:
            return False
        elapsed = asyncio.get_running_loop().time() - self._last_failure_time
        return elapsed >= self.recovery_timeout
```
The cancellation in _trip() sends a CancelledError into each in-flight task at its next await point. The task's cost for the current API call may still be charged (the HTTP request was already sent), but any subsequent retry iterations, tool calls, or sub-tasks will not execute.
Cancellation is cooperative in asyncio. A cancelled task must await at some point for the CancelledError to be injected. Tasks that block the event loop with synchronous calls (sleeping with time.sleep(), calling sync SDKs without run_in_executor) cannot be cancelled until they return. This is another reason to use the async Anthropic client everywhere.
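The difference is easy to measure. The sketch below is a standalone timing demo, not part of the breaker: cooperative, blocking, and seconds_until_cancelled are illustrative names, and time.sleep() stands in for any synchronous SDK call.

```python
import asyncio
import time


async def cooperative() -> None:
    while True:
        await asyncio.sleep(0.01)  # await point: cancellation can land here


async def blocking() -> None:
    while True:
        time.sleep(0.2)         # sync sleep: the whole event loop is frozen
        await asyncio.sleep(0)  # first chance for CancelledError to arrive


async def seconds_until_cancelled(agent, cancel_after: float = 0.02) -> float:
    loop = asyncio.get_running_loop()
    start = loop.time()
    task = asyncio.create_task(agent())
    # While the loop is blocked, this coroutine cannot even wake up to cancel.
    await asyncio.sleep(cancel_after)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return loop.time() - start


async def main() -> tuple[float, float]:
    fast = await seconds_until_cancelled(cooperative)
    slow = await seconds_until_cancelled(blocking)
    return fast, slow


if __name__ == "__main__":
    fast, slow = asyncio.run(main())
    print(f"cooperative: {fast:.3f}s  blocking: {slow:.3f}s")
```

The cooperative agent stops shortly after the requested delay, while the blocking one cannot be stopped until at least one full synchronous section has finished. A circuit breaker running on the same loop is stalled for that whole period too.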
Putting it together: the async-safe guard pattern
Here is a minimal but complete pattern that combines all four fixes: semaphore, fleet budget tracking, depth limiting, and a cancellation-aware circuit breaker. This is what RunGuard's async_guard() decorator implements under the hood.
```python
import asyncio
from contextvars import ContextVar

import anthropic

# Reuses ConcurrentBudgetTracker, AsyncCircuitBreaker, and
# CircuitBreakerOpenError from the sections above.
_DEPTH: ContextVar[int] = ContextVar("depth", default=0)


async def guarded_agent_run(
    tasks: list[str],
    fleet_budget_usd: float = 5.0,
    max_concurrency: int = 5,
    max_depth: int = 3,
) -> list[str | BaseException]:
    semaphore = asyncio.Semaphore(max_concurrency)
    budget = ConcurrentBudgetTracker(hard_limit_usd=fleet_budget_usd)
    breaker = AsyncCircuitBreaker(failure_threshold=5, recovery_timeout=60.0)

    async def _run_one(client: anthropic.AsyncAnthropic, task: str, agent_id: str) -> str:
        depth = _DEPTH.get()
        if depth >= max_depth:
            return f"[depth limit: {task[:60]}]"
        estimated_cost = _estimate_cost(task)
        if not await budget.can_spend(agent_id, estimated_cost):
            return f"[budget limit reached for {agent_id}]"
        token = _DEPTH.set(depth + 1)
        try:
            async with semaphore:
                result = await breaker.call(
                    client.messages.create(
                        model="claude-sonnet-4-6",
                        max_tokens=1024,
                        messages=[{"role": "user", "content": task}],
                    )
                )
            actual_cost = _actual_cost(result)
            await budget.record_spend(agent_id, actual_cost)
            return result.content[0].text
        except CircuitBreakerOpenError:
            return f"[circuit open: {task[:60]}]"
        finally:
            _DEPTH.reset(token)

    # One client for the whole fleet, so HTTP connections are pooled.
    # With return_exceptions=True, failed or cancelled agents come back
    # as exception objects in the result list.
    async with anthropic.AsyncAnthropic() as client:
        return await asyncio.gather(
            *[_run_one(client, t, f"agent-{i}") for i, t in enumerate(tasks)],
            return_exceptions=True,
        )


def _estimate_cost(task: str) -> float:
    tokens = len(task.split()) * 1.3  # rough words-to-tokens heuristic
    return (tokens / 1_000_000) * 3.0  # claude-sonnet input rate


def _actual_cost(response: anthropic.types.Message) -> float:
    return (
        response.usage.input_tokens * 3.0 / 1_000_000
        + response.usage.output_tokens * 15.0 / 1_000_000
    )
```
What the cost difference looks like in practice
| Scenario | Unguarded | With guards | Savings |
|---|---|---|---|
| 20 agents, 1 rate-limit event, 5 retry rounds (each agent retries 5× on 429; no jitter coordination) | 100 extra calls | 0 extra calls | 100% |
| 3-level fan-out research agent, no depth cap (27 leaf agents on a 3×3×3 tree) | 1 + 3 + 9 + 27 = 40 agent runs, and more if the leaves recurse again | 1 + 3 + 9 = 13 runs on this tree; hard ceiling of 1 + 4 + 16 = 21 (depth 3, fan 4) | ~68% on this tree |
| Fleet budget check, 20 agents at $0.50 each (all 20 pass individual budget check; fleet is $10) | $10.00 exposure | Capped at fleet limit | configurable |
| Circuit breaker trip with 15 in-flight tasks (breaker opens on error #5; 15 tasks still running) | 15 × (retries + subtasks) | 15 tasks cancelled at next await | ~90% |
Using RunGuard's async_guard() decorator
If you'd rather not wire the semaphore, budget tracker, depth limiter, and circuit breaker manually, RunGuard wraps this pattern into a single decorator. Install the Python SDK and wrap your agent coroutine:
```shell
pip install runguard
```

```python
import asyncio
import os

import anthropic
from runguard import async_guard

os.environ["RUNGUARD_BUDGET_USD"] = "5.00"
os.environ["RUNGUARD_MAX_CONCURRENCY"] = "5"
os.environ["RUNGUARD_MAX_DEPTH"] = "3"
os.environ["RUNGUARD_ALERT_AT"] = "0.80"  # alert at 80% of budget


@async_guard()
async def my_agent(query: str) -> str:
    async with anthropic.AsyncAnthropic() as client:
        response = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": query}],
        )
        return response.content[0].text


async def main(queries: list[str]) -> list[str]:
    # Works unchanged with asyncio.gather()
    return await asyncio.gather(*[my_agent(q) for q in queries])
```
The @async_guard() decorator picks up the shared semaphore and budget tracker from a module-level registry — all coroutines decorated with @async_guard() in the same process share the same fleet controls. The circuit breaker state is also shared, so a trip triggered by one agent suspends the entire fleet rather than just that one coroutine.
Slack and PagerDuty alerts fire at RUNGUARD_ALERT_AT (default 80% of budget) so you get a warning before the hard stop, not after. The trip log is written to ~/.runguard/trips.jsonl by default and queryable via runguard status.
Five async agent patterns to audit right now
If you already have concurrent agents running in production, audit for these before the next unexpected bill:
- Any asyncio.gather() call with no semaphore. Count the maximum tasks that can run simultaneously. Multiply by your per-agent worst-case cost. That is your per-event budget exposure.
- Any synchronous SDK client call inside an async function. It blocks the event loop during the HTTP call, prevents cooperative cancellation, and serializes unintentionally: the worst of both worlds. Switch to the async client.
- Any retry loop with random.random() jitter but no global coordination. Ten tasks sleeping "2 + random(0,1) seconds" all wake within a 1-second window. Add a shared semaphore at the gate instead of jitter inside each task.
- Any recursive agent call with no ContextVar depth limit. If the sub-agent can call the same tool that spawned it, you have unbounded fan-out. Add a _DEPTH.get() check as the first line of every recursive entry point.
- Any per-session budget check that doesn't account for concurrent sessions. Replace individual budget objects with a shared ConcurrentBudgetTracker with a fleet-level hard cap.
Frequently asked questions
Does asyncio.Semaphore work correctly across multiple Python processes?
No. asyncio.Semaphore is process-local. If you run agents in multiple worker processes (via multiprocessing or Gunicorn/Uvicorn workers), each process maintains its own semaphore with its own counter. Cross-process rate limiting requires an external coordinator — Redis with SETNX/INCR, or a database advisory lock. For single-process async concurrency (the common case with one asyncio event loop), a module-level asyncio.Semaphore works correctly.
What's the right semaphore size for my Anthropic tier?
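Start with half the requests-per-minute limit divided by your typical calls-per-agent. If you're on Anthropic Tier 2 (50 RPM for Claude Sonnet in this example; check your console for your current limits) and each agent makes about 5 calls, that gives a semaphore of 5. Keep in mind that a semaphore caps in-flight requests, not requests per minute: sustained throughput is roughly concurrency × 60 / average call latency in seconds, so 5 slots stay near 25 RPM only when calls average about 12 seconds each, and faster calls push the rate higher. Monitor your 429 rate in production; if it's zero, you can raise the semaphore. If it's non-zero, lower it or add explicit retry-after handling.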
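The relationship between semaphore size, call latency, and request rate can be sanity-checked with a little arithmetic. This is a back-of-the-envelope sketch that assumes calls run back to back with a stable average latency; the function names and the 50% headroom default are illustrative, not taken from any SDK.

```python
import math


def steady_state_rpm(concurrency: int, avg_latency_s: float) -> float:
    """Requests per minute when every slot is always busy."""
    return concurrency * 60.0 / avg_latency_s


def max_concurrency_for(rpm_limit: float, avg_latency_s: float,
                        headroom: float = 0.5) -> int:
    """Largest semaphore size that keeps the steady rate under headroom * limit."""
    slots = math.floor(rpm_limit * headroom * avg_latency_s / 60.0)
    return max(1, slots)  # always allow at least one call in flight


if __name__ == "__main__":
    print(steady_state_rpm(5, 12.0))      # 5 slots, 12 s per call
    print(steady_state_rpm(5, 2.0))       # same slots, 2 s per call
    print(max_concurrency_for(50, 12.0))  # 50 RPM limit, 12 s per call
    print(max_concurrency_for(50, 2.0))   # 50 RPM limit, 2 s per call
```

With 2-second calls, five slots can sustain 150 requests per minute, which is why the 429 rate in production, rather than the formula alone, should drive the final setting.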
My agents use LangChain / LangGraph — does this apply?
Yes. LangChain's async agent runner uses asyncio.gather() internally for parallel tool calls and sub-chain execution. The same fan-out and retry-storm risks apply. You can wrap LangChain's arun() or ainvoke() calls with a shared semaphore and a budget tracker using the same pattern. See our guide on LangChain circuit breakers for LangChain-specific integration points.
Can I use threading.Semaphore instead of asyncio.Semaphore if I'm mixing sync and async code?
Only if you use loop.run_in_executor() to run the synchronous sections. Acquiring a threading.Semaphore directly inside a coroutine (with a blocking acquire() call) blocks the entire event loop thread, preventing any other coroutine from making progress. Use asyncio.Semaphore for async code, and pass it as a dependency into the async wrappers around any sync calls. If you need a semaphore that works across both sync threads and async coroutines in the same process, use asyncio.run_coroutine_threadsafe() to bridge the two worlds.
What happens to a cancelled task's API call — am I still charged?
If the HTTP request was already sent and the API received it, yes — you pay for that call even if the task that made it was cancelled before it read the response. The savings from cancellation come from preventing the next iteration: the retry, the tool call that was about to happen, the sub-agent that was about to be spawned. For long-running tool call sequences, this is typically 80–95% of the cost that would have otherwise accrued after the trip. The in-progress call's charge is unavoidable without network-layer cancellation, which most LLM APIs do not support.