Agno — the rebranded successor to phidata — is a lean, production-first agent framework for Python. If you used phidata before the rename, the API is familiar: Agent, Team, Workflow, tool functions decorated with @tool or passed directly as Python callables. Agno's appeal is that it gets out of your way. There is no graph abstraction, no custom runtime, no YAML pipeline definition. An agent is Python, and you run it with agent.run("do the thing").

The simplicity is real. So is the cost exposure it leaves open. Agno ships with Agent(max_steps=N) and the ability to set structured_output_retries and per-tool retry counts. None of these guards talk to each other. A Team of agents can delegate tasks back and forth across the step boundary without any of the individual max_steps counters registering the other agent's work. A structured output that consistently fails Pydantic validation retries up to structured_output_retries times per step — multiplied by however many steps the agent takes before something else intervenes. The costs are real, the failure modes are documented in production postmortems, and max_steps=20 is not the answer to any of them.

This post covers four Agno-specific failure modes, each with a detection mechanism, and assembles them into a unified AgnoBreaker circuit breaker with CLOSED / OPEN / HALF_OPEN states. The breaker wraps Agno via two thin subclasses — GuardedAgent and GuardedTeam — that require no changes to your tool functions or the underlying framework.

Why max_steps is not a circuit breaker

Agno's max_steps parameter is a step counter. It counts how many times the agent has gone through the plan-act-observe loop. When the counter hits the limit, the agent raises AgentRunError with a message indicating the limit was reached. This is useful. It is not a circuit breaker.

A circuit breaker detects a pattern — specifically, a pattern that indicates the agent is no longer making progress but is continuing to spend money. The three-state CLOSED / OPEN / HALF_OPEN model adds recovery logic: after the breaker trips, it allows one probe request through to see if conditions have cleared, then resets if the probe succeeds. max_steps has no state machine, no pattern detection, and no recovery. It simply terminates the run at a fixed count regardless of whether the agent was making progress or had been in a tight loop since step 3.

More critically, max_steps is scoped to a single Agent instance. Agno's Team orchestrates multiple agents, and each agent runs its own step counter. A delegation cycle between two agents — Agent A asks Agent B, Agent B asks Agent A, repeat — burns two separate step counters simultaneously, potentially doubling the cost ceiling before either agent's max_steps fires. The Team itself has no step counter or delegation depth guard.

Failure mode 1: Team back-delegation cycle

Agno's Team lets member agents call each other via tool functions registered by the team orchestrator. In a typical setup, a leader agent routes tasks to specialist members. The failure mode is a cycle: the leader delegates to a specialist, the specialist cannot complete the task and re-delegates back to the leader for clarification, the leader re-delegates to the specialist with the same (or similar) instructions, and the cycle continues until both agents hit max_steps.

This happens most commonly in two configurations. First, a specialist agent whose tools are restricted but whose system prompt allows it to ask the leader for help — the specialist asks, the leader answers by re-delegating, the specialist is in the same position as before. Second, a team where two specialists have overlapping capabilities and the leader routes ambiguous tasks to whichever specialist responds first; if both specialists decline, the routing logic calls both again.

The Team has no built-in delegation depth counter. Each inter-agent call is transparent to the individual agents' step counters. To detect this, you need cross-agent context tracking using contextvars.ContextVar, which propagates through Python's async task boundaries and call stacks without requiring a shared global.

Failure mode 2: Structured output regeneration loop

Agno supports structured outputs via Agent(response_model=SomeModel), where SomeModel is a Pydantic BaseModel. When the LLM produces output that fails Pydantic validation, Agno retries the generation up to structured_output_retries times (default: 3) per invocation. This is per-invocation retry at the framework level, separate from the agent's step counter.

The failure mode occurs when the model consistently produces malformed structured output for a given input. Each retry is a full LLM round trip — same prompt, same input context, same (probably malformed) output — multiplied by structured_output_retries. If the agent runs for N steps and every step triggers the full retry budget, the actual LLM call count is N × (1 + structured_output_retries). With max_steps=20 and structured_output_retries=3, that is 80 LLM calls for what looks like a 20-step run in your logs.

