AutoGen ships with max_consecutive_auto_reply and is_termination_msg. Teams wire up a GroupChat, set max_round=12, define a termination function that matches on the word "TERMINATE", and ship. Then a researcher agent asks a question the writer agent can't answer, the writer delegates back to the researcher, the researcher re-phrases and re-routes, and a $45 task bills $380 over 48 turns that each looked correct in isolation.

The problem is structural. max_consecutive_auto_reply is a per-agent counter: each agent tracks how many automatic replies it has sent to a given sender, and the count resets only on human input or when the chat is reset. It says nothing about the conversation as a whole. In a GroupChat where the speaker-selection policy alternates between two agents, each agent's counter advances only on its own turns, so a limit sized for a single-agent loop is rarely what stops a cross-agent cycle. is_termination_msg matches string patterns in message content. It has no knowledge of how many times the same exchange has happened, how much the conversation has cost, or whether the agents are making progress.

This post builds a production circuit breaker for AutoGen: cross-agent speaker cycle detection, code execution storm prevention, history-driven cost drift tracking, and a CLOSED/OPEN/HALF_OPEN state machine that hooks into ConversableAgent's register_reply API without forking AutoGen or modifying your agent definitions. At the end you'll see how RunGuard's @guard() decorator wraps any AutoGen tool with one line and handles all four failure modes automatically.

What you'll build: A circuit breaker that tracks speaker patterns across all agents in a GroupChat, enforces a hard budget cap before the conversation terminates, detects code execution retry storms in UserProxyAgent, and monitors history-driven cost inflation — without subclassing ConversableAgent or replacing AutoGen's built-in reply chain.

Why the conversational model fails more expensively than a single agent

A single ConversableAgent with max_consecutive_auto_reply=10 has a bound: at most 10 LLM calls before it stops replying. A GroupChat with three agents and max_round=12 has twelve total turns — but those turns are shared among all agents, and each turn's cost depends on the full message history that AutoGen passes to every LLM call. Three multipliers that max_round and max_consecutive_auto_reply can't contain:

These multipliers mean a GroupChat that stays within every per-agent and per-round cap can still produce 5–20× the cost you estimated, because the caps don't account for history inflation or cross-agent turn dynamics.

The four failure modes AutoGen's built-in caps miss

1. Group chat speaker cycle: two agents selecting each other indefinitely

The canonical expensive failure in AutoGen GroupChat. A researcher agent and a writer agent are both in the group. The researcher produces a draft. The writer decides the draft needs more data. AutoGen's speaker selection policy (the LLM-driven "auto" method or a custom selection function) routes back to the researcher. The researcher expands the draft. The writer decides it still needs more data. The loop continues through all of max_round.

Detection signal: the sequence of (speaker_name, next_speaker_name) pairs in the GroupChat history contains the same pair repeating K times within the last N turns. One researcher → writer → researcher exchange is a legitimate review loop. The same pair repeating three times without any measurable state change in the messages is a cycle. max_round can't see this because it counts turns, not turn patterns. max_consecutive_auto_reply can't see it either: it is a per-agent count of automatic replies and carries no information about which agents keep handing off to each other.
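The detection signal can be sketched on a plain list of speaker names, independent of AutoGen. This is a minimal illustration, not the breaker's final code: find_speaker_cycle and its min_repeats parameter are hypothetical names, and the check looks only at the speaker pattern, not at whether message content changed.

```python
from typing import List, Optional, Tuple


def find_speaker_cycle(
    speakers: List[str], min_repeats: int = 3
) -> Optional[Tuple[str, str]]:
    """Return (a, b) if the tail of `speakers` reads a, b, a, b, ... with the
    a -> b hand-off occurring at least `min_repeats` times, else None."""
    window = 2 * min_repeats
    if len(speakers) < window:
        return None
    tail = speakers[-window:]
    a, b = tail[0], tail[1]
    if a == b:
        return None
    # Period-2 check: even positions are all `a`, odd positions are all `b`.
    if all(name == (a, b)[i % 2] for i, name in enumerate(tail)):
        return (a, b)
    return None


history = ["critic", "researcher", "writer", "researcher", "writer",
           "researcher", "writer"]
print(find_speaker_cycle(history))      # ('researcher', 'writer')
print(find_speaker_cycle(history[:4]))  # None: too few turns to judge
```

A stricter version would also compare the content of successive messages, so that a review loop that is making progress is not flagged.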

2. Nested conversation cascade: recursive sub-chat spawning

When an agent with register_nested_chats triggers a nested chat, the nested chat runs to its own max_round and returns a summary. If the outer agent judges the summary insufficient — because the nested LLM failed, produced ambiguous output, or the outer agent's prompt evaluates "good enough" differently each time — it re-triggers the nested chat on its next turn. Each re-trigger is a full nested conversation run costing up to max_round × average_turn_cost. An outer conversation with max_round=8 that re-triggers a nested conversation on each turn can spawn 8 nested conversations, each burning its own turn budget.

Detection signal: more than N nested chat invocations for the same trigger condition within a single outer conversation turn sequence. Two invocations is a retry-on-failure. Four or more without the outer agent advancing past the trigger condition is a cascade. AutoGen provides no native observable for "how many times has this nested chat been spawned this session."
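Since AutoGen exposes no such counter, one has to be kept by hand. The sketch below shows one possible shape, assuming you can call record() from whatever code launches the nested chat. NestedChatCounter, trigger_key, and the threshold of four are illustrative names and values, not part of AutoGen.

```python
from collections import Counter


class NestedChatCounter:
    """Counts nested-chat launches per trigger key within one outer run."""

    def __init__(self, max_invocations: int = 4):
        self.max_invocations = max_invocations
        self.counts: Counter = Counter()

    def record(self, trigger_key: str) -> bool:
        """Record one launch; return True once the cascade threshold is hit."""
        self.counts[trigger_key] += 1
        return self.counts[trigger_key] >= self.max_invocations

    def advance(self, trigger_key: str) -> None:
        """Call when the outer agent moves past the trigger condition."""
        self.counts.pop(trigger_key, None)


counter = NestedChatCounter(max_invocations=4)
results = [counter.record("needs_more_data") for _ in range(4)]
print(results)  # [False, False, False, True]
```

Keying by trigger condition rather than counting all nested chats keeps a workflow that legitimately runs several different nested chats from being flagged.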

3. Code execution retry storm: UserProxyAgent re-running broken code

UserProxyAgent with code_execution_config executes any Python or shell code blocks it receives. When a code block fails with a non-zero exit code, the user proxy feeds the error output back to the assistant agent as the next message. The assistant generates revised code and sends it back. If the error is persistent — a missing dependency, an API key not in the execution environment, a version incompatibility — the assistant keeps generating slightly modified versions of the same broken code, each one producing the same class of error, each one triggering another execution round.

Detection signal: N consecutive executions returning non-zero exit codes. Two consecutive failures is a legitimate debug loop. Five or more consecutive failures on the same category of error is a storm — the assistant is not converging on a fix. AutoGen has no built-in consecutive-failure counter; max_consecutive_auto_reply counts auto-reply messages, not execution outcomes, so a storm of 15 failed executions and 15 assistant replies looks identical to a successful 15-turn debugging session from the cap's perspective.
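The "same category of error" part of the signal can be approximated by pulling the exception name out of the execution output. The following sketch assumes the result text contains "exitcode: N" followed by a Python traceback; parse_execution_result and FailureStreak are hypothetical helpers, and the regex is a heuristic that will miss non-Python errors.

```python
import re
from typing import Optional, Tuple

EXIT_RE = re.compile(r"exitcode:\s*(-?\d+)")
ERROR_RE = re.compile(r"\b([A-Za-z_]\w*(?:Error|Exception))\b")


def parse_execution_result(content: str) -> Optional[Tuple[int, str]]:
    """Return (exit_code, error_category), or None if no exit code is present."""
    match = EXIT_RE.search(content)
    if match is None:
        return None
    exit_code = int(match.group(1))
    if exit_code == 0:
        return (0, "")
    errors = ERROR_RE.findall(content)
    # The last exception name in a traceback is the one that was raised.
    return (exit_code, errors[-1] if errors else "unknown")


class FailureStreak:
    """Tracks consecutive failures that share one error category."""

    def __init__(self) -> None:
        self.category = ""
        self.length = 0

    def update(self, exit_code: int, category: str) -> int:
        if exit_code == 0:
            self.category, self.length = "", 0
        elif category == self.category:
            self.length += 1
        else:
            # A different error means the assistant changed something.
            self.category, self.length = category, 1
        return self.length


output = (
    "exitcode: 1 (execution failed)\nCode output:\n"
    "Traceback (most recent call last):\n"
    "ModuleNotFoundError: No module named 'pandas'"
)
streak = FailureStreak()
for _ in range(3):
    code, category = parse_execution_result(output)
    print(category, streak.update(code, category))
```

Resetting the streak when the error category changes separates a debug loop that is moving (new error each round) from a storm that is stuck on one cause.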

4. Message history cost explosion: per-turn cost growing unbounded

In a long AutoGen conversation, each LLM call receives the full message history as context. The first turn costs roughly H₀ + message_tokens in input, where H₀ is the fixed prompt overhead. Turn N costs roughly H₀ + N×avg_message_tokens. The input cost per turn grows linearly with conversation length, so the 40th turn carries roughly 40× the history of the first turn even if each new message is the same size. Summed over the conversation, N turns don't cost N×C₁; the history term alone adds up to approximately (N²/2)×avg_message_tokens×token_cost, a quadratic relationship that makes long conversations far more expensive than their turn count suggests.
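The arithmetic is easy to check numerically. The sketch below uses made-up values (a 500-token fixed prompt and 300-token messages) and counts input tokens only. The function names are illustrative.

```python
def turn_input_tokens(turn: int, h0: int, avg_msg: int) -> int:
    """Input tokens for a 1-indexed turn: fixed prompt plus all prior messages."""
    return h0 + (turn - 1) * avg_msg


def total_input_tokens(n_turns: int, h0: int, avg_msg: int) -> int:
    """Sum of per-turn input tokens over the whole conversation."""
    return sum(turn_input_tokens(t, h0, avg_msg) for t in range(1, n_turns + 1))


def closed_form(n_turns: int, h0: int, avg_msg: int) -> int:
    """Same total via N*H0 + avg_msg * N*(N-1)/2."""
    return n_turns * h0 + avg_msg * n_turns * (n_turns - 1) // 2


H0, AVG = 500, 300  # assumed values, for illustration only
print(total_input_tokens(10, H0, AVG))  # 18500
print(total_input_tokens(40, H0, AVG))  # 254000
print(closed_form(40, H0, AVG))         # 254000
```

With these numbers, four times as many turns costs about 13.7 times as many input tokens, which is the gap between a turn-count estimate and the actual bill.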

Detection signal: the rolling average token count of the last five messages is more than R× the rolling average of the first five messages. A ratio of 3× signals that history inflation has taken hold. This is distinct from "the conversation is long" — it's specifically the per-turn cost trajectory. Once the ratio crosses threshold, any additional turns cost at that inflated rate, so tripping early saves all subsequent turns at compounded cost.

Building the production circuit breaker

The implementation uses AutoGen's register_reply API: a function registered at position 0 runs before AutoGen's built-in reply handlers (the LLM call, the function-call handler, the code executor). If the guard function returns (True, response), it intercepts the reply. If it returns (False, None), it passes through to the next handler unchanged.
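The (final, reply) contract is easier to see in a toy model. The sketch below is a simplified stand-in for the reply chain, not AutoGen's actual implementation: run_reply_chain, guard, and llm_stub are hypothetical, and real reply functions also receive the recipient, sender, and config.

```python
from typing import Callable, List, Optional, Tuple

ReplyFunc = Callable[[list], Tuple[bool, Optional[str]]]


def run_reply_chain(reply_funcs: List[ReplyFunc], messages: list) -> Optional[str]:
    """Try each handler in order; the first one that returns final=True wins."""
    for func in reply_funcs:
        final, reply = func(messages)
        if final:
            return reply
    return None


def guard(messages: list) -> Tuple[bool, Optional[str]]:
    # Intercept only when a limit is exceeded; otherwise pass through.
    if len(messages) > 3:
        return True, "[CIRCUIT OPEN] too many messages"
    return False, None


def llm_stub(messages: list) -> Tuple[bool, Optional[str]]:
    # Stands in for the built-in LLM reply handler.
    return True, f"reply to: {messages[-1]}"


chain: List[ReplyFunc] = [llm_stub]
chain.insert(0, guard)  # what position=0 amounts to in this model

print(run_reply_chain(chain, ["hi"]))                # reply to: hi
print(run_reply_chain(chain, ["a", "b", "c", "d"]))  # [CIRCUIT OPEN] too many messages
```

Because the guard sits first and normally returns (False, None), it observes every reply request without changing what the agent says.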

State and core class

from dataclasses import dataclass, field
from typing import Optional, Callable, Any, Dict, List, Tuple, Union
from collections import deque
import time

from autogen import ConversableAgent, UserProxyAgent


@dataclass
class AutoGenRunState:
    total_cost_usd: float = 0.0
    message_count: int = 0
    consecutive_code_failures: int = 0
    last_code_exit_code: Optional[int] = None
    speaker_sequence: deque = field(
        default_factory=lambda: deque(maxlen=20)
    )
    msg_token_history: list = field(default_factory=list)
    trips: int = 0
    last_trip_reason: str = ""
    last_trip_at: Optional[float] = None


class AutoGenCostBreaker:
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"

    def __init__(
        self,
        budget_usd: float = 5.0,
        max_speaker_cycles: int = 3,
        max_consecutive_code_failures: int = 5,
        max_messages: int = 60,
        cost_drift_ratio: float = 3.0,
        cooldown_seconds: float = 60.0,
        on_trip: Optional[Callable[[str], None]] = None,
    ):
        self.budget_usd = budget_usd
        self.max_speaker_cycles = max_speaker_cycles
        self.max_consecutive_code_failures = max_consecutive_code_failures
        self.max_messages = max_messages
        self.cost_drift_ratio = cost_drift_ratio
        self.cooldown_seconds = cooldown_seconds
        self.on_trip = on_trip
        self.state = AutoGenRunState()
        self._circuit_state = self.CLOSED

    def _trip(self, reason: str) -> None:
        self.state.trips += 1
        self.state.last_trip_reason = reason
        self.state.last_trip_at = time.time()
        self._circuit_state = self.OPEN
        if self.on_trip:
            self.on_trip(reason)
        raise RuntimeError(f"[AutoGenCostBreaker] TRIPPED: {reason}")

    def _is_open(self) -> bool:
        if self._circuit_state == self.OPEN:
            elapsed = time.time() - (self.state.last_trip_at or 0)
            if elapsed >= self.cooldown_seconds:
                self._circuit_state = self.HALF_OPEN
                return False
            return True
        return False

Speaker cycle detection

    def _check_speaker_cycle(self) -> None:
        # Each entry is a (sender, recipient) edge; the recipient is the
        # agent about to reply.
        seq = list(self.state.speaker_sequence)
        window = self.max_speaker_cycles * 2
        if len(seq) < window:
            return
        tail = seq[-window:]
        first, second = tail[0], tail[1]
        # In a GroupChat the manager requests every reply, so an A/B loop
        # appears as (manager, A), (manager, B) alternating. A plain
        # two-agent chat alternates by construction and is not flagged.
        routed_by_same_sender = first[0] == second[0]
        alternating = first != second and all(
            edge == tail[i % 2] for i, edge in enumerate(tail)
        )
        if routed_by_same_sender and alternating:
            self._trip(
                f"speaker cycle: {first[1]} ↔ {second[1]} "
                f"repeated {self.max_speaker_cycles}× in last "
                f"{len(seq)} turns"
            )

History cost drift detection

    def _check_history_drift(self) -> None:
        hist = self.state.msg_token_history
        if len(hist) < 10:
            return
        early_avg = sum(hist[:5]) / 5
        late_avg = sum(hist[-5:]) / 5
        if early_avg > 0 and late_avg / early_avg >= self.cost_drift_ratio:
            self._trip(
                f"history cost drift: late messages cost "
                f"{late_avg / early_avg:.1f}× early messages — "
                f"unbounded context growth"
            )

Reply function factories

    def _make_guard_reply(self):
        breaker = self

        def guard_reply(
            recipient: ConversableAgent,
            messages: Optional[List[Dict]] = None,
            sender: Optional[ConversableAgent] = None,
            config: Optional[Any] = None,
        ) -> Tuple[bool, Optional[Union[str, Dict, None]]]:
            if breaker._is_open():
                return True, (
                    f"[CIRCUIT OPEN] {breaker.state.last_trip_reason}"
                )

            breaker.state.message_count += 1
            if breaker.state.message_count > breaker.max_messages:
                breaker._trip(
                    f"conversation length: "
                    f"{breaker.state.message_count} messages "
                    f"exceeds limit of {breaker.max_messages}"
                )

            if sender is not None:
                breaker.state.speaker_sequence.append(
                    (sender.name, recipient.name)
                )
                breaker._check_speaker_cycle()

            if messages:
                latest = messages[-1].get("content", "")
                if isinstance(latest, str):
                    # Approximate token count from character length
                    token_est = len(latest) // 4
                    breaker.state.msg_token_history.append(token_est)
                    breaker._check_history_drift()

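            if breaker._circuit_state == breaker.HALF_OPEN:
                # The probe passed every check: close the circuit again.
                breaker._circuit_state = breaker.CLOSED

            return False, None  # pass through to normal handler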

        return guard_reply

    def _make_code_guard_reply(self):
        breaker = self

        def code_guard_reply(
            recipient: UserProxyAgent,
            messages: Optional[List[Dict]] = None,
            sender: Optional[ConversableAgent] = None,
            config: Optional[Any] = None,
        ) -> Tuple[bool, Optional[Union[str, Dict, None]]]:
            if messages:
                content = messages[-1].get("content", "")
                # AutoGen UserProxyAgent writes "exitcode: N" in exec results
                if isinstance(content, str) and "exitcode:" in content:
                    try:
                        code_str = content.split("exitcode:")[1].split()[0]
                        exit_code = int(code_str)
                        breaker.state.last_code_exit_code = exit_code
                        if exit_code != 0:
                            breaker.state.consecutive_code_failures += 1
                        else:
                            breaker.state.consecutive_code_failures = 0
                        if (
                            breaker.state.consecutive_code_failures
                            >= breaker.max_consecutive_code_failures
                        ):
                            breaker._trip(
                                f"code execution storm: "
                                f"{breaker.state.consecutive_code_failures} "
                                f"consecutive failures "
                                f"(last exit code "
                                f"{breaker.state.last_code_exit_code})"
                            )
                    except (IndexError, ValueError):
                        pass
            return False, None

        return code_guard_reply

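    def install(self, *agents: ConversableAgent) -> "AutoGenCostBreaker":
        guard = self._make_guard_reply()
        code_guard = self._make_code_guard_reply()
        for agent in agents:
            agent.register_reply(ConversableAgent, guard, position=0)
            # The execution result ("exitcode: N ...") is the UserProxyAgent's
            # own reply, so it arrives as the latest incoming message on the
            # agent that answers it. Register the parser on every agent so
            # whichever one receives the result counts it.
            agent.register_reply(ConversableAgent, code_guard, position=1)
        return self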

    def track_cost(self, cost_usd: float) -> None:
        self.state.total_cost_usd += cost_usd
        if self.state.total_cost_usd >= self.budget_usd:
            self._trip(
                f"budget exceeded: "
                f"${self.state.total_cost_usd:.4f} >= "
                f"${self.budget_usd:.2f}"
            )

    def reset(self) -> None:
        self.state = AutoGenRunState()
        self._circuit_state = self.CLOSED

Wiring it into a two-agent conversation

import autogen
from autogen import AssistantAgent, UserProxyAgent

llm_config = {"model": "gpt-4o", "api_key": "YOUR_KEY"}

assistant = AssistantAgent(
    name="assistant",
    llm_config=llm_config,
    system_message="You are a helpful AI assistant. When done, reply TERMINATE.",
)
user_proxy = UserProxyAgent(
    name="user_proxy",
    human_input_mode="NEVER",
    max_consecutive_auto_reply=10,
    # content can be None (e.g. tool-call messages), so fall back to "".
    is_termination_msg=lambda m: "TERMINATE" in (m.get("content") or ""),
    code_execution_config={"work_dir": "coding", "use_docker": False},
)

breaker = AutoGenCostBreaker(
    budget_usd=3.0,
    max_speaker_cycles=3,
    max_consecutive_code_failures=5,
    max_messages=60,
    cost_drift_ratio=3.0,
    on_trip=lambda reason: print(f"[ALERT] Circuit tripped: {reason}"),
)
breaker.install(assistant, user_proxy)

try:
    user_proxy.initiate_chat(
        assistant,
        message="Write and run a Python function that parses deeply nested JSON.",
    )
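    # After the chat, feed the real cost into the breaker. The return shape of
    # gather_usage_summary differs across 0.2.x releases (tuple vs. nested
    # dict), so check yours before indexing. With the nested-dict form:
    # usage = autogen.gather_usage_summary([assistant])
    # breaker.track_cost(usage["usage_including_cached_inference"]["total_cost"])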
except RuntimeError as e:
    print(f"Circuit tripped: {e}")
finally:
    print(
        f"Turns: {breaker.state.message_count}, "
        f"Code failures: {breaker.state.consecutive_code_failures}, "
        f"Trips: {breaker.state.trips}"
    )

Wiring it into a GroupChat

from autogen import AssistantAgent, GroupChat, GroupChatManager

llm_config = {"model": "gpt-4o", "api_key": "YOUR_KEY"}

researcher = AssistantAgent(
    name="researcher",
    llm_config=llm_config,
    system_message="You research topics. When the analysis is complete, say TERMINATE.",
)
writer = AssistantAgent(
    name="writer",
    llm_config=llm_config,
    system_message="You write clear summaries. When done, say TERMINATE.",
)
critic = AssistantAgent(
    name="critic",
    llm_config=llm_config,
    system_message="You review for accuracy. When satisfied, say TERMINATE.",
)

group_chat = GroupChat(
    agents=[researcher, writer, critic],
    messages=[],
    max_round=15,
    speaker_selection_method="auto",
)
manager = GroupChatManager(
    groupchat=group_chat,
    llm_config=llm_config,
)

breaker = AutoGenCostBreaker(
    budget_usd=6.0,
    max_speaker_cycles=3,   # researcher→writer→researcher→writer→… trips at 3rd pair
    max_messages=60,
    cost_drift_ratio=3.0,
    on_trip=lambda r: send_slack_alert(r),  # send_slack_alert: your own alerting helper, not defined here
)
breaker.install(researcher, writer, critic, manager)

try:
    researcher.initiate_chat(
        manager,
        message="Research and summarize the top three AI agent cost control patterns.",
    )
except RuntimeError as e:
    print(f"Breaker tripped: {e}")

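The breaker is installed on all four agents, including the GroupChatManager. Every reply a participant generates passes through the guard, so the message count, speaker cycle, and history drift checks fire on each round. The manager's own guard fires when the manager is asked to reply, for example on the opening message. In AutoGen 0.2 the speaker-selection LLM call runs inside the manager's group chat loop, not through its reply chain, so the guard does not count it separately and its cost has to come from usage tracking.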

Cost savings across failure scenarios

| Scenario | Setup | Without breaker | With breaker | Saved |
|---|---|---|---|---|
| Speaker cycle | GroupChat: researcher ↔ writer, max_round=15, 14 cycle turns | ~$18.40 | ~$2.30 (trip at cycle 3) | 87% |
| Code execution storm | UserProxyAgent: 12 consecutive failed executions, missing dependency | ~$4.80 | ~$1.20 (trip at failure 5) | 75% |
| Nested conversation cascade | 8 nested chat re-triggers, each max_round=6, outer max_round=8 | ~$27.20 | ~$6.80 (trip at message 60) | 75% |
| Message history explosion | 50-turn conversation, history grows to 3× early-turn token cost by turn 20 | ~$14.60 | ~$3.65 (trip at drift ratio 3×) | 75% |

The speaker cycle scenario produces the highest percentage savings because history inflation compounds on top of the cycle: each time the researcher and writer exchange, the full prior exchange history is re-sent to both LLMs, so later cycles cost more than earlier ones. Tripping at cycle 3 cuts off the compounding effect before it dominates the cost. In absolute dollars the nested conversation cascade saves the most, simply because it is the most expensive failure to begin with.

HALF_OPEN recovery and exponential backoff

After the cooldown period expires, the breaker transitions to HALF_OPEN and allows one probe reply. If that reply completes without triggering another check, the state machine resets to CLOSED. If the probe immediately re-trips on the same condition (speaker cycle, cost drift), you extend the cooldown with exponential backoff:

class AutoGenCostBreakerWithBackoff(AutoGenCostBreaker):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Remember the configured cooldown so backoff never compounds on
        # an already-extended value.
        self._base_cooldown = self.cooldown_seconds
        self._backoff_multiplier = 1

    def _trip(self, reason: str) -> None:
        # Extend cooldown exponentially on repeated trips:
        # base×2, base×4, base×8, ...
        self.cooldown_seconds = min(
            self._base_cooldown * (2 ** self._backoff_multiplier),
            900,  # cap at 15 minutes
        )
        self._backoff_multiplier += 1
        super()._trip(reason)

    def reset(self) -> None:
        super().reset()
        self._backoff_multiplier = 1
        self.cooldown_seconds = self._base_cooldown  # reset to base

After a first trip, the cooldown is 120 seconds. After a second trip on the same run, 240 seconds. The cap at 900 seconds prevents the backoff from making the breaker functionally useless for long-running batch jobs while still giving enough time for transient failures to clear.
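The schedule described above can be tabulated directly. This is a small worked example using the default 60-second base and the 900-second cap. cooldown_after_trip is an illustrative helper, not a method of the breaker.

```python
def cooldown_after_trip(
    trip_number: int, base: float = 60.0, cap: float = 900.0
) -> float:
    """Cooldown in seconds after a 1-indexed trip: base * 2**n, capped."""
    return min(base * (2 ** trip_number), cap)


schedule = [cooldown_after_trip(n) for n in range(1, 6)]
print(schedule)  # [120.0, 240.0, 480.0, 900.0, 900.0]
```

The cap is reached on the fourth trip, so a run that keeps tripping waits at most 15 minutes between probes.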

RunGuard integration

The circuit breaker above works standalone. If you're running multiple AutoGen workflows across your stack, RunGuard's @guard() decorator provides the same speaker cycle detection, budget enforcement, and alert dispatch as a single import that wraps any AutoGen-callable function:

from runguard import guard
from autogen import AssistantAgent

# Wrap any tool function called by an AutoGen agent
@guard(
    budget_usd=2.0,
    max_loop_repeats=3,
    on_trip=lambda err: send_slack_alert(str(err)),
)
def fetch_research_data(query: str) -> str:
    return call_search_api(query)  # your implementation

# Or wrap a full agent reply function for stack-wide tracking
researcher = AssistantAgent(
    name="researcher",
    llm_config=llm_config,
)

# RunGuard's install_on() is a convenience wrapper around register_reply
from runguard.autogen import install_on
install_on(researcher, budget_usd=3.0, max_speaker_cycles=3)

The decorator watches the function's call-signature stream for loop patterns, enforces the per-call budget cap, and raises on the third repeat. install_on() is equivalent to the register_reply approach above, but reads its configuration from a central runguard.toml so the threshold tuning is consistent across all agents in your stack.

FAQ

Does register_reply work with both two-agent chats and GroupChat?

Yes. register_reply is a method on ConversableAgent, which is the base class for AssistantAgent, UserProxyAgent, and GroupChatManager. Installing the guard on each agent in a GroupChat, including the manager, means every reply generation call passes through the guard. The manager's speaker-selection LLM call is a separate matter: in AutoGen 0.2 it runs inside the manager's group chat loop, not through the reply chain, so account for its cost through usage tracking. In a two-agent chat, install the guard on both agents so it sees both sides of the conversation and can track the full speaker sequence and history. Omitting one agent means the guard sees only one side's messages and loses half its signal.

How do I track actual LLM token cost rather than estimating from content character length?

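For OpenAI-backed agents, use autogen.gather_usage_summary([agent1, agent2, ...]) after the chat completes. The return shape varies across 0.2.x releases: recent ones return a dictionary keyed by usage_including_cached_inference and usage_excluding_cached_inference, each containing total_cost in USD, while older ones return a tuple of two summaries, so check your installed version before indexing. track_cost() adds to a running total, so pass it the cost incurred since the last call, not the cumulative figure. For mid-conversation cost tracking (needed for budget enforcement before the chat completes), read the agents' own usage counters from inside the guard, for example via ConversableAgent.get_total_usage() where your version provides it. LangChain's get_openai_callback() is not a substitute here: it only records calls made through LangChain's model wrappers, and AutoGen calls the OpenAI client directly. The character-length approximation (len(content) // 4) is useful for detecting history drift trajectories without a cost API. It's not accurate enough for hard budget caps, but the ratio it produces is stable enough for drift detection because both early and late messages use the same approximation.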
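Usage summaries report cumulative totals, while track_cost() accumulates whatever it is given. A small adapter avoids double counting. The sketch below is one way to do it. CumulativeCostFeed is a hypothetical helper, and where the cumulative reading comes from is left to your setup.

```python
class CumulativeCostFeed:
    """Turns cumulative cost readings into per-call increments."""

    def __init__(self) -> None:
        self._last_total = 0.0

    def delta(self, cumulative_total_usd: float) -> float:
        if cumulative_total_usd < self._last_total:
            # The source counter was reset; treat the reading as new spend.
            increment = cumulative_total_usd
        else:
            increment = cumulative_total_usd - self._last_total
        self._last_total = cumulative_total_usd
        return increment


feed = CumulativeCostFeed()
for reading in (0.10, 0.25, 0.25, 0.05):
    print(round(feed.delta(reading), 2))
```

In use, each reading would be passed through as breaker.track_cost(feed.delta(reading)), so the breaker's total matches the provider's total even when readings are taken repeatedly.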

What happens when the breaker trips mid-conversation — can I resume from where it stopped?

When _trip() raises a RuntimeError, AutoGen's conversation loop unwinds with the exception. The agents retain their chat_messages history up to the point of the trip, and you can inspect agent.chat_messages[other_agent] to see what was exchanged. To resume, reset the breaker (breaker.reset()), optionally truncate the message history to remove the looping tail (agent.chat_messages[other_agent] = agent.chat_messages[other_agent][:N]), and call initiate_chat again with clear_history=False, since the default clears the stored history before starting. In practice, if the breaker trips because of a speaker cycle or code storm, resuming without fixing the underlying cause (unclear task specification, missing code dependency) will re-trip the breaker. The HALF_OPEN state is for transient infrastructure failures, not persistent logic errors in the conversation setup.
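Choosing N for the truncation can be automated when messages carry a speaker name. The sketch below assumes each message is a dict with a "name" key, as GroupChat messages typically have. trim_looping_tail is a hypothetical helper that keeps the first exchange of an alternating run and drops the rest.

```python
from typing import Dict, List


def trim_looping_tail(messages: List[Dict], min_repeats: int = 3) -> List[Dict]:
    """Drop the repeated part of a trailing a, b, a, b, ... run of speakers."""
    names = [m.get("name", "") for m in messages]
    n = len(names)
    if n < 2 or names[-1] == names[-2]:
        return list(messages)
    a, b = names[-2], names[-1]
    # Walk backwards while the speakers keep alternating ..., a, b, a, b.
    start = n
    while start > 0 and names[start - 1] == (b if (n - start) % 2 == 0 else a):
        start -= 1
    run_length = n - start
    if run_length < 2 * min_repeats:
        return list(messages)
    # Keep everything before the run plus its first exchange (two messages).
    return list(messages[: start + 2])


speakers = ["critic", "researcher", "writer", "researcher", "writer",
            "researcher", "writer"]
history = [{"name": s, "content": f"msg {i}"} for i, s in enumerate(speakers)]
print([m["name"] for m in trim_looping_tail(history)])
# ['critic', 'researcher', 'writer']
```

Keeping one exchange preserves the context the agents were working from while removing the turns that only repeated it.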

How does this work with AutoGen's initiate_chats() for running multiple sequential conversations?

initiate_chats() runs a list of chat specs sequentially, with each chat's summary optionally carried into the next as a carryover message. The guard installed via register_reply persists across all chats in the sequence because it's registered on the agent object, not on a single initiate_chat call. If you want per-chat budget tracking (each conversation in the sequence gets its own budget), call breaker.reset() between chats. If you want a global budget across the entire sequence, do not reset between chats: breaker.state.total_cost_usd accumulates across all conversations until it hits budget_usd. The message count and speaker sequence live in the same breaker state, so they also carry over from one chat to the next until you call reset(). For sequential chats that are intended to be independent, always reset the breaker state at the start of each chat.

AutoGen 0.4 significantly changed the API — does this implementation work with the new version?

The implementation above targets AutoGen 0.2/0.3 (pyautogen ≤ 0.3.x), which uses ConversableAgent, register_reply, and GroupChat. AutoGen 0.4 (the rewritten autogen-agentchat package) uses an event-driven architecture with on_messages, BaseChatAgent, and MessageContext. For AutoGen 0.4, the equivalent hook is implementing a custom BaseChatAgent subclass that wraps the target agent, intercepts calls to on_messages, runs the guard checks, and either raises or delegates to the wrapped agent. The core detection logic — speaker pair tracking, exit code parsing, history length estimation — is identical; only the hook point changes. RunGuard's install_on() handles both API versions behind a version-detection shim.

Stop guarding AutoGen workflows by hand

RunGuard wraps any AutoGen agent or tool with one import and handles speaker cycle detection, per-run budget enforcement, and Slack alerts automatically — without forking AutoGen or modifying your existing agent definitions.

Start free 14-day trial →