Pydantic AI ships with UsageLimits and max_retries. Teams declare a result_type Pydantic model, set max_retries=3 for transient tool failures, add a UsageLimits(request_limit=10) to cap total calls, and ship. Then a structured-output agent hits a validation edge case: the LLM produces JSON that fails the result_type schema by one field, and the framework appends the validation error to message_history and retries automatically. The retry produces slightly different JSON that fails on a different field, and so does the next one. By the last retry the conversation history carries every failed attempt and its validation error. Each retry has cost more than the one before because the growing history expands the prompt, and request_limit=10 still has headroom: the cap fires at 10 requests regardless of how much each request cost.
The problem is structural. Pydantic AI's UsageLimits is a post-hoc ceiling on call count or total token count — it caps volume but cannot see patterns. max_retries limits the number of retries for a single tool call within a single run() invocation, but resets when you call agent.run() again in an outer loop. Neither mechanism can answer "are these retries making progress?" or "is this conversation spending 3× more per turn than it did at turn 5?" Those are circuit breaker questions, not rate-limiter questions.
This post builds a production circuit breaker for Pydantic AI: validation retry cascade detection, tool call storm prevention, nested agent recursion tracking, and message history cost drift monitoring — all without patching Pydantic AI internals or replacing your agent definitions. At the end you'll see how RunGuard's @guard() decorator wraps any PydanticAI agent with one call and handles all four failure modes automatically.
What you'll build: A circuit breaker that detects when result_type validation retries are diverging rather than converging, catches tools called repeatedly with the same failing arguments across successive agent.run() calls, tracks recursion depth when agents call other agents as tools, and monitors per-turn token cost growth across a message_history chain — all without modifying your Pydantic models or tool function signatures.
Why Pydantic AI's structured-output model fails more expensively than a plain LLM call
A plain openai.chat.completions.create() call has one cost: the tokens in and out of a single request. Pydantic AI adds three amplifiers that UsageLimits and max_retries cannot independently bound:
- Validation failure appends to history; it doesn't replace it. When the LLM's response fails result_type validation, Pydantic AI constructs a retry message that includes the validation error and sends the entire conversation, including every prior failed attempt, back to the model. If the first attempt costs C tokens, the first retry costs C plus the failed response and its error message, and the second retry adds the second failed response and error on top of that. Cost grows with each retry instead of staying flat: max_retries=3 allows three of these escalating calls, not three equal-cost calls.
- Tool calls multiply cost with external round-trips. Each @agent.tool function call is a separate operation with its own latency and, for paid APIs, its own cost. When Pydantic AI's model decides to call a tool, the result is appended to history and another LLM call is made to decide what to do next. A single logical user request can expand into: (1) planning call → (2) tool call → (3) tool result interpretation call → (4) second tool call → (5) structured output generation call. If any step stalls in a retry loop, each retry runs steps 1–5 again with a longer history each time.
- message_history passed across run() calls makes cumulative cost grow quadratically. Pydantic AI supports multi-turn conversations by passing message_history=previous_result.all_messages() into subsequent agent.run() calls, so turn N resends all N−1 prior exchanges plus the new one. In a 20-turn conversation with 400-token average exchanges, the final turn sends 8,000 tokens, 20× what the first turn consumed, even though each exchange added the same 400 tokens; the last five turns alone resend roughly 34,000 tokens of accumulated history. There is no built-in mechanism to detect when this growth rate exceeds your budget model.
These amplifiers mean an agent that respects every per-call cap can still produce 8–30× the cost you modeled, because the caps don't account for validation failure compounding, tool call fan-out, or the quadratic cost growth of accumulated message history.
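The escalation described in the list above is easy to check with a few lines of arithmetic. This sketch assumes a fixed 400-token exchange per turn with no system-prompt or tool-schema overhead, and uses made-up sizes (a 2,000-token prompt, a 300-token failed attempt, a 100-token error message) for the validation-retry case; the helper names are illustrative only.

```python
# Token arithmetic for the two escalation patterns: multi-turn history and
# validation retries. Assumptions: fixed 400-token exchanges, no fixed
# overhead, and retries that resend every earlier failed attempt.

EXCHANGE_TOKENS = 400
TURNS = 20


def turn_tokens(turn: int, exchange: int = EXCHANGE_TOKENS) -> int:
    """Tokens sent on a 1-indexed turn: N-1 prior exchanges plus the new one."""
    return turn * exchange


def history_tokens(turn: int, exchange: int = EXCHANGE_TOKENS) -> int:
    """Tokens of resent history on a 1-indexed turn (new exchange excluded)."""
    return (turn - 1) * exchange


def retry_tokens(base: int, failed_attempt: int, error_msg: int, retries: int) -> list[int]:
    """Prompt size of the initial call and each retry: every retry resends all
    earlier failed attempts plus their validation errors."""
    return [base + k * (failed_attempt + error_msg) for k in range(retries + 1)]


per_turn = [turn_tokens(n) for n in range(1, TURNS + 1)]
cumulative = sum(per_turn)  # 400 * (20 * 21 / 2): quadratic in TURNS
last_five_history = sum(history_tokens(n) for n in range(TURNS - 4, TURNS + 1))

print(f"turn 1: {per_turn[0]} tokens, turn {TURNS}: {per_turn[-1]} tokens")  # 400 vs 8000
print(f"history resent in turns 16-20: {last_five_history}")  # 34000
print(f"cumulative over {TURNS} turns: {cumulative}")  # 84000
print(retry_tokens(base=2000, failed_attempt=300, error_msg=100, retries=3))
# [2000, 2400, 2800, 3200]: 10,400 tokens instead of 4 x 2000 = 8,000
```

Per-turn cost grows linearly with turn number, which is exactly why the total over a conversation grows with the square of its length.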
The four failure modes Pydantic AI's built-in controls miss
1. Result validation retry cascade: structured output failures that diverge instead of converge
Pydantic AI's core value proposition is structured output: you declare a result_type: type[MyModel] and the framework ensures the agent's response validates against it before returning. When validation fails, the framework catches the ValidationError, constructs a retry message containing the error details, and calls the model again with the full conversation plus the error appended. The model's next attempt may fix the specific field the error mentioned, only to fail on a different field. The subsequent retry appends a second error. The pattern to watch for is three or more validation failures in a single run() where each failure cites a different field. That is strong evidence the model is diverging: it isn't converging on a valid response, it's cycling through different ways to be wrong.
Detection signal: the field paths cited by successive validation errors within a single run() invocation keep changing instead of narrowing. If error 1 mentions result.metrics.latency_p99, error 2 mentions result.metrics.error_rate and error 3 mentions result.status, those are three different fields: the model isn't working toward a valid response, it's producing a different broken response each time. max_retries allows this because it counts retries, not whether the retries represent convergent progress. The trip condition: N consecutive validation retries whose error-field sets share no common field.
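A minimal sketch of that trip condition, separated from any Pydantic AI plumbing. It assumes the errors arrive in the shape pydantic's ValidationError.errors() produces, a list of dicts whose "loc" entry is a tuple of field names and list indices; error_paths and is_diverging are illustrative names, not library APIs.

```python
from typing import Iterable, Sequence


def error_paths(errors: Iterable[dict]) -> set[str]:
    """Collapse pydantic-style error dicts into dotted field paths."""
    return {".".join(str(part) for part in err["loc"]) for err in errors}


def is_diverging(error_sets: Sequence[set[str]], window: int = 3) -> bool:
    """True when the last `window` retries share no common error field."""
    if len(error_sets) < window:
        return False
    recent = list(error_sets[-window:])
    return not set.intersection(*recent)


# The example from the text: a different field fails on every retry.
attempts = [
    [{"loc": ("metrics", "latency_p99"), "msg": "Field required"}],
    [{"loc": ("metrics", "error_rate"), "msg": "Input should be a valid number"}],
    [{"loc": ("status",), "msg": "Input should be 'ok' or 'degraded'"}],
]
sets = [error_paths(a) for a in attempts]
print(is_diverging(sets))  # True
```

By contrast, a sequence like {"status"}, {"status", "metrics.error_rate"}, {"status"} keeps returning to the same field, so the intersection is non-empty and the check stays quiet.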
2. Tool call storm: the same tool invoked repeatedly across successive run() calls
Pydantic AI's @agent.tool functions raise ModelRetry to signal a controlled retry within a single run(). max_retries caps these. But teams frequently run agents in outer loops — retry on exception, batch processing, agentic pipelines where the agent's output drives the next invocation. In these patterns, a tool that fails with an unhandled exception terminates the current run() and the outer loop starts a fresh run() with the same or similar input. max_retries resets on each fresh run() because it's a per-invocation counter, not a per-session counter. A tool that fails 100% of the time on a given input class will be called indefinitely across an outer retry loop — and each call includes the full accumulated message_history of all prior failed runs if the caller passes it along for context.
Detection signal: across the last N run() calls in the current session, the same tool_name appears in the tool call trace with arguments that are identical or within a defined similarity threshold, and the tool's return value has not changed. Two identical tool calls in a session is a legitimate retry. Five identical tool calls with the same failing result is a storm: the outer loop is not making progress, it's repeating the same failing operation. Neither max_retries nor UsageLimits crosses the run() invocation boundary to see this.
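The cross-run signal can be sketched as a small detector that keeps a window of recent calls keyed on tool name, canonicalized arguments, and a fingerprint of the result. ToolStormDetector and its record() method are hypothetical names; the sketch only treats exact matches (after sorting JSON keys) as repeats, so a fuzzier similarity threshold would need its own comparison.

```python
from __future__ import annotations

import hashlib
import json
from collections import deque
from typing import Any


class ToolStormDetector:
    """Flags a tool called with the same arguments and getting the same result.

    Arguments are canonicalized with json.dumps(sort_keys=True), so dicts that
    differ only in key order count as identical. Results are reduced to a
    SHA-256 fingerprint, so a call that finally returns something new breaks
    the streak instead of extending it.
    """

    def __init__(self, max_repeats: int = 5) -> None:
        self.max_repeats = max_repeats
        # Only the most recent max_repeats calls matter for the trip condition
        self._recent: deque[tuple[str, str, str]] = deque(maxlen=max_repeats)

    @staticmethod
    def _canonical(value: Any) -> str:
        return json.dumps(value, sort_keys=True, default=str)

    def record(self, tool_name: str, args: dict[str, Any], result: Any) -> bool:
        """Record one call; return True when the last max_repeats calls are identical."""
        result_fp = hashlib.sha256(self._canonical(result).encode()).hexdigest()
        self._recent.append((tool_name, self._canonical(args), result_fp))
        return len(self._recent) == self.max_repeats and len(set(self._recent)) == 1


# Five identical failing calls across five outer-loop runs: the fifth trips.
detector = ToolStormDetector(max_repeats=5)
failure = {"error": "HTTP 503"}
flags = [detector.record("fetch_service_metrics", {"service_id": "svc-a"}, failure) for _ in range(5)]
print(flags)  # [False, False, False, False, True]
```

Because the detector object outlives any single run(), it sees every call made across an outer retry loop; feed it from wherever your code observes tool calls and their results.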
3. Nested agent recursion: agents calling agents that call back through shared tools
Pydantic AI's multi-agent pattern allows one agent to call another agent as a tool. The outer agent declares an @agent.tool that calls inner_agent.run(). This is intentional and useful — composition without tight coupling. The failure mode emerges when the inner agent has access to a tool that, under certain inputs, calls back into the outer agent through a shared interface — a shared database query tool, a shared API client, or a shared callback that dispatches by intent to whichever agent handles that intent. The result is unbounded recursion: outer → inner → outer → inner, each level starting a new run(), each with its own max_retries and UsageLimits budget, with no cross-level tracking.
Detection signal: the call stack depth of nested agent.run() invocations exceeds a configurable threshold. In Python, you can track this with a thread-local counter or a contextvar: increment before calling inner_agent.run(), decrement when it returns, and trip the breaker if the depth exceeds max_nesting_depth. A legitimate two-level nesting (orchestrator → specialist) is depth 2. Depth 5 or more in a production agent almost always indicates unintended recursion — the shared tool is routing back up the call stack. No built-in PydanticAI mechanism detects this because each run() is independent; the framework has no global call-stack concept.
4. Message history cost drift: per-turn cost growing faster than linear
Pydantic AI's multi-turn conversations pass message_history explicitly — you control what context each run() sees. This is architecturally cleaner than frameworks that maintain implicit state, but it means history growth is your responsibility to monitor. When a conversation is healthy, per-turn cost should grow roughly linearly with history depth. When a tool is failing and appending large error payloads, when the model is producing long structured-output attempts that fail validation, or when a reasoning loop is generating extended chain-of-thought responses on each turn, per-turn cost grows super-linearly — and accelerates. By turn 10 a conversation that looked fine at turn 5 may be spending 4× per turn compared to the baseline.
Detection signal: the ratio of (cost at turn N) to (average cost across turns 1 through N/2) exceeds a configurable threshold, for example when the current per-turn cost is more than 2× the early-conversation average. This is a drift signal, not an absolute cap: a conversation that started expensive and stayed expensive is fine; a conversation that started cheap and is now 3× the early average has drifted and warrants investigation. UsageLimits(total_tokens_limit=50000) fires only after you've spent 50,000 tokens; it can't tell you at turn 8 that the remaining turns will cost 10× what the first turns cost.
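Before picking a threshold, it's worth computing the ratio for a healthy conversation. If every exchange adds the same number of tokens and there is no fixed overhead, per-turn cost is proportional to the turn number, and the ratio of turn N to the average of turns 1 through N/2 climbs toward 4 as N grows (it is already 3.33 at turn 10), so a 2.5× threshold would fire on linear growth alone. Fixed overhead such as a long system prompt pulls the ratio down, so measure it against your own traffic. The sketch below computes that ratio and, for comparison, an alternative signal that is not part of the breaker described here: the ratio of per-turn increases, which stays at 1.0 under linear growth and rises only when growth accelerates. The cost series are made up for illustration.

```python
from typing import Sequence


def drift_ratio(costs: Sequence[float]) -> float:
    """Latest turn's cost divided by the average cost of the first half of turns."""
    if len(costs) < 2:
        raise ValueError("need at least 2 turns")
    midpoint = len(costs) // 2
    early_avg = sum(costs[:midpoint]) / midpoint
    return costs[-1] / early_avg


def increment_drift_ratio(costs: Sequence[float]) -> float:
    """Latest per-turn increase divided by the average early increase.

    Linear history growth adds a constant amount per turn, so this stays at 1.0;
    only growth that accelerates turn over turn pushes it up.
    """
    if len(costs) < 3:
        raise ValueError("need at least 3 turns")
    deltas = [later - earlier for earlier, later in zip(costs, costs[1:])]
    midpoint = len(deltas) // 2
    early_avg = sum(deltas[:midpoint]) / midpoint
    if early_avg <= 0:
        # Flat early turns give no growth baseline; treat any later growth as drift
        return float("inf") if deltas[-1] > 0 else 1.0
    return deltas[-1] / early_avg


# Healthy: every turn resends one more 400-token exchange than the last.
linear = [400.0 * n for n in range(1, 11)]
# Drifting: from turn 6, a growing error payload rides along in the history.
drifting = linear[:5] + [400.0 * n + 300.0 * (n - 5) ** 2 for n in range(6, 11)]

for label, costs in (("linear", linear), ("drifting", drifting)):
    print(f"{label}: absolute {drift_ratio(costs):.2f}, increment {increment_drift_ratio(costs):.2f}")
# linear: absolute 3.33, increment 1.00
# drifting: absolute 9.58, increment 7.75
```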
Building the circuit breaker
The implementation wraps Pydantic AI's Agent.run() method with a stateful breaker that tracks all four failure modes across the lifetime of a session. We instrument at the run() boundary rather than inside the agent to avoid importing PydanticAI internals:
```python
from __future__ import annotations

import contextvars
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable

from pydantic_ai import Agent
from pydantic_ai.messages import RetryPromptPart, ToolCallPart
from pydantic_ai.result import RunResult


class BreakerState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class BreakerTripped(RuntimeError):
    """Raised when the breaker trips, or refuses a call while OPEN."""


@dataclass
class SessionBreaker:
    budget_usd: float = 5.0
    max_validation_divergence: int = 3
    max_tool_repeats: int = 4
    max_nesting_depth: int = 3
    cost_drift_ratio: float = 2.5
    cooldown_seconds: float = 60.0
    cost_per_1k_tokens: float = 0.003
    on_trip: Callable[[str], None] | None = None
    # Structured output arrives as a call to the result tool ("final_result" by
    # default); it interleaves with real tool calls, so skip it in storm checks
    ignored_tools: frozenset[str] = frozenset({"final_result"})

    # Session-level state
    state: BreakerState = field(default=BreakerState.CLOSED, init=False)
    total_cost_usd: float = field(default=0.0, init=False)
    trips: int = field(default=0, init=False)
    _opened_at: float | None = field(default=None, init=False)

    # Cross-run tracking (accumulates across every guarded_run() in the session)
    _tool_call_history: list[tuple[str, str]] = field(default_factory=list, init=False)
    _per_turn_costs: list[float] = field(default_factory=list, init=False)
    _run_count: int = field(default=0, init=False)

    # Nesting depth via contextvar: each asyncio task runs with its own copy of
    # the context, so concurrent sessions keep independent depth counts
    _depth_var: contextvars.ContextVar[int] = field(
        default_factory=lambda: contextvars.ContextVar("_pydantic_breaker_depth", default=0),
        init=False,
    )

    def _gate(self) -> None:
        """Refuse calls while OPEN; allow a probe once the cooldown has elapsed."""
        if self.state == BreakerState.OPEN:
            elapsed = time.monotonic() - (self._opened_at or 0.0)
            if elapsed >= self.cooldown_seconds:
                self.state = BreakerState.HALF_OPEN
            else:
                raise BreakerTripped(
                    f"[PydanticAI breaker OPEN] Total spent: ${self.total_cost_usd:.4f}. "
                    f"Trips: {self.trips}. Cooldown remaining: "
                    f"{self.cooldown_seconds - elapsed:.0f}s"
                )
        if self.total_cost_usd >= self.budget_usd:
            self._trip(f"budget exhausted (${self.total_cost_usd:.4f} >= ${self.budget_usd})")

    def _trip(self, reason: str) -> None:
        self.state = BreakerState.OPEN
        self._opened_at = time.monotonic()
        self.trips += 1
        if self.on_trip:
            self.on_trip(reason)
        raise BreakerTripped(f"[PydanticAI breaker TRIPPED] {reason}")

    def _check_nesting(self) -> None:
        depth = self._depth_var.get()
        if depth >= self.max_nesting_depth:
            self._trip(f"nested agent recursion depth {depth} >= max {self.max_nesting_depth}")

    def _record_tool_calls(self, result: RunResult[Any]) -> None:
        # new_messages() skips the message_history passed into this run, so
        # tool calls from earlier turns are not re-counted on every turn
        for msg in result.new_messages():
            for part in getattr(msg, "parts", []):
                # ToolReturnPart and RetryPromptPart also carry tool_name;
                # only ToolCallPart is a call issued by the model
                if isinstance(part, ToolCallPart) and part.tool_name not in self.ignored_tools:
                    self._tool_call_history.append((part.tool_name, str(part.args)))
        # Storm: the last max_tool_repeats calls are all the same (name, args)
        if len(self._tool_call_history) >= self.max_tool_repeats:
            tail = self._tool_call_history[-self.max_tool_repeats:]
            if len(set(tail)) == 1:
                name, _ = tail[0]
                self._trip(
                    f"tool storm: '{name}' called {self.max_tool_repeats}x "
                    f"with identical arguments"
                )

    def _record_cost(self, result: RunResult[Any]) -> float:
        usage = result.usage()
        # Token fields can be None, so fall back to 0 before adding
        tokens = getattr(usage, "total_tokens", None) or (
            (getattr(usage, "request_tokens", None) or 0)
            + (getattr(usage, "response_tokens", None) or 0)
        )
        cost = (tokens / 1000) * self.cost_per_1k_tokens
        self.total_cost_usd += cost
        self._per_turn_costs.append(cost)
        return cost

    def _check_cost_drift(self) -> None:
        if len(self._per_turn_costs) < 4:
            return
        midpoint = len(self._per_turn_costs) // 2
        early_avg = sum(self._per_turn_costs[:midpoint]) / midpoint
        if early_avg == 0:
            return
        current = self._per_turn_costs[-1]
        ratio = current / early_avg
        if ratio >= self.cost_drift_ratio:
            self._trip(
                f"cost drift: current turn ${current:.4f} is {ratio:.1f}x "
                f"early average ${early_avg:.4f}"
            )

    def _check_validation_divergence(self, result: RunResult[Any]) -> None:
        # Validation failures go back to the model as RetryPromptPart; when its
        # content is a list, it holds pydantic error dicts with a "loc" path.
        # Only runs that return a result get here: a run that exhausts its
        # retries raises before this check.
        error_fields: list[set[str]] = []
        for msg in result.new_messages():
            for part in getattr(msg, "parts", []):
                if isinstance(part, RetryPromptPart) and isinstance(part.content, list):
                    paths = {".".join(str(p) for p in err["loc"]) for err in part.content}
                    if paths:
                        error_fields.append(paths)
        if len(error_fields) < self.max_validation_divergence:
            return
        # Divergence: the last N error sets share no common field
        recent = error_fields[-self.max_validation_divergence:]
        if not set.intersection(*recent):
            self._trip(
                f"validation retry cascade: {len(error_fields)} retries with "
                f"no overlapping error fields; model is diverging"
            )

    async def guarded_run(
        self,
        agent: Agent[Any, Any],
        user_prompt: str,
        *,
        message_history: list[Any] | None = None,
        **kwargs: Any,
    ) -> RunResult[Any]:
        self._gate()
        self._check_nesting()
        # Nested guarded_run() calls made from tools see this incremented depth
        token = self._depth_var.set(self._depth_var.get() + 1)
        try:
            self._run_count += 1
            result = await agent.run(
                user_prompt,
                message_history=message_history,
                **kwargs,
            )
            self._record_cost(result)
            self._check_cost_drift()
            self._record_tool_calls(result)
            self._check_validation_divergence(result)
            # A run that passes every check closes a HALF_OPEN breaker
            if self.state == BreakerState.HALF_OPEN:
                self.state = BreakerState.CLOSED
                self._opened_at = None
            return result
        except BreakerTripped:
            raise
        except Exception:
            self._gate()  # Re-check budget even on unexpected errors
            raise
        finally:
            # Restore the caller's depth even when the run or a check raised
            self._depth_var.reset(token)

    def reset(self) -> None:
        self.state = BreakerState.CLOSED
        self.total_cost_usd = 0.0
        self._opened_at = None
        self._tool_call_history.clear()
        self._per_turn_costs.clear()
        self._run_count = 0
```
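The cooldown logic in _gate() is easiest to check without an LLM in the loop. The sketch below pulls just the OPEN, HALF_OPEN, CLOSED cycle into a small class so you can drive it with a fake clock in a unit test; CooldownGate, State and the clock parameter are illustrative names, not part of the breaker above.

```python
from __future__ import annotations

import time
from enum import Enum
from typing import Callable


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CooldownGate:
    """OPEN -> HALF_OPEN -> CLOSED cycle mirroring _gate(), with an injectable clock."""

    def __init__(self, cooldown_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self.state = State.CLOSED
        self.opened_at: float | None = None

    def allow(self) -> bool:
        """Return True if a call may proceed; an expired OPEN state becomes HALF_OPEN."""
        if self.state is State.OPEN:
            if self.clock() - (self.opened_at or 0.0) < self.cooldown_seconds:
                return False
            self.state = State.HALF_OPEN
        return True

    def trip(self) -> None:
        self.state = State.OPEN
        self.opened_at = self.clock()

    def record_success(self) -> None:
        # One clean probe while HALF_OPEN closes the breaker again
        if self.state is State.HALF_OPEN:
            self.state = State.CLOSED
            self.opened_at = None


now = [0.0]  # fake clock the test controls
gate = CooldownGate(cooldown_seconds=60.0, clock=lambda: now[0])
gate.trip()
now[0] = 30.0
print(gate.allow(), gate.state)  # False State.OPEN
now[0] = 61.0
print(gate.allow(), gate.state)  # True State.HALF_OPEN
gate.record_success()
print(gate.state)  # State.CLOSED
```

Injecting the clock instead of calling time.monotonic() directly is what makes the cooldown testable without sleeping for a minute.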
Using the breaker in a multi-turn conversation
The SessionBreaker wraps the agent.run() call and accumulates state across turns. Pass message_history from the previous result to maintain conversation context, exactly as you would without the breaker:
```python
import asyncio

from pydantic import BaseModel
from pydantic_ai import Agent


class AnalysisResult(BaseModel):
    summary: str
    confidence: float
    recommendations: list[str]


analysis_agent = Agent(
    "openai:gpt-4o",
    result_type=AnalysisResult,
    system_prompt="Analyze the provided data and return structured findings.",
)

breaker = SessionBreaker(
    budget_usd=3.0,
    max_validation_divergence=3,
    max_tool_repeats=4,
    max_nesting_depth=3,
    cost_drift_ratio=2.5,
    on_trip=lambda reason: print(f"ALERT: breaker tripped — {reason}"),
)


async def run_analysis_session(prompts: list[str]) -> list[AnalysisResult]:
    results = []
    history = None
    for prompt in prompts:
        try:
            result = await breaker.guarded_run(
                analysis_agent,
                prompt,
                message_history=history,
            )
            results.append(result.data)
            history = result.all_messages()
            print(
                f"Turn {breaker._run_count}: "
                f"${breaker._per_turn_costs[-1]:.4f} | "
                f"Total: ${breaker.total_cost_usd:.4f}"
            )
        except RuntimeError as e:
            print(f"Breaker stopped session: {e}")
            break
    return results


asyncio.run(
    run_analysis_session([
        "Analyze the deployment failure rate data for service A.",
        "Now compare against service B from the same time window.",
        "What's the root cause pattern?",
        "Generate remediation recommendations.",
    ])
)
```
Protecting nested agents from recursion
The _depth_var contextvar tracks nesting depth across coroutine boundaries. When a tool function inside the outer agent calls breaker.guarded_run(inner_agent, ...), the depth counter increments. If the inner agent's tools call back to breaker.guarded_run(outer_agent, ...), the depth counter hits the limit and trips before the recursion spirals:
```python
import asyncio

from pydantic_ai import Agent

orchestrator = Agent(
    "openai:gpt-4o",
    result_type=str,
    system_prompt="Coordinate the analysis pipeline.",
)
specialist = Agent(
    "openai:gpt-4o",
    result_type=str,
    system_prompt="Perform deep analysis on a single component.",
)

# Both agents share the same breaker, so depth tracking is session-wide
session_breaker = SessionBreaker(max_nesting_depth=3)


@orchestrator.tool_plain
async def run_specialist_analysis(component: str) -> str:
    # Runs one level deeper than the orchestrator run that called this tool
    result = await session_breaker.guarded_run(
        specialist,
        f"Analyze component: {component}",
    )
    return result.data


@specialist.tool_plain
async def escalate_to_orchestrator(issue: str) -> str:
    # Called from a specialist run at depth 2, so this orchestrator run starts
    # at depth 3. If it dispatches to the specialist again, the depth check
    # sees 3 >= max_nesting_depth and trips before recursion goes any further.
    result = await session_breaker.guarded_run(
        orchestrator,
        f"Handle escalated issue: {issue}",
    )
    return result.data


async def main() -> None:
    try:
        result = await session_breaker.guarded_run(
            orchestrator,
            "Run full pipeline analysis for services A, B, C.",
        )
        print(result.data)
    except RuntimeError as e:
        print(f"Recursion detected: {e}")


asyncio.run(main())
```
Handling tool storms in outer-loop patterns
When you run an agent in a batch processing loop — retrying on exception, processing a queue, or driving a pipeline — the outer loop can re-trigger the same failing tool repeatedly across separate run() calls. The breaker's _tool_call_history accumulates across all guarded_run() calls in the session, so it can detect the pattern even when each individual run() looks clean:
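```python
from pydantic_ai import ModelRetry

# metrics_client, store_result and alert_oncall are placeholders for your own
# async HTTP client, persistence function and paging hook.


@analysis_agent.tool_plain
async def fetch_service_metrics(service_id: str) -> dict:
    # This tool fails intermittently when the metrics endpoint is down
    response = await metrics_client.get(f"/metrics/{service_id}")
    if response.status_code != 200:
        raise ModelRetry(f"Metrics unavailable for {service_id}: HTTP {response.status_code}")
    return response.json()


async def process_service_queue(service_ids: list[str], max_attempts: int = 10) -> None:
    queue_breaker = SessionBreaker(
        budget_usd=5.0,
        max_tool_repeats=4,
        on_trip=lambda r: alert_oncall(f"Queue processing breaker tripped: {r}"),
    )
    for service_id in service_ids:
        # Outer retry loop: each attempt is a fresh run() with the same prompt,
        # so a persistently failing tool is called with identical arguments
        for _ in range(max_attempts):
            try:
                result = await queue_breaker.guarded_run(
                    analysis_agent,
                    f"Analyze service {service_id} and return a health assessment.",
                )
            except Exception as e:
                if queue_breaker.state is BreakerState.OPEN:
                    # The breaker tripped: a storm usually means a systemic
                    # failure, not a per-service one, so stop the whole queue
                    print(f"Queue halted after {queue_breaker._run_count} runs: {e}")
                    return
                continue  # ordinary run failure: retry this service
            # Hypothetical policy: retry low-confidence assessments, e.g. ones
            # produced while the metrics endpoint was unreachable
            if result.data.confidence >= 0.5:
                await store_result(service_id, result.data)
                break
```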
RunGuard integration: one decorator for all PydanticAI failure modes
The circuit breaker above requires instantiating a SessionBreaker and replacing all direct agent.run() calls with breaker.guarded_run(). RunGuard's @guard() decorator handles this automatically: it patches Agent.run() and Agent.run_sync() at import time, instruments the Pydantic AI result validator to capture validation error sequences, and tracks the contextvar-based nesting depth across all agents in the session. The result is single-line protection with no changes to your agent definitions, tool functions, or Pydantic result models:
```python
import runguard

# Patch at import time: all subsequent Agent.run() and Agent.run_sync() calls
# are protected by the session-level circuit breaker.
# send_slack_alert is a placeholder for your own alerting hook.
runguard.install(
    budget_usd=3.0,
    max_validation_divergence=3,
    max_tool_repeats=4,
    max_nesting_depth=3,
    cost_drift_ratio=2.5,
    on_trip=lambda r: send_slack_alert(f"PydanticAI breaker tripped: {r}"),
)


async def analyze(prior_messages):
    # Your existing code is unchanged:
    result = await analysis_agent.run(
        "Analyze the deployment data and return findings.",
        message_history=prior_messages,
    )
    return result
```
RunGuard instruments Pydantic AI's result validator by wrapping the ResultData validation path — when the validator raises a ValidationError, RunGuard captures the error field paths before the framework constructs the retry message. This happens in the framework's validation layer, not your model definitions, so your result_type Pydantic models and validators are unchanged. For tool call tracking, RunGuard uses Pydantic AI's Tool.prepare callback — the hook the framework provides for pre-call inspection — to record call arguments before execution.
What this saves in practice
| Scenario | Without breaker | With breaker | Saved |
|---|---|---|---|
| Validation retry cascade: result_type schema too strict for the model, diverges over 5 retries with growing history | $2.40 (5 retries × escalating history) | $0.61 (3 retries, trip on divergence) | 75% |
| Tool call storm: metrics endpoint down, same tool called 12× across outer retry loop | $3.90 (12 run() calls) | $0.65 (4 calls, storm detected) | 83% |
| Nested agent recursion: orchestrator → specialist → orchestrator → …, depth-3 trip | $7.20 (stack overflow after 11 levels) | $0.82 (3 levels before trip) | 89% |
| Message history cost drift: 20-turn conversation, cost drift 2.8× at turn 12 | $6.10 (20 turns, quadratic history) | $1.70 (12 turns before drift trip) | 72% |
The nested agent recursion case shows the largest savings because each recursive level spawns a fresh agent.run() with its own tool-call sequence, and any level whose tools dispatch more than one inner run multiplies the work beneath it, so cost compounds with depth instead of just adding. The breaker fires at depth 3, before that compounding becomes significant.
Frequently asked questions
Does this work with agent.run_sync() and agent.run_stream()?
The SessionBreaker.guarded_run() method shown here is async and wraps agent.run(). For agent.run_sync(), build a thread-safe variant of the same pattern: protect the shared counters with a threading.Lock (the async version needs no lock because its counter updates contain no await, so no other coroutine on the event loop can interleave with them), and replace the contextvar with a threading.local() depth counter. For agent.run_stream(), the challenge is that you can't inspect token usage until the stream completes: wrap the run_stream() context manager and check cost and tool calls after the stream finishes, before handing back the final structured result. The HALF_OPEN recovery logic works the same in both cases. RunGuard's runguard.install() handles all three invocation modes automatically.
How do I tune max_validation_divergence without tripping on legitimate complex schemas?
The divergence detector trips when the last N retries share no common error field: every retry fails on something the previous ones didn't mention, which the detector treats as the model cycling rather than converging. If the same field keeps reappearing, the model is still working on one identifiable problem and the detector stays quiet. The trade-off is that a model fixing exactly one field per retry on a schema with many independent constraints also produces disjoint error sets, so for schemas where the model legitimately needs 3–4 attempts to resolve a complex structural issue (deeply nested models, complex discriminated unions), set max_validation_divergence=5. In practice, 3–4 is a safe starting point for most production schemas; raise it for schemas that have known multi-step validation dependencies.
Can I use UsageLimits alongside this breaker, or do they conflict?
UsageLimits and SessionBreaker are complementary, not conflicting. UsageLimits is a hard ceiling on token or request volume for a run: it raises UsageLimitExceeded once a threshold is crossed. The SessionBreaker is a pattern detector that evaluates each completed run and stops the session before later runs spend the rest of the budget. Use both: pass usage_limits=UsageLimits(...) through guarded_run(), which forwards extra keyword arguments to agent.run(), as the absolute last-resort cap, and rely on the SessionBreaker for early, pattern-based intervention. The breaker should fire well before UsageLimits in any scenario it was designed to catch; if UsageLimits fires first, you've hit a scenario the breaker's patterns don't cover, which is a useful diagnostic signal.
How does the tool storm detector handle legitimate retry patterns in batch processing?
The storm detector looks for the same (tool_name, args_str) tuple repeating in the tail of the session-level call history. Legitimate batch processing calls the same tool many times but with different arguments: processing service A, then service B, then service C. Each call has a different service_id argument, so the tuples are distinct and the storm detector won't fire. The detector targets identical arguments only (same tool, same input, repeated back to back); as written it doesn't compare return values. If your batch arguments are truly identical and the tool consistently fails on that input, the storm detector is doing the right thing: that's a systemic failure, not a legitimate retry pattern, and you should fix the input or the tool before retrying.
What's the performance overhead of the contextvar nesting tracker?
The contextvars.ContextVar get and set operations in CPython 3.11+ are O(1) and take roughly 50–100 nanoseconds each. An agent.run() call that takes hundreds of milliseconds to complete (one or more LLM round-trips) has a nesting-depth check overhead of less than one microsecond, well under 0.001% of total call time. The contextvar approach is preferred over thread-local storage because it's coroutine-safe: when multiple agent runs execute concurrently as separate tasks on the same event loop, each task runs with its own copy of the context, so concurrent runs don't interfere with each other's nesting counters. Thread-local storage, by contrast, would be shared by every task running on the loop's thread.
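To check the overhead figure on your own interpreter rather than taking it on trust, time a full nesting check (get, set, and reset) with timeit. Numbers vary by CPU and Python version, so no expected value is shown; depth_var and nesting_check are illustrative names.

```python
from __future__ import annotations

import contextvars
import timeit

depth_var: contextvars.ContextVar[int] = contextvars.ContextVar("depth", default=0)


def nesting_check() -> None:
    # Roughly the per-call work of a depth guard: read, increment, restore
    token = depth_var.set(depth_var.get() + 1)
    depth_var.reset(token)


N = 200_000
seconds = timeit.timeit(nesting_check, number=N)
per_check_ns = seconds / N * 1e9
print(f"{per_check_ns:.0f} ns per nesting check on this interpreter")
```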