Detection requires monitoring the fraction of LLM calls that result in validation failures versus successful parses. A healthy agent should see occasional validation failures early in a run as the model calibrates to the schema, but a near-100% failure rate across consecutive steps indicates the schema is genuinely incompatible with the model's output for this input class.

Failure mode 3: Tool retry multiplication storm

Agno tools can be configured with retry logic — retry_on_error=True or a custom retry_count on the tool definition. This is separate from agent-level retries. When a tool fails, Agno retries it at the tool level before returning the error to the agent. If the agent then decides to call the same tool again (because it received an error response and its retry logic says "try again"), you get multiplication: agent_retries × tool_retries calls for what appears to be a single tool invocation in step count terms.

The more subtle form of this failure mode requires no retry configuration at all. It occurs when the agent calls a tool that succeeds but returns output that does not satisfy the agent's expectations (a search that returns irrelevant results, a scrape that returns a 429 page, an API call that returns an empty list). The agent calls the same tool with the same or only slightly modified arguments, hoping for a different result. After N calls to the same tool with no meaningful variation in arguments, the agent is definitionally in a tool storm regardless of whether any individual call raised an exception.

Failure mode 4: Storage session bloat

Agno's AgentStorage — most commonly SqlAgentStorage backed by SQLite — persists agent sessions. When an agent is initialized with storage and a fixed session_id, it loads its prior conversation history from the database on each run() call. This means every LLM call in every run carries the full token weight of all prior sessions.

In practice: a team that runs the same agent on a recurring task with a persistent session_id accumulates history indefinitely. After 30 runs, each new run might open with 8,000–12,000 tokens of prior history before the task prompt is even injected. The agent step count looks normal — max_steps=10 is faithfully enforced — but the cost per step has tripled from what it was on day one because the context window baseline is 3× larger. max_steps does not account for token cost per step. A 10-step run at 12,000-token baseline costs the same as an 18-step run at 6,000-token baseline.

Detection requires checking the session's stored token count or serialized message history length before the run begins, and tripping the breaker if the accumulated context exceeds a threshold relative to your task's expected working context size.

Building AgnoBreaker

The breaker uses contextvars.ContextVar for delegation depth (thread-safe and async-safe), a dataclass for state, and straightforward counters for tool call and validation failure tracking. The CLOSED / OPEN / HALF_OPEN state machine provides recovery: after a configurable reset_timeout_seconds, the breaker allows one probe through; if the probe succeeds, it resets to CLOSED.

from __future__ import annotations

import hashlib
import json
import time
from collections import defaultdict, deque
from contextvars import ContextVar
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class BreakerState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class AgentBudgetExceededError(RuntimeError):
    pass


_delegation_depth: ContextVar[int] = ContextVar("_delegation_depth", default=0)


