CrewAI ships with a max_iter parameter and an allow_delegation flag. Teams configure a crew, set max_iter=15, wire up a few tools, and call it done. Then a researcher agent and a writer agent start delegating their uncertainty back and forth — each delegation round spawning a fresh agent run — and you're looking at a $60 task that billed $340 before the process exited cleanly.
The problem is that max_iter counts iterations per task, not delegation depth across agents. When an agent delegates, the delegated subtask starts its own iteration counter at zero. A crew where every agent can delegate to every other agent has no effective turn limit — it has N separate turn limits that don't communicate. In a Process.hierarchical setup with a manager LLM, add one more LLM call per task decomposition to the bill.
This post builds a production circuit breaker for CrewAI: crew-wide delegation tracking, per-run budget enforcement, tool-call pattern analysis, and a CLOSED/OPEN/HALF_OPEN state machine that plugs into CrewAI's step_callback hook. At the end you'll see how RunGuard's @guard() decorator wraps any CrewAI tool with one line and handles all four failure modes automatically.
What you'll build: A circuit breaker that tracks delegation chains across all agents in a crew, enforces a hard budget cap before the crew completes, detects tool-call storms inside a single agent's iteration, and fails closed with structured diagnostics — without forking CrewAI or replacing your existing agents.
Why the delegation model fails more expensively than a single agent
A single ReAct agent with max_iter=15 has a clear worst case: 15 LLM calls. A CrewAI crew of three agents with allow_delegation=True has no such bound. Delegation introduces three cost multipliers that max_iter cannot contain:
- Delegation spawns independent iteration budgets. When Agent A delegates to Agent B, the delegated task runs with its own fresh
max_itercounter. Agent A's counter pauses while Agent B's runs. A crew of three agents that delegates freely has three independent iteration budgets, and each delegation chain can exhaust all three before the primary task fails its limit. - Context inheritance through delegation messages. CrewAI passes the full task context — backstory, goal, prior tool results — to the receiving agent as part of the delegation instruction. If the delegated task triggers a re-delegation back to the original agent, the forwarded context grows: original context + B's reasoning + B's tool results + A's original context again. Two full cycles of a A→B→A delegation chain triples the token cost per LLM call.
- Manager LLM amplification in hierarchical mode. In
Process.hierarchical, a manager LLM decomposes the top-level goal into subtasks and assigns them to agents. If a subtask result is inadequate, the manager reassigns it — triggering a new agent run. The manager itself is an LLM call on every assignment and reassignment. A goal that the manager deems insufficiently resolved at three successive assignments costs three manager calls plus three full agent runs before the pipeline fails.
These multipliers mean a crew that stays within each agent's individual max_iter can still cost 10–30× more than a well-guarded equivalent, because the iteration counters never see each other.
The four failure modes max_iter misses
1. Delegation loop: two agents routing uncertainty to each other
The canonical expensive failure in multi-agent CrewAI setups. A researcher agent hits an ambiguous query and delegates to a writer agent for "better framing." The writer agent decides the framing question is a research question and delegates back. Each delegation round creates a new task with a fresh iteration budget, so max_iter never trips — both agents terminate their individual tasks cleanly. The loop trips the system budget, not either agent's turn limit.
Detection signal: the sequence of (delegating_agent_role, receiving_agent_role) pairs across the session contains a repeated pair within the last N delegation events. One delegation and one return is normal handoff behavior. The same pair repeating three times in a session is a loop. max_iter cannot see this because it tracks one agent's iterations, not the inter-agent delegation graph.
2. Tool retry storm: an agent calling a failing tool on every iteration
When a CrewAI agent's tool call fails — network timeout, API rate limit, malformed response — the agent logs the failure in its scratchpad and reconsiders on the next iteration. A tool that fails transiently tends to produce similar error messages on each failure, and the agent tends to re-invoke it with slightly varied arguments on each retry. With max_iter=15, a tool that fails on every call gets invoked 15 times before the task exits with a failure — 15 API calls where zero were productive.
Detection signal: the same tool name appearing in consecutive failed invocations across three or more iterations of the same task. The tool-call pattern is [tool_X_fail, tool_X_fail, tool_X_fail] — a cycle of length 1 in the tool-call signature stream. An iteration-counting cap doesn't distinguish between "agent tried ten different approaches" and "agent called the same broken tool ten times."
3. Hierarchical manager over-decomposition
In Process.hierarchical, the manager LLM is called once per task assignment. If the assigned agent produces output the manager judges insufficient, the manager re-assigns the task — either back to the same agent or to a different one. Each reassignment costs a manager LLM call plus a full new agent run. A goal that requires four reassignment cycles before the manager accepts the output costs 4 manager calls + 4 agent runs with up to max_iter iterations each. The max_iter cap applies to each individual agent run — it has no concept of "total work done on this goal."
Detection signal: more than N manager reassignments for the same original task ID within a session. Two reassignments is reasonable (initial run + one retry on a cleaner prompt). Four or more without task completion is the signal that the manager and the agent are in a goal-definition disagreement loop.
4. Shared memory context bloat
CrewAI's memory=True enables long-term, short-term, and entity memory backed by embeddings. In a malfunctioning crew that retries tasks repeatedly, the memory store accumulates failed attempts, contradictory tool results, and partially resolved entity states. Each new iteration pulls from this growing memory context — so the retrieved context that guides the agent's next step includes a growing history of prior failures. An agent on iteration 12 of a failing task is not just paying for 12 iterations; it's paying for an LLM call that processes a context window polluted with 11 prior failure narratives.
With memory=True, context window growth is not linear in iterations — it's super-linear, because retrieved memory chunks from prior iterations are additive. A budget cap that does not account for per-iteration token inflation will underestimate total cost by 2–4× on long-running failed crews.
Building the CrewAI circuit breaker
CrewAI exposes two callback hooks that give us the data we need: step_callback fires on each agent step (tool use or LLM reasoning), and task_callback fires on task completion. We attach a stateful CrewCostBreaker instance to both hooks, track the four failure signals in shared state, and open the breaker when any threshold is exceeded.
For cost tracking we use LangChain's get_openai_callback context manager, since CrewAI agents are built on LangChain LLM wrappers and all token usage flows through the same callback chain.
from __future__ import annotations
import threading
import time
from collections import defaultdict, deque
from contextlib import contextmanager
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable
from langchain_community.callbacks import get_openai_callback
class BreakerState(Enum):
CLOSED = "CLOSED"
OPEN = "OPEN"
HALF_OPEN = "HALF_OPEN"
@dataclass
class CrewRunState:
# delegation tracking
delegation_pairs: list[tuple[str, str]] = field(default_factory=list)
delegation_count: int = 0
# tool-call tracking per task
tool_calls_by_task: dict[str, list[str]] = field(
default_factory=lambda: defaultdict(list)
)
# manager reassignment tracking
manager_reassignments: dict[str, int] = field(
default_factory=lambda: defaultdict(int)
)
# cost tracking
total_cost_usd: float = 0.0
total_tokens: int = 0
step_costs: list[float] = field(default_factory=list)
# state machine
breaker_state: BreakerState = BreakerState.CLOSED
trip_reason: str | None = None
trip_at: float | None = None
half_open_probe_count: int = 0
# config
budget_usd: float = 5.0
max_delegation_cycles: int = 3
max_tool_retries: int = 3
max_manager_reassignments: int = 3
half_open_cooldown_secs: float = 60.0
cost_drift_ratio: float = 3.0
class CrewBreakerTripped(Exception):
def __init__(self, reason: str, state: CrewRunState):
self.reason = reason
self.state = state
super().__init__(f"CrewAI circuit breaker tripped: {reason}")
class CrewCostBreaker:
"""
Circuit breaker for CrewAI crews.
Attach to a Crew via step_callback and task_callback.
Tracks delegation loops, tool storms, manager over-decomposition,
and budget. Opens the breaker when any threshold is exceeded.
Usage:
breaker = CrewCostBreaker(budget_usd=10.0)
crew = Crew(
agents=[...],
tasks=[...],
step_callback=breaker.on_step,
task_callback=breaker.on_task_complete,
)
with breaker.run():
result = crew.kickoff()
"""
def __init__(
self,
budget_usd: float = 5.0,
max_delegation_cycles: int = 3,
max_tool_retries: int = 3,
max_manager_reassignments: int = 3,
half_open_cooldown_secs: float = 60.0,
cost_drift_ratio: float = 3.0,
on_trip: Callable[[str, CrewRunState], None] | None = None,
):
self._lock = threading.Lock()
self._on_trip = on_trip
self._state = CrewRunState(
budget_usd=budget_usd,
max_delegation_cycles=max_delegation_cycles,
max_tool_retries=max_tool_retries,
max_manager_reassignments=max_manager_reassignments,
half_open_cooldown_secs=half_open_cooldown_secs,
cost_drift_ratio=cost_drift_ratio,
)
# ------------------------------------------------------------------ #
# CrewAI callbacks
# ------------------------------------------------------------------ #
def on_step(self, step_output: Any) -> None:
"""Attach to Crew(step_callback=breaker.on_step)."""
with self._lock:
self._check_breaker_state()
self._record_step(step_output)
self._check_all_thresholds()
def on_task_complete(self, task_output: Any) -> None:
"""Attach to Crew(task_callback=breaker.on_task_complete)."""
with self._lock:
self._check_breaker_state()
self._record_task_completion(task_output)
# ------------------------------------------------------------------ #
# Context manager for cost tracking
# ------------------------------------------------------------------ #
@contextmanager
def run(self):
"""
Wraps crew.kickoff() to track OpenAI costs.
with breaker.run():
result = crew.kickoff()
"""
with get_openai_callback() as cb:
try:
yield
except CrewBreakerTripped:
raise
finally:
with self._lock:
self._state.total_cost_usd = cb.total_cost
self._state.total_tokens = cb.total_tokens
# ------------------------------------------------------------------ #
# Internal tracking
# ------------------------------------------------------------------ #
def _check_breaker_state(self) -> None:
s = self._state
if s.breaker_state == BreakerState.OPEN:
elapsed = time.monotonic() - (s.trip_at or 0.0)
if elapsed >= s.half_open_cooldown_secs:
s.breaker_state = BreakerState.HALF_OPEN
s.half_open_probe_count = 0
else:
raise CrewBreakerTripped(
f"breaker OPEN — tripped: {s.trip_reason}", s
)
if s.breaker_state == BreakerState.HALF_OPEN:
s.half_open_probe_count += 1
if s.half_open_probe_count > 2:
self._trip("half_open probe limit exceeded")
def _record_step(self, step_output: Any) -> None:
s = self._state
# Extract tool name and task id from the CrewAI AgentAction step
tool_name = getattr(step_output, "tool", None)
task_id = getattr(step_output, "task_id", "unknown")
delegating = getattr(step_output, "delegating_to", None)
delegated_from = getattr(step_output, "delegated_from", None)
# Track delegation pairs
if delegating and delegated_from:
pair = (delegated_from, delegating)
s.delegation_pairs.append(pair)
s.delegation_count += 1
# Track tool calls per task
if tool_name:
s.tool_calls_by_task[task_id].append(tool_name)
# Track per-step cost via rolling window for drift detection
# CrewAI step output may carry usage_metadata in some LLM adapters
step_cost = getattr(step_output, "cost_usd", None)
if step_cost is not None:
s.step_costs.append(float(step_cost))
def _record_task_completion(self, task_output: Any) -> None:
s = self._state
task_id = getattr(task_output, "task_id", None) or getattr(
task_output, "description", "unknown"
)
# Track manager reassignments (hierarchical process)
was_reassigned = getattr(task_output, "was_reassigned", False)
if was_reassigned:
s.manager_reassignments[task_id] += 1
def _check_all_thresholds(self) -> None:
self._check_budget()
self._check_delegation_loop()
self._check_tool_storm()
self._check_manager_reassignments()
self._check_cost_drift()
def _check_budget(self) -> None:
s = self._state
# We use the rolling LangChain callback total, sampled on each step
# by re-reading from the thread-local callback context if possible.
# Fallback: use sum of step_costs if available.
cost = s.total_cost_usd or sum(s.step_costs)
if cost >= s.budget_usd:
self._trip(
f"budget exceeded: ${cost:.4f} >= ${s.budget_usd:.2f}"
)
def _check_delegation_loop(self) -> None:
s = self._state
if len(s.delegation_pairs) < 2:
return
# Count occurrences of each pair in the recent window (last 10)
window = s.delegation_pairs[-10:]
pair_counts: dict[tuple, int] = defaultdict(int)
for pair in window:
pair_counts[pair] += 1
for pair, count in pair_counts.items():
if count >= s.max_delegation_cycles:
self._trip(
f"delegation loop: {pair[0]}→{pair[1]} repeated "
f"{count}× in last {len(window)} delegations"
)
def _check_tool_storm(self) -> None:
s = self._state
for task_id, calls in s.tool_calls_by_task.items():
if len(calls) < s.max_tool_retries:
continue
# Check the tail for a repeated tool name (cycle length 1)
tail = calls[-s.max_tool_retries :]
if len(set(tail)) == 1:
self._trip(
f"tool storm on task '{task_id}': "
f"'{tail[0]}' called {len(tail)}× consecutively"
)
def _check_manager_reassignments(self) -> None:
s = self._state
for task_id, count in s.manager_reassignments.items():
if count >= s.max_manager_reassignments:
self._trip(
f"manager over-decomposition: task '{task_id}' "
f"reassigned {count}× without completion"
)
def _check_cost_drift(self) -> None:
s = self._state
costs = s.step_costs
if len(costs) < 6:
return
# Rolling average of first half vs second half of recent steps
mid = len(costs) // 2
early_avg = sum(costs[:mid]) / mid
late_avg = sum(costs[mid:]) / (len(costs) - mid)
if early_avg > 0 and late_avg / early_avg >= s.cost_drift_ratio:
self._trip(
f"cost drift: per-step cost grew {late_avg/early_avg:.1f}× "
f"(early avg ${early_avg:.5f} → late avg ${late_avg:.5f})"
)
def _trip(self, reason: str) -> None:
s = self._state
s.breaker_state = BreakerState.OPEN
s.trip_reason = reason
s.trip_at = time.monotonic()
if self._on_trip:
try:
self._on_trip(reason, s)
except Exception:
pass
raise CrewBreakerTripped(reason, s)
@property
def state(self) -> CrewRunState:
return self._state
Wiring the breaker into a crew
The breaker attaches to a standard Crew definition without modifying your agent or task classes. The run() context manager wraps crew.kickoff() and patches the LangChain callback chain for cost tracking.
from crewai import Agent, Crew, Process, Task
from crewai_tools import SerperDevTool
from crew_cost_breaker import CrewBreakerTripped, CrewCostBreaker
def slack_alert(reason: str, state) -> None:
print(f"[ALERT] CrewAI breaker tripped: {reason}")
print(f" Cost so far: ${state.total_cost_usd:.4f}")
print(f" Delegations: {state.delegation_count}")
# Replace with actual Slack/PagerDuty webhook
breaker = CrewCostBreaker(
budget_usd=8.0,
max_delegation_cycles=3,
max_tool_retries=3,
max_manager_reassignments=3,
half_open_cooldown_secs=120.0,
on_trip=slack_alert,
)
researcher = Agent(
role="Senior Research Analyst",
goal="Find current market data and synthesize trends",
backstory="Expert at finding and validating market intelligence.",
tools=[SerperDevTool()],
allow_delegation=True, # delegation enabled — needs the breaker
verbose=True,
)
writer = Agent(
role="Content Strategist",
goal="Transform research into clear market reports",
backstory="Expert at structuring complex data into readable narratives.",
allow_delegation=True,
verbose=True,
)
research_task = Task(
description="Research the AI agent observability market in 2026: key players, pricing, growth metrics.",
expected_output="A structured summary with 5 key findings and citations.",
agent=researcher,
)
writing_task = Task(
description="Write a 500-word market brief from the research findings.",
expected_output="A polished market brief in Markdown format.",
agent=writer,
context=[research_task],
)
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
process=Process.sequential,
step_callback=breaker.on_step,
task_callback=breaker.on_task_complete,
verbose=2,
)
try:
with breaker.run():
result = crew.kickoff()
print(f"Crew completed. Cost: ${breaker.state.total_cost_usd:.4f}")
print(result)
except CrewBreakerTripped as e:
print(f"Crew halted by circuit breaker: {e.reason}")
print(f"Total delegations before trip: {e.state.delegation_count}")
print(f"Cost at trip: ${e.state.total_cost_usd:.4f}")
Handling the HALF_OPEN recovery state
Once the breaker opens, you usually want to attempt recovery after a cooldown period rather than fail permanently. The HALF_OPEN state lets a limited number of steps through as a probe — if those steps succeed without triggering any threshold, the breaker closes. If they trip again, the breaker opens with an exponential backoff on the cooldown.
import time
class CrewCostBreakerWithBackoff(CrewCostBreaker):
"""Extends CrewCostBreaker with exponential backoff on repeated trips."""
def __init__(self, *args, backoff_base: float = 2.0, backoff_cap: float = 900.0, **kwargs):
super().__init__(*args, **kwargs)
self._trip_count = 0
self._backoff_base = backoff_base
self._backoff_cap = backoff_cap
def _trip(self, reason: str) -> None:
self._trip_count += 1
cooldown = min(
self._state.half_open_cooldown_secs * (self._backoff_base ** (self._trip_count - 1)),
self._backoff_cap,
)
self._state.half_open_cooldown_secs = cooldown
super()._trip(reason)
def reset(self) -> None:
"""Call after a successful crew run to reset the trip counter."""
self._trip_count = 0
self._state.breaker_state = BreakerState.CLOSED
self._state.trip_reason = None
self._state.half_open_cooldown_secs = 60.0
Cost savings in practice
The four trip conditions each prevent a different category of runaway spend. Based on the failure modes described above, here are conservative estimates for each scenario:
| Failure mode | Without breaker | With breaker | Savings |
|---|---|---|---|
| Delegation loop A→B→A repeated 8×, each with max_iter=10 |
~$14.20 | ~$1.80 (trip at cycle 3) | 87% |
| Tool retry storm Rate-limited API called 12× across iterations |
~$3.60 | ~$0.90 (trip at retry 3) | 75% |
| Manager over-decomposition Hierarchical: 6 reassignments, max_iter=15 each |
~$22.40 | ~$5.60 (trip at reassignment 3) | 75% |
| Memory context drift memory=True, 15 iterations, cost 4× inflated by iteration 10 |
~$9.80 | ~$2.45 (cost drift at iteration 6) | 75% |
The delegation loop scenario has the highest absolute savings because the combination of independent iteration budgets plus context inheritance makes each cycle progressively more expensive. The breaker trips at the third occurrence of the same delegation pair — before the compound cost effect takes hold.
RunGuard integration
The circuit breaker above works standalone. If you're running multiple crews across your stack, RunGuard's @guard() decorator provides the same loop detection, budget tracking, and alert dispatch as a single import that wraps any CrewAI tool function:
from runguard import guard
from crewai_tools import BaseTool
class MyResearchTool(BaseTool):
name: str = "market_data_fetcher"
description: str = "Fetches current market data for a given sector."
@guard(
budget_usd=2.0,
max_loop_repeats=3,
on_trip=lambda err: send_slack_alert(str(err)),
)
def _run(self, sector: str) -> str:
return fetch_market_data(sector) # your implementation
The decorator watches the tool's call-signature stream for loop patterns, enforces the per-tool budget cap, and raises on the third repeat — before the agent's iteration counter has a chance to exhaust. It works identically for both the standalone breaker pattern above and for individual tools in a crew that doesn't use the full breaker.
FAQ
Does step_callback work with both Process.sequential and Process.hierarchical?
Yes. step_callback and task_callback are Crew-level hooks that fire regardless of process type. In hierarchical mode, the manager LLM's reasoning steps also pass through step_callback, so you can detect the manager's reassignment decisions by checking for the manager agent's role in the step output. The manager_reassignments tracking in the example above relies on was_reassigned being set in the task output — which requires a small hook into the manager's callback or a subclassed HierarchicalProcess. A simpler alternative: track consecutive task completions with the same description and no state change between them.
How do I set the right budget_usd threshold without knowing my crew's normal cost?
Run the crew with the breaker in monitoring mode for 10–20 successful runs: collect breaker.state.total_cost_usd after each run, compute the 95th percentile, and set budget_usd to 2–3× that value. A common mistake is setting the cap too tight based on the average — crews have naturally high variance per run depending on input complexity. The goal is to catch runaway behavior (typically 5–10× normal cost), not to cap normal variance. If your crew normally costs $0.40–$1.20, a cap of $3.00 is usually the right first setting.
What happens if a legitimate workflow requires the same two agents to collaborate multiple times?
Increase max_delegation_cycles or scope the delegation check to a time window rather than the full session. A researcher and writer exchanging drafts three times in one session is a deliberate multi-round collaboration; them exchanging the same unresolved query three times in thirty seconds is a loop. You can differentiate by checking whether the delegated task description changes between cycles — if Agent A is asking Agent B the same question three times with identical wording, that is the signal to trip, regardless of whether the pair is "expected" to collaborate. Modify _check_delegation_loop to hash the task description alongside the agent pair.
Does this work with CrewAI's @agent and @task decorator-based crew definition (CrewBase)?
Yes. The step_callback and task_callback are passed to the Crew constructor, not to individual agents or tasks. In a CrewBase-decorated class, pass them in the @crew-decorated method where you construct the Crew object: return Crew(..., step_callback=breaker.on_step, task_callback=breaker.on_task_complete). The breaker instance should be created at class initialization so its state persists across the full crew run.
How does this interact with CrewAI's built-in memory and caching?
The circuit breaker is orthogonal to memory and caching — it observes the token/cost stream without modifying it. Memory with memory=True can actually make the breaker more accurate: because memory causes context windows to grow with iterations, per-step costs increase monotonically during a malfunction, which makes the cost-drift check (cost_drift_ratio) more sensitive. Caching (cache=True on tools) reduces tool costs but does not affect LLM reasoning costs, so the delegation and manager checks remain fully relevant even with caching enabled.