Your Python AI agent calls a tool. The tool returns a result the model considers incomplete. The model calls the same tool again. Then again. Forty seconds later, you have 23 identical API calls on your bill and no output worth keeping. This happens in production, in every Python agent framework, at every scale — and the built-in safeguards were not designed to stop it.

This guide covers three Python-specific failure modes, two framework-native approaches that are less useful than they look, and a full circuit-breaker implementation with CLOSED, OPEN, and HALF_OPEN states that you can drop into any Python agent today.

The key distinction: max_iterations=20 stops the agent after 20 steps, whether they were all different or all identical. A circuit breaker stops the agent when it sees the same tool call repeated N times — which is almost always less than 20. You want the latter, because your bill arrives per-call, not per-run.

How Python agents fail in production

Not every loop looks the same. Python agent frameworks have three distinct failure modes, each with a different signature. Knowing which one you're seeing determines which guard to add.

1. Tool-call signature lock

The most common pattern: the model calls web_search("best practices for X"), receives a result, and concludes it needs more information. It calls web_search("best practices for X") again. Same function, same arguments, same result — but the model's confidence hasn't changed, so it retries. The loop signature is a sequence of identical (tool_name, args_hash) tuples.

This is especially common with retrieval tools (web search, vector DB lookup, read_file) where the model can't track that it has already tried a particular query. The result fills more context, the model becomes less decisive, and it retries hoping for a better answer. See how Python agents enter infinite loops for a deeper analysis of why this happens at the model level.
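
As a minimal sketch of what that signature looks like, the snippet below hashes each call into a (tool_name, args_hash) tuple and counts repeats. The helper names call_signature and find_repeated_calls are illustrative assumptions, not part of any framework.

```python
import hashlib
import json
from collections import Counter


def call_signature(tool_name: str, args: dict) -> tuple[str, str]:
    """Return (tool_name, args_hash); key order in args does not matter."""
    canonical = json.dumps(args, sort_keys=True, default=str)
    args_hash = hashlib.sha256(canonical.encode()).hexdigest()[:16]
    return tool_name, args_hash


def find_repeated_calls(calls: list[tuple[str, dict]], threshold: int = 3) -> list[str]:
    """Return names of tools whose identical call appears `threshold` or more times."""
    counts = Counter(call_signature(name, args) for name, args in calls)
    return sorted({name for (name, _), n in counts.items() if n >= threshold})


history = [
    ("web_search", {"query": "best practices for X"}),
    ("web_search", {"query": "best practices for X"}),
    ("read_file", {"path": "notes.md"}),
    ("web_search", {"query": "best practices for X"}),
]
looping = find_repeated_calls(history)  # ["web_search"]
```

Hashing the canonical JSON rather than comparing raw argument objects keeps the check cheap and makes it insensitive to dictionary key order.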

2. Context accumulation drift

As a long-running agent fills its context window, earlier tool results get truncated. The model loses visibility into what it has already executed. With prior steps gone, the model re-evaluates the original task — and re-calls the same tools it ran in the now-truncated section. This is the most expensive failure mode because it accelerates: each additional tool call adds tokens that push out more prior results, increasing the probability of re-execution.

Unlike signature-lock, context drift produces loops that look different across repetitions because the argument values may shift slightly as the model adjusts its query phrasing. Detecting this requires watching for semantic similarity rather than exact hash matches — or, more practically, capping the total token spend for a run regardless of step count.
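
Both options can be sketched with the standard library. The example below uses difflib's character-level ratio as a crude stand-in for semantic similarity (a real deployment would more likely compare embeddings), plus a hard token cap. The names is_near_duplicate and TokenBudget and the 0.85 cutoff are assumptions chosen for illustration.

```python
from difflib import SequenceMatcher


def is_near_duplicate(query: str, earlier: list[str], min_ratio: float = 0.85) -> bool:
    """True if `query` closely resembles any earlier query (character-level ratio)."""
    normalized = " ".join(query.lower().split())
    return any(
        SequenceMatcher(None, normalized, " ".join(e.lower().split())).ratio() >= min_ratio
        for e in earlier
    )


class TokenBudget:
    """Hard cap on total tokens spent in one run, independent of step count."""

    def __init__(self, max_tokens: int):
        self.max_tokens = max_tokens
        self.used = 0

    def spend(self, tokens: int) -> None:
        # Refuse the spend before it happens so the cap is never exceeded.
        if self.used + tokens > self.max_tokens:
            raise RuntimeError(
                f"token budget exceeded: {self.used + tokens} > {self.max_tokens}"
            )
        self.used += tokens
```

The token cap is the more robust of the two: it fires no matter how the model rephrases its query.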

3. Async retry collision

Python's async ecosystem adds a third failure mode that TypeScript agents rarely see: retry logic stacking. An async tool call raises httpx.TimeoutException. Your tool decorator retries with exponential backoff. Simultaneously, the LangChain callback sees a failed tool and invokes the model, which decides to call the same tool again. Two independent retry loops, your @retry decorator and the model's own retry behavior, multiply rather than add: a decorator that makes 4 attempts per call, combined with a model that re-issues the call N times, produces up to 4×N back-to-back calls before any circuit trips.

This is uniquely dangerous in Python because the asyncio event loop allows these competing retry sequences to interleave invisibly. The run log shows a wall of tool calls; nothing in the traceback names the retry collision as the cause.
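
The multiplication is easy to reproduce in isolation. The following is a toy simulation, not real framework code: retry is a hypothetical stand-in for a retry decorator (no backoff, to keep it fast), simulated_agent stands in for the model re-issuing the call, and the built-in TimeoutError replaces httpx.TimeoutException so the sketch has no dependencies.

```python
import asyncio
from functools import wraps

call_log: list[str] = []


def retry(attempts: int):
    """Toy retry decorator: re-invoke the coroutine up to `attempts` times."""
    def decorator(fn):
        @wraps(fn)
        async def wrapper(*args, **kwargs):
            last_exc = None
            for _ in range(attempts):
                try:
                    return await fn(*args, **kwargs)
                except TimeoutError as exc:
                    last_exc = exc
            raise last_exc
        return wrapper
    return decorator


@retry(attempts=4)
async def flaky_search(query: str) -> str:
    call_log.append(query)  # one outbound call per invocation
    raise TimeoutError("upstream timed out")


async def simulated_agent(model_retries: int) -> int:
    """The 'model' sees a failed tool call and asks for the same call again."""
    for _ in range(model_retries):
        try:
            await flaky_search("best practices for X")
        except TimeoutError:
            continue
    return len(call_log)

# asyncio.run(simulated_agent(model_retries=3)) returns 12:
# 4 decorator attempts for each of the 3 model-level retries.
```

Neither loop is wrong on its own; the cost comes from nobody counting the combined total, which is what a breaker placed in front of the outbound call does.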

Two native approaches — and where they fail

max_iterations in LangChain and CrewAI

Both LangChain's AgentExecutor(max_iterations=N) and CrewAI's max_iter parameter count steps and stop after N regardless of whether those steps were productive or repetitive. This is better than nothing, but it creates a calibration trap.

Set max_iterations=10 to catch tight loops, and you break legitimate research flows that need 15 sequential tool calls to complete. Set max_iterations=30 to allow those flows, and a 3-call loop burns 30 iterations before the guard fires. You end up choosing between false positives and expensive false negatives, with no mechanism to distinguish the two cases.
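
The trade-off can be made concrete with a small simulation. This is a sketch under simplified assumptions: each trace entry is one tool call identified by a string, and calls_before_stop is a hypothetical helper that counts how many calls execute before a guard stops the run.

```python
from collections import Counter


def calls_before_stop(trace, max_iterations, loop_threshold=None):
    """Count tool calls that actually execute before a guard stops the run."""
    seen = Counter()
    executed = 0
    for call in trace:
        if executed >= max_iterations:
            break  # step cap reached
        if loop_threshold is not None:
            seen[call] += 1
            if seen[call] >= loop_threshold:
                break  # repeat detected; this call is blocked before it runs
        executed += 1
    return executed


research_flow = [f"search:{i}" for i in range(15)]  # 15 distinct calls
tight_loop = ["search:same"] * 100                   # one call repeated

calls_before_stop(research_flow, max_iterations=10)                 # 10: cut short
calls_before_stop(tight_loop, max_iterations=30)                    # 30: all wasted
calls_before_stop(research_flow, max_iterations=30, loop_threshold=3)  # 15: completes
calls_before_stop(tight_loop, max_iterations=30, loop_threshold=3)     # 2: stopped early
```

With repeat detection in place, the step cap can stay generous because it no longer has to do the loop-catching.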

See how to add a circuit breaker to a LangChain agent for a detailed breakdown of how max_iterations interacts with pattern-based detection in chains with tool use.

asyncio.wait_for and threading.Event timeouts

Wall-clock timeouts are a useful backstop, but they don't distinguish a long legitimate run from a fast loop. A burst of 50 identical tool calls can complete and return in under 10 seconds, well within any practical timeout. And when the timeout does fire, it tells you nothing about which tool call caused the excess. A timeout caps the damage after the fact; it neither prevents the loop nor explains it.
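
A short sketch shows the blind spot. Here fast_looping_agent is a hypothetical stand-in for an agent stuck in a tight loop of quick tool calls, and asyncio.sleep simulates each call's latency.

```python
import asyncio


async def fast_looping_agent(calls: list[str]) -> None:
    """Simulate 50 identical tool calls that each return almost instantly."""
    for _ in range(50):
        await asyncio.sleep(0.001)
        calls.append("web_search:best practices for X")


async def run_with_timeout(timeout_s: float) -> tuple[bool, int]:
    """Return (timed_out, number_of_calls_made)."""
    calls: list[str] = []
    try:
        await asyncio.wait_for(fast_looping_agent(calls), timeout=timeout_s)
        return False, len(calls)
    except asyncio.TimeoutError:
        return True, len(calls)

# asyncio.run(run_with_timeout(10.0)) returns (False, 50):
# all 50 repeated calls finish long before the timeout can fire.
```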

The circuit-breaker state machine

A circuit breaker, a pattern borrowed from distributed systems, has three states:

| State | Behavior | Transition |
| --- | --- | --- |
| CLOSED (normal) | Tool calls pass through. Signature counts increment. | → OPEN when any signature reaches the loop threshold, or the budget would exceed the cap. |
| OPEN (tripped) | Tool calls are blocked immediately. CircuitOpenError raised before any API call goes out. | → HALF_OPEN after the recovery window (default 30s). |
| HALF_OPEN (probe) | One probe call is allowed through. If it succeeds with a new signature, the breaker resets. | → CLOSED on probe success; → OPEN on probe failure or repeat signature. |

The HALF_OPEN state is what separates a circuit breaker from a simple kill switch. After a configurable recovery window, the breaker allows one probe call to test whether the underlying condition has resolved. If the probe carries a signature the breaker has not seen in this session, the breaker resets and normal operation resumes. If the probe repeats an already-seen signature, the breaker re-opens immediately and a new recovery window starts.

Full Python implementation

The following class implements all three states with per-session signature counting, budget enforcement, and thread-safe state transitions. It requires only the Python standard library — no external dependencies.

import hashlib
import json
import time
import threading
from collections import defaultdict
from enum import Enum
from typing import Any, Callable, Optional


class BreakerState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(Exception):
    """Raised when a tool call is blocked by an open circuit breaker."""
    def __init__(self, reason: str, signature: str, trip_count: int):
        super().__init__(f"Circuit breaker open: {reason}")
        self.reason = reason      # "loop" | "budget" | "context"
        self.signature = signature
        self.trip_count = trip_count


class AgentCircuitBreaker:
    """
    Pattern-based circuit breaker for Python AI agents.

    Usage:
        breaker = AgentCircuitBreaker(loop_threshold=3, budget_limit_usd=5.00)

        # Before each tool call:
        breaker.check(tool_name, args, estimated_cost_usd)

        # After a clean run completes:
        breaker.reset()
    """

    def __init__(
        self,
        loop_threshold: int = 3,
        budget_limit_usd: float = 5.00,
        recovery_window_seconds: float = 30.0,
        on_trip: Optional[Callable[[CircuitOpenError], None]] = None,
    ):
        self.loop_threshold = loop_threshold
        self.budget_limit_usd = budget_limit_usd
        self.recovery_window_seconds = recovery_window_seconds
        self.on_trip = on_trip

        self._lock = threading.Lock()
        self._state = BreakerState.CLOSED
        self._signature_counts: dict[str, int] = defaultdict(int)
        self._total_cost_usd: float = 0.0
        self._trip_count: int = 0
        self._opened_at: Optional[float] = None
        self._last_trip_error: Optional[CircuitOpenError] = None

    @staticmethod
    def _make_signature(tool_name: str, args: Any) -> str:
        """Stable hash of (tool_name, args) — order-independent."""
        try:
            canonical = json.dumps(args, sort_keys=True, default=str)
        except (TypeError, ValueError):
            canonical = str(args)
        raw = f"{tool_name}:{canonical}"
        return hashlib.sha256(raw.encode()).hexdigest()[:16]

    def check(
        self,
        tool_name: str,
        args: Any = None,
        estimated_cost_usd: float = 0.0,
    ) -> None:
        """
        Call before every tool invocation.
        Raises CircuitOpenError if the breaker is open or should trip.
        """
        with self._lock:
            # OPEN state: check for recovery window
            if self._state == BreakerState.OPEN:
                elapsed = time.monotonic() - (self._opened_at or 0)
                if elapsed >= self.recovery_window_seconds:
                    self._state = BreakerState.HALF_OPEN
                else:
                    err = CircuitOpenError(
                        reason=self._last_trip_error.reason if self._last_trip_error else "open",
                        signature=self._last_trip_error.signature if self._last_trip_error else "",
                        trip_count=self._trip_count,
                    )
                    raise err

            sig = self._make_signature(tool_name, args)

            # Budget check (runs in both CLOSED and HALF_OPEN)
            if self._total_cost_usd + estimated_cost_usd > self.budget_limit_usd:
                err = CircuitOpenError(
                    reason="budget",
                    signature=sig,
                    trip_count=self._trip_count + 1,
                )
                self._trip(err)
                raise err

            # Loop detection
            self._signature_counts[sig] += 1
            count = self._signature_counts[sig]

            if self._state == BreakerState.HALF_OPEN:
                # In HALF_OPEN: any repeat signature re-opens the breaker
                if count > 1:
                    err = CircuitOpenError(
                        reason="loop",
                        signature=sig,
                        trip_count=self._trip_count + 1,
                    )
                    self._trip(err)
                    raise err
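                else:
                    # Probe succeeded: reset to CLOSED and still count the
                    # probe's cost toward the run budget.
                    self._state = BreakerState.CLOSED
                    self._signature_counts.clear()
                    self._total_cost_usd += estimated_cost_usd
                    return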

            if count >= self.loop_threshold:
                err = CircuitOpenError(
                    reason="loop",
                    signature=sig,
                    trip_count=self._trip_count + 1,
                )
                self._trip(err)
                raise err

            # Call is allowed — accumulate cost
            self._total_cost_usd += estimated_cost_usd

    def _trip(self, err: CircuitOpenError) -> None:
        self._state = BreakerState.OPEN
        self._opened_at = time.monotonic()
        self._trip_count = err.trip_count
        self._last_trip_error = err
        if self.on_trip:
            try:
                self.on_trip(err)
            except Exception:
                pass

    def reset(self) -> None:
        """Call after a clean run completes to clear all counters."""
        with self._lock:
            self._state = BreakerState.CLOSED
            self._signature_counts.clear()
            self._total_cost_usd = 0.0

    @property
    def state(self) -> BreakerState:
        with self._lock:
            return self._state

    @property
    def total_cost_usd(self) -> float:
        with self._lock:
            return self._total_cost_usd

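A few implementation notes worth calling out: the signature is built with json.dumps(sort_keys=True), so dictionary arguments hash identically regardless of key order; the recovery window is measured with time.monotonic(), so wall-clock adjustments cannot shorten or stretch it; every state change happens under a single threading.Lock; and exceptions raised by the on_trip callback are swallowed, so a failing alert hook cannot mask the CircuitOpenError.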

LangChain integration

The cleanest integration point for AgentExecutor-based chains is a custom callback handler that intercepts tool calls before the tool function executes. Add it once; it covers every tool in the agent without modifying individual tool implementations.

from langchain.callbacks.base import BaseCallbackHandler
from langchain.agents import AgentExecutor


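class CircuitBreakerCallback(BaseCallbackHandler):
    # LangChain logs and swallows exceptions raised in a handler unless
    # raise_error is True; set it so CircuitOpenError actually stops the call.
    raise_error = True

    def __init__(self, breaker: AgentCircuitBreaker):
        self.breaker = breaker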

    def on_tool_start(
        self,
        serialized: dict,
        input_str: str,
        **kwargs,
    ) -> None:
        tool_name = serialized.get("name", "unknown_tool")
        # Rough cost estimate: 0.01 USD per 1K input characters
        # (a placeholder rate, not a token count; adjust per model)
        estimated_cost = len(input_str) / 1000 * 0.01
        self.breaker.check(
            tool_name=tool_name,
            args=input_str,
            estimated_cost_usd=estimated_cost,
        )

    def on_agent_finish(self, *args, **kwargs) -> None:
        self.breaker.reset()


# Usage
breaker = AgentCircuitBreaker(
    loop_threshold=3,
    budget_limit_usd=2.00,
    on_trip=lambda err: print(f"[BREAKER] {err.reason} — sig {err.signature[:8]}"),
)

executor = AgentExecutor(
    agent=your_agent,
    tools=your_tools,
    callbacks=[CircuitBreakerCallback(breaker)],
    max_iterations=50,  # keep as backstop, but breaker trips first
)

See LangChain circuit breaker integration for how to wire this into LangGraph conditional edges and how to handle CircuitOpenError inside a graph's exception node.

CrewAI integration

CrewAI's tool system routes through a BaseTool.run() method, which delegates to the subclass's _run(), so a class decorator that wraps _run is cleaner than a callback:

from functools import wraps
from crewai.tools import BaseTool


def guarded(breaker: AgentCircuitBreaker):
    """Decorator that wraps a CrewAI BaseTool with circuit-breaker protection."""
    def decorator(tool_class):
        original_run = tool_class._run

        @wraps(original_run)
        def _run(self, *args, **kwargs):
            breaker.check(
                tool_name=tool_class.__name__,
                args={"args": args, "kwargs": kwargs},
            )
            return original_run(self, *args, **kwargs)

        tool_class._run = _run
        return tool_class
    return decorator


# Apply to individual tools
breaker = AgentCircuitBreaker(loop_threshold=3, budget_limit_usd=3.00)

@guarded(breaker)
class WebSearchTool(BaseTool):
    name: str = "web_search"
    description: str = "Search the web for information"

    def _run(self, query: str) -> str:
        return search_api(query)

For Pydantic AI agents, the equivalent is a middleware wrapper around the agent's run() coroutine. See Pydantic AI loop prevention for that pattern, and CrewAI loop detection for more advanced CrewAI-specific scenarios including multi-agent crews where loop detection needs to span agent boundaries.

Budget enforcement in practice

The estimated_cost_usd parameter in check() lets you enforce a hard budget ceiling per run. The practical challenge is that accurate cost estimation requires knowing the model's pricing and the input + output token count before the call completes.

A reasonable approach is to estimate input tokens from the argument string before the call, then add the output cost after the call completes. Here's a pattern that approximates token counts from word counts and prices them at Anthropic's per-token rates:

# Prices in USD per 1K tokens; check current pricing before relying on these.
COST_PER_1K_INPUT = 0.003   # Claude Sonnet 4.6 input pricing
COST_PER_1K_OUTPUT = 0.015  # Claude Sonnet 4.6 output pricing

def estimate_pre_call_cost(prompt: str) -> float:
    # Conservative: estimate input only, no output yet
    tokens = len(prompt.split()) * 1.3  # rough token count from word count
    return (tokens / 1000) * COST_PER_1K_INPUT

def record_post_call_cost(breaker: AgentCircuitBreaker, output: str) -> None:
    output_tokens = len(output.split()) * 1.3
    output_cost = (output_tokens / 1000) * COST_PER_1K_OUTPUT
    with breaker._lock:
        breaker._total_cost_usd += output_cost
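
To get a feel for the numbers, here is a worked example at the same per-1K rates. The token counts (2,000 in, 500 out) and the helper names are assumptions chosen for illustration.

```python
def call_cost_usd(input_tokens: int, output_tokens: int,
                  in_per_1k: float = 0.003, out_per_1k: float = 0.015) -> float:
    """Cost of one model call given token counts and per-1K-token prices."""
    return input_tokens / 1000 * in_per_1k + output_tokens / 1000 * out_per_1k


def calls_within_budget(budget_usd: float, input_tokens: int, output_tokens: int) -> int:
    """How many calls of this size fit under the budget cap."""
    return int(budget_usd // call_cost_usd(input_tokens, output_tokens))


per_call = call_cost_usd(2000, 500)               # 0.006 + 0.0075 = 0.0135 USD
max_calls = calls_within_budget(5.00, 2000, 500)  # 370 calls before a 5.00 USD cap
```

At these sizes a 5.00 USD cap allows a few hundred calls, which is why the budget guard works best as a ceiling alongside loop detection rather than as the primary loop detector.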

For a full treatment of per-session budget enforcement with Redis-backed persistence across async invocations, see AI agent token budget enforcement in Python.

Using RunGuard (the turnkey path)

The implementation above covers the core state machine, but a production deployment also needs: persistent trip logs (for post-mortems), Slack or PagerDuty alerts on breaker open, per-app dashboards showing trip rates over 30 days, and cross-session budget accounting. RunGuard packages all of this as a single SDK.

Install it:

pip install runguard

The guard() function wraps any async or sync tool callable:

import os

from runguard import guard, GuardConfig

config = GuardConfig(
    loop_threshold=3,
    budget_limit_usd=5.00,
    slack_webhook_url=os.getenv("SLACK_WEBHOOK_URL"),
    alert_at_usd=3.00,  # alert before hard limit
)

safe_search = guard(web_search, config=config)
safe_read   = guard(read_file, config=config)

# Use safe_search and safe_read exactly like the originals.
# The breaker is shared across both — a loop through either tool trips the circuit.

For async agents, wrap with async_guard():

from runguard import async_guard

safe_search = async_guard(async_web_search, config=config)

# Works with await as expected:
result = await safe_search("best practices for context pruning")

Trip events are logged to RunGuard's dashboard and can be queried via the API to power your own cost monitoring. The trip log persists across agent restarts, giving you a session-independent record of every breaker event — useful for identifying which agents are looping most frequently and what signatures they're repeating.

Frequently asked questions

How is a circuit breaker different from max_iterations?

max_iterations counts total steps. A circuit breaker counts repeated steps — identical tool calls with identical arguments. A 40-step research workflow with no repetition passes through the circuit breaker without tripping. A 3-step loop trips it at step 3. The circuit breaker fires earlier on real loops and later (or never) on legitimate long runs.

What threshold should I set for loop_threshold?

3 is the right starting point for most agents. Two identical calls can be coincidence (a legitimate re-query for a stale result). Three identical calls in a single session is almost always a loop. Set it lower (2) if your agent calls pure-read tools that should never repeat a query; higher (5) only if you have a documented use case for retrying the same query multiple times within a single run.

Does the circuit breaker handle context-window overflow as well as loops?

The signature-counting approach in this implementation catches tool-call loops directly. Context-window drift is a related but distinct failure mode — it requires token counting on the context passed to each LLM call, not the tool call arguments. RunGuard tracks both: loop_threshold covers signature-based detection, and context_limit_tokens (a separate config key) covers context-size monitoring. The two guards can trip independently or together.

How do I handle CircuitOpenError in a LangChain agent?

With the handler's raise_error attribute set to True, raising CircuitOpenError from on_tool_start propagates out of AgentExecutor and terminates the run; without it, LangChain logs the exception and carries on. If you want the agent to wind down gracefully rather than hard-crash, a callback alone is not enough, because LangChain ignores a handler's return value and the handler cannot alter the tool's output. Instead, call breaker.check() inside a wrapper around the tool function, catch CircuitOpenError there, and return a short stop instruction as the tool result. This gives the model a chance to summarize partial results before the run ends.
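
One framework-agnostic way to degrade gracefully is to run the check inside a wrapper around the tool function and turn a trip into a tool result the model can read. The sketch below is an assumption-laden illustration: wind_down_on_trip is a hypothetical helper, the CircuitOpenError class is a stand-in for the one defined earlier, and the wording of the stop message is arbitrary.

```python
from functools import wraps
from typing import Callable


class CircuitOpenError(Exception):
    """Stand-in for the exception defined in the implementation above."""


def wind_down_on_trip(check: Callable[[str, dict], None], tool_name: str):
    """Wrap a tool so a tripped breaker yields a stop message instead of raising."""
    def decorator(tool_fn):
        @wraps(tool_fn)
        def wrapper(**kwargs):
            try:
                check(tool_name, kwargs)  # e.g. breaker.check
            except CircuitOpenError as exc:
                return f"STOP: {exc}. Do not call tools again; summarize what you have."
            return tool_fn(**kwargs)
        return wrapper
    return decorator
```

Passing breaker.check as the check argument keeps the wrapper independent of any particular breaker implementation.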

Will this work with AutoGen multi-agent conversations?

Yes, with one adjustment: in a multi-agent setup, you want a shared circuit breaker instance across all agents in the conversation, not one per agent. Create a single AgentCircuitBreaker instance and pass it to each agent's tool-call handler. This ensures that a loop driven by agent A calling agent B calling the same tool as agent A is caught by the shared signature counter, not treated as two separate single-call observations. See AutoGen loop guard circuit breaker for the full multi-agent wiring.
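
The difference between per-agent and shared counting shows up in a few lines. This is a simplified sketch: each event is an (agent, signature) pair, and the helper names are hypothetical.

```python
from collections import Counter

# (agent, tool-call signature) pairs in the order they happened
events = [
    ("agent_a", "web_search:best practices for X"),
    ("agent_b", "web_search:best practices for X"),
    ("agent_a", "web_search:best practices for X"),
    ("agent_b", "web_search:best practices for X"),
]


def max_repeat_per_agent(events) -> int:
    """Highest repeat count when every agent keeps its own counter."""
    per_agent: dict[str, Counter] = {}
    for agent, sig in events:
        per_agent.setdefault(agent, Counter())[sig] += 1
    return max(n for counter in per_agent.values() for n in counter.values())


def max_repeat_shared(events) -> int:
    """Highest repeat count when all agents share one counter."""
    return max(Counter(sig for _, sig in events).values())


max_repeat_per_agent(events)  # 2: below a threshold of 3, nothing trips
max_repeat_shared(events)     # 4: the shared counter sees the loop
```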

Don't want to maintain this yourself?

RunGuard ships the state machine above as a single-line install — plus persistent trip logs, Slack/PagerDuty alerts, a per-app dashboard, and cross-session budget accounting. The Solo plan is $19/mo for one guarded app.