@dataclass
class AgnoBreaker:
    # Team back-delegation cycle
    max_delegation_depth: int = 4

    # Tool storm
    max_tool_repeats_session: int = 4      # exact (tool, args_hash) pair
    max_consecutive_same_tool: int = 3     # same tool name consecutively

    # Structured output regeneration
    max_validation_failure_rate: float = 0.8   # fraction of consecutive validation failures
    min_steps_for_rate_check: int = 4          # don't trip until N steps have occurred

    # Storage session bloat
    max_session_history_chars: int = 60_000    # trip before run if history exceeds this

    # HALF_OPEN recovery
    reset_timeout_seconds: float = 120.0

    # Internal state
    _state: BreakerState = field(default=BreakerState.CLOSED, init=False)
    _opened_at: float = field(default=0.0, init=False)

    # Tool call tracking
    _tool_call_counts: dict[str, int] = field(default_factory=lambda: defaultdict(int), init=False)
    _recent_tool_names: deque = field(default_factory=lambda: deque(maxlen=5), init=False)

    # Structured output tracking
    _step_outcomes: deque = field(default_factory=lambda: deque(maxlen=10), init=False)

    def gate(self) -> None:
        if self._state == BreakerState.OPEN:
            elapsed = time.monotonic() - self._opened_at
            if elapsed >= self.reset_timeout_seconds:
                self._state = BreakerState.HALF_OPEN
            else:
                raise AgentBudgetExceededError(
                    f"AgnoBreaker OPEN — tripped {elapsed:.0f}s ago, "
                    f"resets after {self.reset_timeout_seconds:.0f}s"
                )

    def record_successful_step(self) -> None:
        if self._state == BreakerState.HALF_OPEN:
            self.reset()

    def reset(self) -> None:
        self._state = BreakerState.CLOSED
        self._opened_at = 0.0
        self._tool_call_counts.clear()
        self._recent_tool_names.clear()
        self._step_outcomes.clear()

    def _trip(self, reason: str) -> None:
        self._state = BreakerState.OPEN
        self._opened_at = time.monotonic()
        raise AgentBudgetExceededError(reason)

    # ── Tool storm ──────────────────────────────────────────────────────────

    def record_tool_call(self, tool_name: str, args: Any) -> None:
        self.gate()

        args_hash = hashlib.md5(
            json.dumps(args, sort_keys=True, default=str).encode()
        ).hexdigest()[:8]
        key = f"{tool_name}:{args_hash}"

        self._tool_call_counts[key] += 1
        self._recent_tool_names.append(tool_name)

        if self._tool_call_counts[key] >= self.max_tool_repeats_session:
            self._trip(
                f"Tool repetition storm: {tool_name}({args_hash}) called "
                f"{self._tool_call_counts[key]} times with identical arguments"
            )

        recent = list(self._recent_tool_names)
        if len(recent) >= self.max_consecutive_same_tool:
            tail = recent[-self.max_consecutive_same_tool:]
            if len(set(tail)) == 1:
                self._trip(
                    f"Tool storm: {tool_name} called {self.max_consecutive_same_tool} "
                    f"times consecutively without interleaved steps"
                )

    # ── Structured output regeneration ──────────────────────────────────────

    def record_validation_result(self, success: bool) -> None:
        self.gate()
        self._step_outcomes.append(success)

        if len(self._step_outcomes) < self.min_steps_for_rate_check:
            return

        outcomes = list(self._step_outcomes)
        failure_rate = outcomes.count(False) / len(outcomes)
        if failure_rate >= self.max_validation_failure_rate:
            self._trip(
                f"Structured output regeneration loop: {failure_rate:.0%} of the last "
                f"{len(outcomes)} validation attempts failed — schema may be incompatible "
                f"with model output for this input class"
            )

    # ── Storage session bloat ───────────────────────────────────────────────

    def check_session_history(self, history_text: str) -> None:
        if len(history_text) > self.max_session_history_chars:
            self._trip(
                f"Storage session bloat: loaded session history is {len(history_text):,} chars "
                f"(limit {self.max_session_history_chars:,}). Reset the session_id or "
                f"summarize history before continuing."
            )

    # ── Team delegation depth ───────────────────────────────────────────────

    def check_delegation_depth(self) -> None:
        depth = _delegation_depth.get()
        if depth >= self.max_delegation_depth:
            self._trip(
                f"Team back-delegation cycle: delegation depth {depth} reached limit "
                f"{self.max_delegation_depth} — agents are delegating back and forth "
                f"without resolving the task"
            )

GuardedAgent: wrapping Agent.run()

Agno's Agent does not expose step-level hooks like a callback manager or a step event system. The cleanest interception point is to subclass Agent and override the run() method. Inside the override, you instrument tool calls by wrapping each tool function with a thin proxy that calls breaker.record_tool_call() before delegating to the real implementation. You restore the original functions after the run completes so tools are not permanently modified.

from agno.agent import Agent, RunResponse
from agno.models.base import Model
from typing import Callable


