CrewAI Flows Cost Control: Loop Detection in Event-Driven AI Workflows
CrewAI's basic Crew model — agents, tasks, delegation chains — has a recognizable failure mode: agents route uncertainty to each other in a delegation loop. The max_iter cap exists precisely for this reason, and the CrewAI delegation model circuit breaker covers how to build guards that catch the four failure modes max_iter misses.
CrewAI Flows is a different programming model. Instead of a manager agent orchestrating worker agents through delegated tasks, Flows are event-driven state machines: methods decorated with @start(), @listen(), and @router() fire in response to state transitions. The same concept — "an AI step calling another AI step" — produces entirely different cost failure patterns when expressed as event listeners rather than delegation chains.
This post covers four failure modes that are unique to the Flows model and invisible to the delegation-layer guards. If your CrewAI application uses Flow as a base class and @listen or @router decorators, these failure modes apply to you directly.
Scope. Code examples target CrewAI 0.80+ with crewai[flows]>=0.80. The Flow, FlowState, @start, @listen, and @router APIs are stable as of this writing. For the basic Crew delegation model (no Flow base class), see CrewAI Cost Control: Loop Detection and Budget Enforcement in Production. For multi-agent systems using AutoGen's conversation model, see AutoGen Cost Control.
Why Flows fail differently than Crews
A basic Crew execution is linear: manager breaks work into tasks, assigns each to an agent, collects results. The control flow is a DAG from task assignment to completion. Cycles require explicit re-delegation — an agent actively choosing to send work back to a prior agent. This is observable as a delegation event.
A Flow execution is reactive: steps fire when events are emitted. A @listen(step_a) method fires whenever step_a completes. If @listen(step_a) produces an output that causes step_a to fire again — through a state change, a .kickoff() call, or a method chain — the cycle is implicit in the event graph, not in explicit re-delegation. No agent is "choosing" to loop. The loop emerges from the listener topology.
This distinction matters for cost analysis because:
- No iteration counter tracks cross-listener cycles.
max_iterlimits tool iterations within a single Crew agent's task execution. It does not track how many times a specific@listenmethod has fired across the lifetime of a Flow run. - Parallel listeners multiply costs without any loop pattern. Three listeners registered on the same source event fire simultaneously on every invocation — not because of a loop, but because the listener topology is a fan-out.
- FlowState is passed to every step. The Pydantic model representing shared state is serialized and available to every step method. If that model grows unbounded lists across steps, every downstream LLM call receives an enlarged context.
The four failure modes below each have a distinct root cause and a distinct guard implementation.
The four failure modes
1. Listener chain cycle
The most severe failure mode: two or more @listen methods trigger each other, forming an event cycle. Here is the minimal pattern:
from crewai.flow.flow import Flow, listen, start, router
from crewai.flow.flow import FlowState
from pydantic import BaseModel
class ResearchState(FlowState):
query: str = ""
result: str = ""
needs_refinement: bool = False
class ResearchFlow(Flow[ResearchState]):
@start()
def generate_query(self):
# Calls LLM to produce search query from user intent
self.state.query = llm_call(f"Generate a search query for: {self.state.query}")
@listen(generate_query)
def evaluate_query(self):
# Calls LLM to evaluate whether query is good enough
score = llm_call(f"Score this query 1-10: {self.state.query}")
if int(score) < 7:
self.state.needs_refinement = True
@listen(evaluate_query)
def refine_query(self):
if self.state.needs_refinement:
# Calls LLM to refine — then emits to generate_query again
self.state.query = llm_call(f"Refine this query: {self.state.query}")
self.state.needs_refinement = False
# Explicitly restarts the flow from the beginning
self.generate_query() # <-- re-enters the listener chain
The cycle: generate_query → evaluate_query → refine_query → generate_query. If the LLM consistently scores queries below 7 (which happens when the model is uncertain, when the prompt is ambiguous, or when the quality rubric is defined in a way the model can't satisfy), this cycle runs indefinitely. Three LLM calls per cycle, no natural stopping condition.
The same cycle pattern appears without an explicit self.method() call when using self.kickoff() inside a listener, or when a listener modifies state in a way that triggers a @router to re-enter the same branch. The explicit call makes the cycle visible; the state-driven variants are harder to spot in code review.
2. Router infinite re-routing
The @router decorator dispatches control to different listeners based on a returned string key. This is the idiomatic way to implement retry or validation logic in Flows. The failure mode occurs when the LLM output never satisfies the exit condition:
class ValidationState(FlowState):
draft: str = ""
attempts: int = 0
class DocumentFlow(Flow[ValidationState]):
@start()
def generate_draft(self):
self.state.draft = llm_call("Write a 500-word technical summary of...")
self.state.attempts += 1
@router(generate_draft)
def validate_draft(self):
# Calls a separate LLM to validate the draft
result = llm_call(f"Is this draft publication-ready? Reply PASS or FAIL.\n\n{self.state.draft}")
if "PASS" in result:
return "approved"
return "needs_revision"
@listen("needs_revision")
def revise_draft(self):
self.state.draft = llm_call(f"Improve this draft: {self.state.draft}")
# Emits to validate_draft again — same cycle
self.generate_draft() # or implicit via state change triggering @router
If the validator LLM always returns FAIL — because the draft quality standard is impossible to meet, because the validator prompt is flipped, or because the validator model is more critical than the generator model in a way that creates a permanent gap — the cycle runs indefinitely. Two LLM calls per cycle (generator + validator). No termination.
Note that self.state.attempts increments correctly but nothing reads it. This is the pattern most teams ship in the first iteration: a counter that accumulates but no guard that acts on it.
3. Parallel listener fan-out
Multiple @listen methods registered on the same source fire simultaneously. This is often intentional — you want to run analysis, logging, and post-processing in parallel. The cost failure occurs when the parallel listeners each make LLM calls, and the source event fires on every step of a loop:
class AnalysisState(FlowState):
document: str = ""
summary: str = ""
sentiment: str = ""
entities: list[str] = []
class AnalysisFlow(Flow[AnalysisState]):
@start()
def load_document(self):
self.state.document = fetch_document()
@listen(load_document)
def summarize(self):
# LLM call #1
self.state.summary = llm_call(f"Summarize: {self.state.document}")
@listen(load_document)
def analyze_sentiment(self):
# LLM call #2 — fires simultaneously with summarize
self.state.sentiment = llm_call(f"Sentiment: {self.state.document}")
@listen(load_document)
def extract_entities(self):
# LLM call #3 — fires simultaneously
self.state.entities = parse_list(llm_call(f"Extract entities: {self.state.document}"))
Three listeners, three LLM calls per load_document invocation — this is by design and expected. The cost multiplier becomes a failure mode when the document is large (each call pays full document token cost), when the source event fires repeatedly (inside a validation loop), or when the listeners themselves have parallel sub-listeners.
A three-level fan-out tree — 3 listeners on source, each with 3 listeners — fires 9 LLM calls on a single source event. Combined with a validation loop that re-triggers the source, cost scales as retries × fan_out_degree.
4. FlowState accumulation
FlowState is a Pydantic model shared across all steps. Steps read and write to self.state. The failure mode: unbounded list or dict fields in the state model accumulate context across steps, and that accumulated context is passed to every LLM call:
class PipelineState(FlowState):
messages: list[dict] = [] # grows with every step
search_results: list[str] = [] # accumulates all searches
analysis_history: list[str] = [] # stores every analysis attempt
current_step: str = ""
class PipelineFlow(Flow[PipelineState]):
@listen(prior_step)
def analyze(self):
# Appends to history on every call
self.state.analysis_history.append(self.state.current_step)
# Builds context from full history — grows with every iteration
context = "\n".join(self.state.analysis_history)
prompt = f"Given this history:\n{context}\n\nAnalyze: {self.state.current_step}"
result = llm_call(prompt) # Token cost = O(history_length × avg_step_size)
self.state.messages.append({"role": "assistant", "content": result})
At step 1, analysis_history has 1 entry. At step 10, it has 10 entries and the prompt includes all prior analysis. At step 50, every LLM call is 50× more expensive than step 1 in prompt tokens, even if the model's actual output doesn't change. This is identical in mechanics to the ConversationBufferMemory explosion in LCEL chains — the difference is that FlowState makes it very easy to build this pattern accidentally because shared state is idiomatic.
Building the Flows circuit breaker
The guard system for Flows has three components: a step invocation tracker (catches cycles), a router call tracker (catches infinite re-routing), and a state token estimator (catches accumulation). They combine into a GuardedFlow base class that replaces Flow as your parent class.
Step invocation tracker
import time
import threading
from collections import defaultdict
from functools import wraps
from typing import Any, Callable
class StepBudgetExceeded(RuntimeError):
"""Raised when a Flow step fires more times than its budget allows."""
pass
class FlowStepTracker:
"""
Thread-safe per-run step invocation counter.
Each Flow.kickoff() gets a fresh tracker.
"""
def __init__(self, max_invocations_per_step: int = 5):
self.max_invocations = max_invocations_per_step
self._counts: dict[str, int] = defaultdict(int)
self._lock = threading.Lock()
def record(self, step_name: str) -> None:
with self._lock:
self._counts[step_name] += 1
count = self._counts[step_name]
if count > self.max_invocations:
raise StepBudgetExceeded(
f"Step '{step_name}' has fired {count} times "
f"(limit {self.max_invocations}). "
f"Listener chain cycle detected. Full counts: {dict(self._counts)}"
)
def count(self, step_name: str) -> int:
with self._lock:
return self._counts[step_name]
Router call tracker
class RouterLoopError(RuntimeError):
"""Raised when a @router returns the same non-terminal route consecutively."""
pass
class RouterCallTracker:
"""
Tracks consecutive identical route returns from a @router method.
Raises after max_consecutive_same_route repetitions.
"""
def __init__(
self,
max_consecutive_same_route: int = 4,
terminal_routes: set[str] | None = None
):
self.max_consecutive = max_consecutive_same_route
self.terminal_routes = terminal_routes or {"approved", "done", "complete", "success"}
self._router_history: dict[str, list[str]] = defaultdict(list)
self._lock = threading.Lock()
def record(self, router_name: str, route: str) -> None:
with self._lock:
history = self._router_history[router_name]
history.append(route)
consecutive = self._consecutive_tail(history, route)
if route not in self.terminal_routes and consecutive > self.max_consecutive:
raise RouterLoopError(
f"Router '{router_name}' has returned route '{route}' "
f"{consecutive} consecutive times (limit {self.max_consecutive}). "
f"Route history: {history[-10:]}"
)
def _consecutive_tail(self, history: list[str], current: str) -> int:
count = 0
for r in reversed(history):
if r == current:
count += 1
else:
break
return count
FlowState token ceiling
class FlowStateBudgetError(RuntimeError):
"""Raised when FlowState has grown large enough to exceed the per-call token budget."""
pass
def estimate_state_tokens(state_obj: Any) -> int:
"""
Rough token estimate for a Pydantic FlowState.
Uses the char/4 heuristic across all string fields.
"""
import json
try:
raw = json.dumps(state_obj.model_dump(), default=str)
except Exception:
raw = str(state_obj)
return len(raw) // 4
class FlowStateBudget:
"""
Per-step ceiling on the token size of FlowState.
Prevents unbounded accumulation from making late-stage LLM calls expensive.
"""
def __init__(self, max_state_tokens: int = 8_000):
self.max_state_tokens = max_state_tokens
def check(self, state_obj: Any, step_name: str) -> None:
tokens = estimate_state_tokens(state_obj)
if tokens > self.max_state_tokens:
raise FlowStateBudgetError(
f"FlowState has grown to ~{tokens} tokens before step '{step_name}' "
f"(ceiling {self.max_state_tokens}). "
f"Unbounded list or history field detected. Trim state before continuing."
)
Combining into GuardedFlow
from crewai.flow.flow import Flow
def guarded_step(
max_invocations: int = 5,
check_state_tokens: bool = True,
is_router: bool = False,
terminal_routes: set[str] | None = None,
):
"""
Decorator for Flow step methods. Apply before @listen or @router.
Usage:
@listen(prior_step)
@guarded_step(max_invocations=3)
def my_step(self):
...
"""
def decorator(fn: Callable) -> Callable:
@wraps(fn)
def wrapper(self_flow: "GuardedFlow", *args, **kwargs):
step_name = fn.__name__
# Check step invocation budget
self_flow._step_tracker.record(step_name)
# Check FlowState token ceiling if enabled
if check_state_tokens and hasattr(self_flow, "state"):
self_flow._state_budget.check(self_flow.state, step_name)
result = fn(self_flow, *args, **kwargs)
# For router methods, track the returned route
if is_router and isinstance(result, str):
self_flow._router_tracker.record(
step_name, result,
)
return result
return wrapper
return decorator
class GuardedFlow(Flow):
"""
Drop-in base class replacing crewai.flow.flow.Flow.
Adds per-step invocation limits, router loop detection,
and FlowState accumulation protection.
Configure at class level:
MAX_STEP_INVOCATIONS = 5 # per step per kickoff()
MAX_CONSECUTIVE_SAME_ROUTE = 4 # before RouterLoopError
MAX_STATE_TOKENS = 8_000 # before FlowStateBudgetError
TERMINAL_ROUTES = {"approved", "done", "complete"}
"""
MAX_STEP_INVOCATIONS: int = 5
MAX_CONSECUTIVE_SAME_ROUTE: int = 4
MAX_STATE_TOKENS: int = 8_000
TERMINAL_ROUTES: set[str] = {"approved", "done", "complete", "success"}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._step_tracker = FlowStepTracker(self.MAX_STEP_INVOCATIONS)
self._router_tracker = RouterCallTracker(
max_consecutive_same_route=self.MAX_CONSECUTIVE_SAME_ROUTE,
terminal_routes=self.TERMINAL_ROUTES,
)
self._state_budget = FlowStateBudget(self.MAX_STATE_TOKENS)
self._run_start: float = time.time()
self._total_llm_calls: int = 0
self._total_llm_tokens: int = 0
def _record_llm_call(self, tokens_used: int = 0) -> None:
self._total_llm_calls += 1
self._total_llm_tokens += tokens_used
@property
def run_summary(self) -> dict:
return {
"elapsed_seconds": round(time.time() - self._run_start, 2),
"total_llm_calls": self._total_llm_calls,
"total_llm_tokens": self._total_llm_tokens,
"step_counts": dict(self._step_tracker._counts),
}
Applying the guards to the failure mode examples
Fixing the listener chain cycle
class SafeResearchFlow(GuardedFlow[ResearchState]):
MAX_STEP_INVOCATIONS = 3 # generate_query + evaluate + refine = at most 3 cycles
@start()
@guarded_step(max_invocations=3)
def generate_query(self):
self.state.query = llm_call(f"Generate a search query for: {self.state.query}")
self._record_llm_call()
@listen(generate_query)
@guarded_step(max_invocations=3)
def evaluate_query(self):
score = llm_call(f"Score this query 1-10: {self.state.query}")
self._record_llm_call()
self.state.needs_refinement = int(score) < 7
@listen(evaluate_query)
@guarded_step(max_invocations=3)
def refine_query(self):
if self.state.needs_refinement:
self.state.query = llm_call(f"Refine this query: {self.state.query}")
self._record_llm_call()
self.state.needs_refinement = False
try:
self.generate_query()
except StepBudgetExceeded:
# Budget hit — accept best query we have
pass
# Graceful top-level handler
try:
flow = SafeResearchFlow()
flow.kickoff(inputs={"query": "machine learning deployment cost"})
except StepBudgetExceeded as e:
print(f"[GUARD] Listener cycle stopped: {e}")
print(f"[GUARD] Run summary: {flow.run_summary}")
The guard fires on the 4th invocation of generate_query — three refinement cycles are allowed, and the fourth attempt raises StepBudgetExceeded instead of making another LLM call. The flow catches it internally and accepts the best query produced so far. Maximum LLM calls: 3 cycles × 3 steps = 9 instead of unbounded.
Fixing router infinite re-routing
class SafeDocumentFlow(GuardedFlow[ValidationState]):
MAX_CONSECUTIVE_SAME_ROUTE = 3
TERMINAL_ROUTES = {"approved"}
@start()
@guarded_step(max_invocations=5)
def generate_draft(self):
self.state.draft = llm_call("Write a 500-word technical summary...")
self.state.attempts += 1
self._record_llm_call()
@router(generate_draft)
@guarded_step(max_invocations=5, is_router=True)
def validate_draft(self):
result = llm_call(f"Is this draft publication-ready? Reply PASS or FAIL.\n\n{self.state.draft}")
self._record_llm_call()
if "PASS" in result:
return "approved"
return "needs_revision"
@listen("needs_revision")
@guarded_step(max_invocations=4)
def revise_draft(self):
self.state.draft = llm_call(f"Improve this draft: {self.state.draft}")
self._record_llm_call()
# After 3 revisions with no PASS, RouterLoopError fires above
self.generate_draft()
try:
flow = SafeDocumentFlow()
flow.kickoff()
except RouterLoopError as e:
print(f"[GUARD] Router loop stopped after {flow.state.attempts} attempts: {e}")
print(f"[GUARD] Best draft preserved in flow.state.draft")
After three consecutive needs_revision routes, RouterCallTracker raises RouterLoopError. The draft at that point is the best the generator produced — not the original, not nothing. Maximum LLM calls: 3 revisions × 2 calls (generate + validate) = 6 instead of unbounded.
Fixing parallel listener fan-out cost
The parallel listener pattern is often intentional; the guard addresses the cost-multiplication aspect rather than preventing parallelism entirely:
class SafeAnalysisFlow(GuardedFlow[AnalysisState]):
MAX_STEP_INVOCATIONS = 2 # Each listener may fire at most twice per run
# For large documents: truncate before passing to parallel listeners
DOCUMENT_TOKEN_LIMIT = 4_000
def _truncate(self, text: str, token_limit: int) -> str:
chars = token_limit * 4
if len(text) > chars:
return text[:chars] + "\n[... truncated for cost control ...]"
return text
@start()
def load_document(self):
self.state.document = fetch_document()
@listen(load_document)
@guarded_step(max_invocations=2)
def summarize(self):
doc = self._truncate(self.state.document, self.DOCUMENT_TOKEN_LIMIT)
self.state.summary = llm_call(f"Summarize: {doc}")
self._record_llm_call()
@listen(load_document)
@guarded_step(max_invocations=2)
def analyze_sentiment(self):
# Sentiment only needs the first ~500 tokens
doc = self._truncate(self.state.document, 500)
self.state.sentiment = llm_call(f"Sentiment (positive/negative/neutral): {doc}")
self._record_llm_call()
@listen(load_document)
@guarded_step(max_invocations=2)
def extract_entities(self):
doc = self._truncate(self.state.document, 2_000)
raw = llm_call(f"Extract named entities as a JSON array: {doc}")
self._record_llm_call()
self.state.entities = parse_json_list(raw)
The truncation per listener means each call pays for only as many tokens as it actually needs. Sentiment analysis doesn't need the full document — 500 tokens captures the opening paragraph, which typically determines sentiment. Entity extraction benefits from more context but doesn't need the full 50K-token document. Per-listener max_invocations=2 ensures that if something upstream re-triggers load_document, each listener fires at most twice.
Fixing FlowState accumulation
from typing import Optional
class BoundedPipelineState(FlowState):
document: str = ""
current_step: str = ""
# Bounded: keep only last N entries
analysis_history: list[str] = []
messages: list[dict] = []
MAX_HISTORY: int = 5
MAX_MESSAGES: int = 10
def append_analysis(self, entry: str) -> None:
self.analysis_history.append(entry)
if len(self.analysis_history) > self.MAX_HISTORY:
self.analysis_history = self.analysis_history[-self.MAX_HISTORY:]
def append_message(self, role: str, content: str) -> None:
self.messages.append({"role": role, "content": content})
if len(self.messages) > self.MAX_MESSAGES:
# Keep system message + most recent
self.messages = self.messages[:1] + self.messages[-(self.MAX_MESSAGES - 1):]
class SafePipelineFlow(GuardedFlow[BoundedPipelineState]):
MAX_STATE_TOKENS = 6_000 # Warn before state grows too large
@listen(prior_step)
@guarded_step(max_invocations=10, check_state_tokens=True)
def analyze(self):
# State token check fires before the LLM call
context = "\n".join(self.state.analysis_history)
result = llm_call(f"Given history:\n{context}\n\nAnalyze: {self.state.current_step}")
self._record_llm_call()
# Use bounded append — trims to MAX_HISTORY automatically
self.state.append_analysis(result)
self.state.append_message("assistant", result)
The FlowStateBudget.check() call inside @guarded_step fires before the LLM call. If accumulated state exceeds 6,000 tokens, FlowStateBudgetError is raised before the call is made. The bounded append methods on the state model ensure lists never grow past the configured limits — the state ceiling acts as a safety net for cases where the model implementation forgot to trim.
Wiring the complete guard into a production Flow
from crewai.flow.flow import listen, router, start
from crewai import LLM
class ProductionResearchFlow(GuardedFlow[ResearchState]):
"""
Production-hardened research flow with full circuit breaker coverage.
Swap GuardedFlow for Flow to disable guards for local development.
"""
MAX_STEP_INVOCATIONS = 4
MAX_CONSECUTIVE_SAME_ROUTE = 3
MAX_STATE_TOKENS = 10_000
TERMINAL_ROUTES = {"complete", "approved"}
def __init__(self):
super().__init__()
self._llm = LLM(model="gpt-4o-mini", temperature=0.3)
@start()
@guarded_step(max_invocations=4)
def plan_research(self):
result = self._llm.call([
{"role": "system", "content": "You are a research planner."},
{"role": "user", "content": f"Create a 3-step research plan for: {self.state.query}"}
])
self.state.plan = result.content
self._record_llm_call(tokens_used=result.usage.total_tokens)
@listen(plan_research)
@guarded_step(max_invocations=4)
def execute_research(self):
result = self._llm.call([
{"role": "system", "content": "Execute this research plan concisely."},
{"role": "user", "content": self.state.plan}
])
self.state.findings = result.content
self._record_llm_call(tokens_used=result.usage.total_tokens)
@router(execute_research)
@guarded_step(max_invocations=4, is_router=True)
def review_findings(self):
result = self._llm.call([
{"role": "user", "content": f"Are these findings complete and accurate? Reply APPROVE or REVISE.\n\n{self.state.findings}"}
])
self._record_llm_call(tokens_used=result.usage.total_tokens)
return "complete" if "APPROVE" in result.content else "needs_revision"
@listen("needs_revision")
@guarded_step(max_invocations=3)
def revise_findings(self):
result = self._llm.call([
{"role": "user", "content": f"Revise these findings to be more complete:\n\n{self.state.findings}"}
])
self.state.findings = result.content
self._record_llm_call(tokens_used=result.usage.total_tokens)
self.execute_research() # re-enter — guarded
@listen("complete")
def finalize(self):
self.state.result = self.state.findings
def run_flow(query: str) -> dict:
flow = ProductionResearchFlow()
try:
flow.kickoff(inputs={"query": query})
return {
"result": flow.state.result,
"summary": flow.run_summary,
}
except StepBudgetExceeded as e:
return {"error": f"Step budget exceeded: {e}", "partial": flow.state.findings, "summary": flow.run_summary}
except RouterLoopError as e:
return {"error": f"Router loop: {e}", "partial": flow.state.findings, "summary": flow.run_summary}
except FlowStateBudgetError as e:
return {"error": f"State too large: {e}", "summary": flow.run_summary}
Flows vs Crews: choosing the right guard layer
| Dimension | Basic Crew | Flow |
|---|---|---|
| Loop mechanism | Agent explicitly re-delegates to another agent | Event listener implicitly re-triggers source via state change |
| Built-in guard | max_iter per agent task |
None — no built-in step invocation limit |
| Parallel cost multiplier | Only with hierarchical Process.hierarchical |
Any two @listen methods on the same source |
| State growth | Agent memory grows; not passed to every call by default | FlowState passed to every step method explicitly |
| Retry pattern | Re-delegation or tool retry within task | @router returning same non-terminal route |
| Guard implementation | Step delegation counter + tool call hash | Step invocation counter + router route history + state token ceiling |
If your application uses both Crew inside Flow steps (a common pattern: Flows for overall pipeline orchestration, Crews for individual research or writing tasks within steps), apply both guard layers. The Crew-level guards protect within each step; the Flow-level guards protect the overall pipeline.
RunGuard integration
The GuardedFlow base class instruments the metrics RunGuard needs. Emit the run summary at kickoff completion:
import runguard
rg = runguard.Client(api_key="rg_...")
def run_flow_with_runguard(query: str) -> dict:
flow = ProductionResearchFlow()
try:
flow.kickoff(inputs={"query": query})
summary = flow.run_summary
rg.record_run(
app="research-pipeline",
status="ok",
llm_calls=summary["total_llm_calls"],
tokens=summary["total_llm_tokens"],
elapsed_seconds=summary["elapsed_seconds"],
metadata={"step_counts": summary["step_counts"]},
)
return {"result": flow.state.result, "summary": summary}
except (StepBudgetExceeded, RouterLoopError, FlowStateBudgetError) as e:
summary = flow.run_summary
rg.record_run(
app="research-pipeline",
status="tripped",
error=str(e),
llm_calls=summary["total_llm_calls"],
tokens=summary["total_llm_tokens"],
elapsed_seconds=summary["elapsed_seconds"],
)
return {"error": str(e), "summary": summary}
RunGuard's dashboard shows per-app breaker trip rates, step-level hot spots (which step fires most often before a trip), and token cost trends across runs. The step_counts metadata surfaces which listener is the source of a cycle — the step with the highest count is always the entry point of the loop.
FAQ
My Flow uses and_ and or_ to combine listener conditions. Do the guards still work?
Yes. and_(step_a, step_b) means the decorated method fires when both sources complete. The @guarded_step decorator wraps the method body regardless of the trigger condition — it records an invocation and checks the budget the same way. The only difference is that and_ conditions make cycles harder to form (both conditions must be satisfied), so the per-step limit can be set slightly lower for and_-triggered listeners.
Does GuardedFlow work with async Flows (async def step methods)?
The current @guarded_step decorator uses a synchronous wrapper. For async step methods, replace the @wraps(fn) wrapper body with an async def wrapper that awaits the inner function call. The tracker and budget check logic is synchronous (thread-safe via threading.Lock) and safe to call from within async context. A full async-compatible version of guarded_step swaps wrapper → async def wrapper(...) and result = fn(...)` → `result = await fn(...).
What's the recommended MAX_STEP_INVOCATIONS for a validation/retry Flow?
Set it to your intended retry budget plus one. If your flow is supposed to attempt at most 3 validation cycles, set MAX_STEP_INVOCATIONS = 4 — 3 real attempts plus 1 tolerance for the initial invocation before the first retry. For MAX_CONSECUTIVE_SAME_ROUTE on the @router, use the same number as your intended retry budget. The goal is to allow all legitimate retries while catching the infinite case, so intended_retries + 1 is the right ceiling for both.
Should I apply @guarded_step to every step, or only the ones I suspect are looping?
Apply it to every step that makes an LLM call or that is reachable from a @listen or @router that can be re-triggered. Steps that are only reachable once (e.g., @start() methods with no upstream listener) can be left unguarded if you're confident they're not in a cycle. In practice, applying to every step adds negligible overhead (a dict lookup and an integer increment) and provides a safety net against cycles introduced by future refactors.
How does RunGuard's SDK relate to the GuardedFlow base class?
The GuardedFlow class in this post is a drop-in guard you implement yourself — it uses only standard Python and CrewAI, with no external dependencies. RunGuard's SDK adds persistent observability: breaker trip events are stored in RunGuard's database, aggregated across all your Flow runs, and surfaced in the dashboard with per-step cost attribution. The pattern is the same: GuardedFlow catches the loop locally; RunGuard makes the trip data queryable and alertable across your entire agent fleet. Install RunGuard's CrewAI integration with pip install runguard[crewai] once you're ready to move from local guards to fleet-level observability.