class GuardedAgent(Agent):
    """Drop-in Agent subclass that applies AgnoBreaker before and during each run."""

    def __init__(self, *args, breaker: AgnoBreaker | None = None, **kwargs):
        super().__init__(*args, **kwargs)
        self._breaker = breaker or AgnoBreaker()

    def run(self, message: str, **kwargs) -> RunResponse:
        breaker = self._breaker

        # Storage session bloat: check history before the run begins
        if self.storage is not None:
            try:
                session = self.storage.read(self.session_id)
                if session and session.memory:
                    history_text = json.dumps(session.memory, default=str)
                    breaker.check_session_history(history_text)
            except Exception:
                pass  # storage read failure is not a breaker condition

        # Team delegation depth: increment before entering this agent
        depth_token = _delegation_depth.set(_delegation_depth.get() + 1)
        breaker.check_delegation_depth()

        # Wrap tool functions to record calls
        original_tools: dict[str, Callable] = {}
        if self.tools:
            for tool in self.tools:
                if callable(tool) and hasattr(tool, "__name__"):
                    name = tool.__name__
                    original = tool

                    def make_guarded(original_fn: Callable, tool_name: str) -> Callable:
                        def guarded(*a, **kw):
                            breaker.record_tool_call(tool_name, {"args": a, "kwargs": kw})
                            return original_fn(*a, **kw)
                        guarded.__name__ = original_fn.__name__
                        guarded.__doc__ = original_fn.__doc__
                        return guarded

                    original_tools[name] = original
                    # Replace in-place on the tools list
                    idx = self.tools.index(tool)
                    self.tools[idx] = make_guarded(original, name)

        try:
            breaker.gate()
            response = super().run(message, **kwargs)
            breaker.record_successful_step()
            return response
        finally:
            # Restore original tool functions
            if self.tools:
                for i, tool in enumerate(self.tools):
                    if callable(tool) and hasattr(tool, "__name__"):
                        name = tool.__name__
                        if name in original_tools:
                            self.tools[i] = original_tools[name]
            _delegation_depth.reset(depth_token)

The structured output validation failure hook requires one additional override. Agno calls its internal validation logic inside the agent's response parsing phase. Rather than monkey-patching Agno internals, the practical approach is to inspect the response after the fact: if response.content is a string (indicating a parse failure that Agno surfaced as raw text rather than a structured object), record a validation failure; if it is an instance of your response_model type, record a success.

    def run(self, message: str, **kwargs) -> RunResponse:
        # ... (storage + delegation + tool wrapping setup as above) ...

        try:
            breaker.gate()
            response = super().run(message, **kwargs)

            # Check structured output validation result
            if self.response_model is not None:
                parsed_ok = isinstance(response.content, self.response_model)
                breaker.record_validation_result(parsed_ok)

            breaker.record_successful_step()
            return response
        finally:
            # ... (cleanup as above) ...

GuardedTeam: wrapping inter-agent delegation

Agno's Team orchestrates agents by routing the leader's delegate instructions to the appropriate member. The delegation is transparent to each member's individual step counter. GuardedTeam overrides the team's internal member-dispatch method to increment the ContextVar delegation depth counter before each member call and decrement it after.

from agno.team import Team


class GuardedTeam(Team):
    """Team subclass that tracks cross-agent delegation depth for cycle detection."""

    def __init__(self, *args, breaker: AgnoBreaker | None = None, **kwargs):
        super().__init__(*args, **kwargs)
        self._breaker = breaker or AgnoBreaker()
        # Share the same breaker with all member agents that support it
        for member in self.members:
            if isinstance(member, GuardedAgent):
                member._breaker = self._breaker

    def run_member(self, member: Agent, message: str, **kwargs):
        breaker = self._breaker
        depth_token = _delegation_depth.set(_delegation_depth.get() + 1)
        try:
            breaker.check_delegation_depth()
            return super().run_member(member, message, **kwargs)
        finally:
            _delegation_depth.reset(depth_token)

Because all GuardedAgent members and the GuardedTeam share the same AgnoBreaker instance, tool storms and validation failures accumulate across all agents in the team — not per-agent. A tool that Agent A calls twice and Agent B calls twice registers four total hits against the breaker's session counter. This is the correct behavior: from the user's billing perspective, the team is one unit of cost, not two separate agents.

Complete wiring example

A research team with a leader, a web search specialist, and a summarizer specialist. The breaker is shared across all three agents and the team. The leader has a fallback instruction that could cause it to re-delegate if neither specialist returns satisfactory results — the scenario that historically creates delegation cycles.

from agno.agent import Agent
from agno.team import Team
from agno.models.openai import OpenAIChat
from agno.tools import DuckDuckGo, Newspaper4k

model = OpenAIChat(id="gpt-4o")
shared_breaker = AgnoBreaker(
    max_delegation_depth=3,
    max_tool_repeats_session=4,
    max_consecutive_same_tool=3,
    max_validation_failure_rate=0.75,
    min_steps_for_rate_check=4,
    max_session_history_chars=50_000,
    reset_timeout_seconds=90.0,
)

web_agent = GuardedAgent(
    model=model,
    tools=[DuckDuckGo()],
    name="web_researcher",
    description="Searches the web for current information.",
    max_steps=8,
    breaker=shared_breaker,
)

summarizer = GuardedAgent(
    model=model,
    tools=[Newspaper4k()],
    name="summarizer",
    description="Reads and summarizes web pages.",
    max_steps=8,
    breaker=shared_breaker,
)

team = GuardedTeam(
    name="research_team",
    agents=[web_agent, summarizer],
    model=model,
    instructions=[
        "Route search tasks to web_researcher.",
        "Route summarization tasks to summarizer.",
        "If a specialist cannot complete the task, synthesize from available results "
        "rather than re-delegating.",  # prevent re-delegation cycle
    ],
    breaker=shared_breaker,
)

shared_breaker.reset()
try:
    response = team.run("Research the current state of AI agent frameworks in 2026")
    print(response.content)
except AgentBudgetExceededError as e:
    print(f"[BREAKER TRIPPED] {e}")
    # notify Slack, save partial results, etc.
except Exception as e:
    print(f"[AGENT ERROR] {e}")

Failure mode traces and what the breaker catches

Team back-delegation cycle trip — leader delegates to web_researcher (depth 2), web_researcher cannot find results and delegates back to leader for clarification (depth 3), leader re-delegates to web_researcher (depth 4). GuardedTeam.run_member() detects depth 4 ≥ max_delegation_depth=3. Raises AgentBudgetExceededError: "Team back-delegation cycle: delegation depth 4 reached limit 3 — agents are delegating back and forth without resolving the task". Saved: the full tail of the cycle, which would have consumed both agents' remaining step budgets — up to 14 additional LLM calls at the cycle point.

Structured output regeneration loop trip — an agent with response_model=ResearchReport runs 6 steps. Steps 3 through 6 all produce raw string output instead of the ResearchReport schema. Failure rate over the rolling 10-step window: 4/6 = 67%, approaching the 75% threshold. At step 7, a fifth failure pushes the rate to 5/7 = 71%. At step 8, rate hits 6/8 = 75% exactly — check_validation_result(False) trips. Raises AgentBudgetExceededError: "Structured output regeneration loop: 75% of the last 8 validation attempts failed". Without the breaker, max_steps=20 would allow 12 more steps × 4 retries each = 48 additional LLM calls at rising token cost.

Tool retry multiplication storm trip — summarizer calls Newspaper4k(url="https://example.com/article") four times with the same URL (the article keeps returning a 403, but the agent retries because the error message is non-specific). The exact key newspaper4k_read:a1b2c3d4 hits count 4 on the fourth call. Raises AgentBudgetExceededError: "Tool repetition storm: newspaper4k_read(a1b2c3d4) called 4 times with identical arguments". Saved: the continuation of a retry strategy that has already failed three times.

Storage session bloat pre-check trip — the web_researcher agent is initialized with session_id="daily-research" and runs daily. After 45 sessions, its stored history is 67,000 characters. On session 46, the pre-run history check in GuardedAgent.run() reads the session from SqlAgentStorage, serializes it, and finds 67,000 chars exceeding the 50,000 limit. Raises AgentBudgetExceededError: "Storage session bloat: loaded session history is 67,000 chars (limit 50,000)" before any LLM call is made. Every subsequent run at this session size would have cost 3× the baseline; the pre-check catches it before the first token is spent.

Detection threshold guidance

Failure mode Conservative threshold Aggressive threshold When to tighten
Team back-delegation
max_delegation_depth
5 depth levels 3 depth levels Single leader + one or two specialists with no expected back-delegation in the system prompt
Tool repeat (session)
max_tool_repeats_session
5 exact repeats 3 exact repeats Tools are deterministic and idempotent (re-calling adds no information)
Tool storm (consecutive)
max_consecutive_same_tool
4 consecutive 3 consecutive Tasks don't require sequential multi-call tool chains (e.g. no pagination)
Structured output failure rate
max_validation_failure_rate
0.8 (80%) 0.6 (60%) Response schema is well-tested on the target model; >60% failure rate is definitively broken
Storage session bloat
max_session_history_chars
80,000 chars 40,000 chars Tasks have predictable, short working contexts (no need for deep history recall)

Frequently asked questions

Agno's Agent doesn't expose a step callback hook like LangGraph or smolagents do. Won't subclassing Agent.run() miss intra-step tool calls?

Yes, partially — the GuardedAgent override intercepts calls at the run level, not at each individual tool invocation inside a step. The tool wrapping approach described above addresses this: by replacing the tool function objects in self.tools with thin proxies before super().run() is called, every invocation of that tool — whether the agent calls it once or five times in a single step — gets recorded by the breaker. The limitation is that tool wrapping requires the tools to be Python callables in agent.tools; tools implemented as Agno toolkit objects with a run() method need a different interception point (subclass the toolkit and override run()). For toolkit-style tools, call breaker.record_tool_call() inside the subclass's run() override directly, then delegate to super().run().

We use Agno's Workflow for multi-step pipelines, not Agent or Team. Does AgnoBreaker apply?

Workflow is Agno's sequential pipeline abstraction — it runs a series of steps in order, and each step can invoke an agent. The breaker applies at the agent level, so if each step in your workflow uses a GuardedAgent, the per-agent tool storm and validation failure checks work as described. The delegation depth check does not apply to Workflow steps (delegation depth is only meaningful when agents call other agents, not when a workflow orchestrates sequential agent invocations). The storage session bloat pre-check applies whenever an agent in the workflow uses AgentStorage. Add a workflow-level wrapper that resets the breaker between steps if each step should be treated as an independent cost unit.

The delegation depth ContextVar propagates through async tasks. Will it behave correctly if we run multiple team.run() calls concurrently with asyncio.gather()?

contextvars.ContextVar is exactly the right primitive for this: each asyncio task gets a copy of the context at creation time, so concurrent tasks started with asyncio.gather() each have an independent delegation depth counter. There is no cross-task contamination. The one subtlety: if you create the tasks inside an already-deep delegation context (e.g., a callback inside a member agent that spawns sub-tasks), the spawned tasks will inherit the parent's depth counter at creation time. This is correct behavior — the tasks are genuinely executing at that delegation depth. If you intentionally want sub-tasks to start from depth 0 (fully isolated cost accounting), copy the context explicitly before creating each task: asyncio.ensure_future(coro, context=contextvars.copy_context()) and set depth to 0 in the new context.

How does the storage session bloat check interact with agents that legitimately need long conversation history — research assistants that accumulate findings over many sessions?

The max_session_history_chars threshold should be set relative to your model's context window and your task's expected working context, not as an absolute "sessions are bad" limit. A research assistant that accumulates 50,000 chars of prior findings over 20 sessions and then legitimately uses that history on session 21 is working as intended — raise the threshold to 100,000 chars or disable the storage check entirely for that agent by passing max_session_history_chars=0 (disable). The breaker's value here is catching agents where the session accumulation is accidental — a recurring daily task that was initialized with a persistent session_id when it should have used session_id=None (a new session each run). The fix for accidental accumulation is usually to use fresh session IDs per run and summarize to a separate long-term store when recall is needed. Add a quarterly session cleanup task that summarizes all sessions older than 30 days into a condensed knowledge file.

Agno was recently renamed from phidata. Are there API differences that affect how the breaker should be wired?

The rename from phidata to Agno was a branding change, not a full API rewrite. The core classes — Agent, Team, Workflow, AgentStorage — maintained backward-compatible interfaces through the transition, and import paths changed from phi.* to agno.*. The GuardedAgent and GuardedTeam subclasses described here use Agno's public API (Agent.run(), Team.run_member(), agent.tools, agent.storage, agent.session_id, agent.response_model) — all of which were present in phidata under the same names. If you are on an older phidata version, replace from agno.agent import Agent with from phi.agent import Agent and the breaker implementation is otherwise identical. The AgentStorage read API (storage.read(session_id)) also carries over unchanged.

Skip the hand-rolling. RunGuard does this in one line.

The implementation above works, but it's ~130 lines of infrastructure that every team building on Agno writes from scratch. RunGuard wraps it in a single install call — adds the delegation guard, the tool storm detector, the Slack alert on trip, and a dashboard showing your last 30 days of incidents by agent, failure mode, and cost saved.

See pricing View RunGuard