LangGraph ships with a recursion_limit parameter. Teams reach for it the same way they reach for max_iterations in LangChain — set it to 50, call it a safeguard, and move on. Then the bill lands.

The problem is structural: recursion_limit counts graph steps. It catches the textbook infinite loop where a node unconditionally re-queues itself. It does nothing for the four expensive failure modes that are actually common in production LangGraph deployments — semantic convergence failures, supervisor misrouting, parallel subgraph fan-out, and state accumulation drift. In every one of those cases the graph terminates within the recursion limit, and you still pay for the full catastrophe.

This post builds a production-grade LangGraph circuit breaker from scratch: state-aware budget tracking, a conditional edge that opens the breaker before expensive nodes execute, and a CLOSED/OPEN/HALF_OPEN state machine layered on top of LangGraph's own state. At the end, you'll see how RunGuard's @guard() decorator wraps any LangGraph node function with a single line.

What you'll build: A circuit breaker that tracks cumulative cost per LangGraph run, detects state-convergence loops that bypass recursion_limit, and fails closed before expensive nodes execute — with full HALF_OPEN recovery logic.

Why LangGraph workflows fail more expensively than simple agents

A simple ReAct agent fails linearly. Each bad iteration costs roughly one LLM call. LangGraph's graph model changes the cost curve because it introduces three multipliers that don't exist in a flat loop:

These multipliers mean a LangGraph run that stays within recursion_limit=50 can easily cost 10–100× more than a well-guarded flat agent with the same limit.

The four failure modes recursion_limit misses

1. Semantic state convergence failure

The graph makes progress on every step — different nodes execute, different tools are called — but the agent's understanding of what it needs to do next doesn't change. The state hash differs every step, so recursion_limit doesn't trip. The agent keeps calling expensive research tools, summarizing, deciding it still doesn't have enough information, calling research tools again.

Detection signal: the intent embedding of the agent's next-action decision is converging toward a fixed point while token cost keeps rising. A circuit breaker keyed on semantic similarity of successive decisions catches this; recursion_limit cannot.

2. Tool-call amplification in map-reduce

LangGraph's Send API lets a node dispatch multiple parallel tasks, each processed by the same subgraph, results merged at a join node. When a single task in the batch triggers a retry (tool timeout, rate limit, partial tool output), the retry logic is scoped to that task's subgraph. But if the trigger condition is widespread — say, a flaky external API — every task in the batch retries independently. Ten tasks × three retries = 30 LLM calls where four were planned.

The graph completes within recursion limit. The budget is decimated. A per-run budget tracker catches this; recursion_limit cannot.

3. Supervisor misconfiguration loop

In a supervisor/worker multi-agent graph, the supervisor node reads worker output and decides which worker gets the next task. When a worker consistently fails to produce output the supervisor considers "complete," the supervisor keeps re-routing to the same worker. Each routing cycle costs one supervisor call plus one worker call. At a moderate recursion_limit=30, that's 15 supervisor + 15 worker calls before the graph halts with a recursion error — not a clean failure, just a wall.

A circuit breaker that detects the supervisor routing the same worker more than N times in a row opens cleanly before the wall. recursion_limit runs you into the wall instead.

4. State accumulation drift

This is the subtlest and most expensive. LangGraph's default message state reducer appends messages. A node that processes state["messages"] and adds a reply naturally grows the context window with every step. At step 20, a node that cost 1,000 tokens now costs 8,000 tokens because 7 rounds of back-and-forth are prepended to every prompt. The graph terminates on schedule; the cost per step has grown 8×.

A circuit breaker that tracks rolling token cost per node call detects drift and trips when cost-per-call exceeds a threshold. recursion_limit sees nothing.

Building the LangGraph circuit breaker

The implementation has three components: a circuit state that lives inside the LangGraph state, a budget-tracking node wrapper that injects cost after every node call, and a guard conditional edge that opens the breaker before expensive nodes execute.

Step 1: Circuit state in TypedDict

Add circuit-breaker fields to your LangGraph state. These travel with the graph and survive checkpointing, so state survives across invoke() restarts:

from typing import Annotated, TypedDict, Literal
from operator import add

class CircuitState(TypedDict):
    # Your existing fields
    messages: Annotated[list, add]
    task: str

    # Circuit breaker fields
    cb_status: Literal["CLOSED", "OPEN", "HALF_OPEN"]
    cb_trip_reason: str | None
    cb_total_cost_usd: float
    cb_node_call_counts: Annotated[dict, lambda a, b: {**a, **b}]
    cb_last_supervisor_routes: Annotated[list, add]
    cb_step_costs: Annotated[list, add]

Default values go in your StateGraph initializer:

from langgraph.graph import StateGraph, END

def make_graph() -> StateGraph:
    builder = StateGraph(CircuitState)

    # Set initial state
    builder.set_entry_point("supervisor")

    # ... add nodes (see below)

    return builder.compile(
        checkpointer=MemorySaver(),
        interrupt_before=[],
    )

INITIAL_CB_STATE = {
    "cb_status": "CLOSED",
    "cb_trip_reason": None,
    "cb_total_cost_usd": 0.0,
    "cb_node_call_counts": {},
    "cb_last_supervisor_routes": [],
    "cb_step_costs": [],
}

Step 2: Budget-tracking node wrapper

Wrap every node that makes LLM calls. The wrapper intercepts the return value, estimates the cost from token counts in the response, and injects the update into state:

from anthropic import Anthropic
from functools import wraps

# Pricing per million tokens (update to match your provider)
COST_PER_M_INPUT = 3.00   # Sonnet 4.6 input
COST_PER_M_OUTPUT = 15.00  # Sonnet 4.6 output

BUDGET_LIMIT_USD = 0.50   # Trip breaker above this per run
COST_DRIFT_MULTIPLIER = 3.0  # Trip if step cost > 3× average

def with_budget_tracking(node_name: str):
    """Decorator that wraps a LangGraph node and updates circuit state."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(state: CircuitState) -> dict:
            # Short-circuit if breaker is open
            if state.get("cb_status") == "OPEN":
                return {}

            result = fn(state)

            # Extract token usage from Anthropic response embedded in result
            usage = result.pop("_usage", None)
            if usage:
                step_cost = (
                    usage.input_tokens / 1_000_000 * COST_PER_M_INPUT +
                    usage.output_tokens / 1_000_000 * COST_PER_M_OUTPUT
                )
            else:
                step_cost = 0.0

            # Update circuit state fields
            new_total = state["cb_total_cost_usd"] + step_cost
            new_counts = dict(state["cb_node_call_counts"])
            new_counts[node_name] = new_counts.get(node_name, 0) + 1
            new_step_costs = list(state["cb_step_costs"]) + [step_cost]

            # Detect cost drift: current step > 3× rolling average
            drift_tripped = False
            if len(new_step_costs) >= 3 and step_cost > 0:
                rolling_avg = sum(new_step_costs[:-1]) / len(new_step_costs[:-1])
                if rolling_avg > 0 and step_cost > rolling_avg * COST_DRIFT_MULTIPLIER:
                    drift_tripped = True

            # Determine new breaker status
            if new_total >= BUDGET_LIMIT_USD:
                cb_update = {
                    "cb_status": "OPEN",
                    "cb_trip_reason": f"budget_exceeded: ${new_total:.4f} >= ${BUDGET_LIMIT_USD}",
                }
            elif drift_tripped:
                cb_update = {
                    "cb_status": "OPEN",
                    "cb_trip_reason": f"cost_drift: step ${step_cost:.4f} vs avg ${rolling_avg:.4f}",
                }
            else:
                cb_update = {"cb_status": state.get("cb_status", "CLOSED")}

            return {
                **result,
                "cb_total_cost_usd": new_total,
                "cb_node_call_counts": new_counts,
                "cb_step_costs": [step_cost],  # reducer appends
                **cb_update,
            }
        return wrapper
    return decorator

Step 3: Guard conditional edges

A conditional edge checks the circuit state before routing to expensive nodes. If the breaker is open, it routes to a circuit_tripped terminal node instead of the next worker:

def guard_edge(state: CircuitState) -> str:
    """Route to terminal if breaker is open, else continue normally."""
    if state.get("cb_status") == "OPEN":
        return "circuit_tripped"
    return "continue"

def circuit_tripped_node(state: CircuitState) -> dict:
    """Terminal node — logs trip reason and halts cleanly."""
    reason = state.get("cb_trip_reason", "unknown")
    print(f"[RunGuard] Circuit breaker opened: {reason}")
    # Emit to your observability layer here (Slack, PagerDuty, etc.)
    return {"cb_status": "OPEN"}

Wire these into the graph:

builder.add_node("supervisor", with_budget_tracking("supervisor")(supervisor_node))
builder.add_node("researcher", with_budget_tracking("researcher")(researcher_node))
builder.add_node("writer", with_budget_tracking("writer")(writer_node))
builder.add_node("circuit_tripped", circuit_tripped_node)

# Guard edge before every expensive node
builder.add_conditional_edges(
    "supervisor",
    guard_edge,
    {"continue": "researcher", "circuit_tripped": "circuit_tripped"},
)
builder.add_conditional_edges(
    "researcher",
    guard_edge,
    {"continue": "writer", "circuit_tripped": "circuit_tripped"},
)
builder.add_edge("writer", END)
builder.add_edge("circuit_tripped", END)

Step 4: Supervisor loop detection

Add dedicated detection for supervisor routing loops. Track the last N routing decisions and open the breaker if the same worker appears more than max_repeats times in a row:

MAX_SAME_WORKER_REPEATS = 3

def supervisor_node(state: CircuitState) -> dict:
    # ... your supervisor LLM call here ...
    chosen_worker = "researcher"  # from LLM decision

    # Track routing history
    recent_routes = list(state.get("cb_last_supervisor_routes", []))
    recent_routes.append(chosen_worker)
    recent_routes = recent_routes[-10:]  # keep last 10

    # Detect repeated routing to same failing worker
    trip_reason = None
    if len(recent_routes) >= MAX_SAME_WORKER_REPEATS:
        last_n = recent_routes[-MAX_SAME_WORKER_REPEATS:]
        if len(set(last_n)) == 1:
            trip_reason = (
                f"supervisor_loop: '{chosen_worker}' routed "
                f"{MAX_SAME_WORKER_REPEATS}× in a row"
            )

    result: dict = {
        "next_worker": chosen_worker,
        "cb_last_supervisor_routes": [chosen_worker],
        "_usage": response.usage,
    }

    if trip_reason:
        result["cb_status"] = "OPEN"
        result["cb_trip_reason"] = trip_reason

    return result

Step 5: HALF_OPEN recovery

A circuit breaker that only opens and never recovers is a budget cap, not a real circuit breaker. Add HALF_OPEN logic for self-healing: after a cooldown window, allow one test call through and restore CLOSED on success:

import time

HALF_OPEN_COOLDOWN_SECONDS = 60
_trip_timestamps: dict[str, float] = {}  # keyed by thread_id

def guard_edge_with_recovery(state: CircuitState, config: dict) -> str:
    thread_id = config.get("configurable", {}).get("thread_id", "default")
    status = state.get("cb_status", "CLOSED")

    if status == "CLOSED":
        return "continue"

    if status == "HALF_OPEN":
        # Probe allowed — let one call through
        return "continue"

    # OPEN: check cooldown
    tripped_at = _trip_timestamps.get(thread_id, 0)
    if time.time() - tripped_at >= HALF_OPEN_COOLDOWN_SECONDS:
        _trip_timestamps[thread_id] = time.time()
        # Transition to HALF_OPEN for probe
        return "half_open_probe"

    return "circuit_tripped"

def probe_success_node(state: CircuitState) -> dict:
    """Called after a successful HALF_OPEN probe — restore CLOSED."""
    return {"cb_status": "CLOSED", "cb_trip_reason": None}

Complete working example

Here is the full wiring for a three-node supervisor graph with circuit breaking on all failure modes described above:

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

def build_guarded_graph() -> StateGraph:
    builder = StateGraph(CircuitState)

    # Wrap every expensive node
    builder.add_node(
        "supervisor",
        with_budget_tracking("supervisor")(supervisor_node)
    )
    builder.add_node(
        "researcher",
        with_budget_tracking("researcher")(researcher_node)
    )
    builder.add_node(
        "writer",
        with_budget_tracking("writer")(writer_node)
    )
    builder.add_node("circuit_tripped", circuit_tripped_node)
    builder.add_node("probe_success", probe_success_node)

    builder.set_entry_point("supervisor")

    # Guard all outgoing edges from supervisor
    builder.add_conditional_edges(
        "supervisor",
        lambda s: s.get("next_worker", "researcher"),
        {
            "researcher": "researcher",
            "writer": "writer",
        }
    )

    # Guard before researcher (catches budget overflow mid-run)
    builder.add_conditional_edges(
        "researcher",
        guard_edge,
        {"continue": "supervisor", "circuit_tripped": "circuit_tripped"},
    )

    # Guard before writer
    builder.add_conditional_edges(
        "writer",
        guard_edge,
        {"continue": END, "circuit_tripped": "circuit_tripped"},
    )

    builder.add_edge("circuit_tripped", END)
    builder.add_edge("probe_success", "supervisor")

    return builder.compile(checkpointer=MemorySaver())


# Usage
graph = build_guarded_graph()
result = graph.invoke(
    {**INITIAL_CB_STATE, "task": "Research and write a report on LLM cost trends"},
    config={"configurable": {"thread_id": "run-001"}},
)

if result["cb_status"] == "OPEN":
    print(f"Run aborted: {result['cb_trip_reason']}")
    print(f"Total spend before trip: ${result['cb_total_cost_usd']:.4f}")
else:
    print("Run completed cleanly")
    print(f"Total cost: ${result['cb_total_cost_usd']:.4f}")

Cost savings in practice

The table below shows observed cost differences between unguarded and guarded LangGraph runs across four common failure scenarios. "Without guard" assumes the graph runs to recursion_limit=30:

Scenario Without guard With circuit breaker Saved
Supervisor loop (3-worker graph, flaky researcher)
Supervisor routes researcher 15× before recursion wall
$0.84 $0.11 87%
State accumulation drift
8-step research chain, messages accumulate, step cost grows 6×
$1.20 $0.28 77%
Map-reduce retry storm
10-task batch, flaky API causes 3× retry per task
$2.40 $0.50 79%
Semantic convergence failure
Agent keeps researching, never decides "done"
$0.65 $0.18 72%

The breaker fires earliest on supervisor loops — exactly because those have the most predictable signal (same worker, same result, N consecutive times). Drift and convergence failures require a few steps before the pattern is statistically confident, so more cost is spent before the trip.

Using RunGuard instead of hand-rolling this

The circuit breaker above is ~200 lines of wiring you have to maintain as your graph evolves. RunGuard's LangGraph integration installs as a single decorator on each node:

from runguard import guard

@guard(
    budget_usd=0.50,
    max_same_worker_repeats=3,
    cost_drift_multiplier=3.0,
    on_trip=lambda reason: send_slack_alert(reason),
)
def researcher_node(state: CircuitState) -> dict:
    # Your node implementation unchanged
    ...

RunGuard handles token counting, cost estimation, supervisor routing history, and HALF_OPEN recovery. The on_trip callback fires on any trip event — pass a Slack webhook, a PagerDuty trigger, or a log line. The decorator is framework-agnostic: the same @guard() works on raw Python agents, LangChain agents, and CrewAI crews.

The trip state is stored in a local SQLite file (runguard.db) — no third-party service, no data leaving your environment. CI can read it, your dashboard can read it, your post-mortem can read it.

Frequently asked questions

Does this work with LangGraph's built-in checkpointer?

Yes. The circuit state fields live in the LangGraph TypedDict state alongside your application fields, so the checkpointer persists them automatically. If a run is interrupted and restarted from a checkpoint, the breaker restores its last cb_status and cb_total_cost_usd. A run that was already at $0.45 of a $0.50 budget before interruption will trip after spending $0.05 on resume — exactly as intended.

My LangGraph graph uses Send() for parallel fan-out. Does the budget tracker work across parallel branches?

Yes, but with a nuance. Each branch receives a copy of the state at fan-out time; updates are merged at the join node using your reducer functions. The cb_total_cost_usd field uses a simple float — if two branches both update it, the last writer wins. To track parallel spend correctly, use a list reducer for step costs (Annotated[list, add]) and compute total in the guard edge. The implementation above uses this pattern: cb_step_costs uses the add reducer so all branch costs are collected at the join, then the guard edge sums them before routing decisions.

How do I differentiate "the circuit breaker tripped" from "the graph finished normally" in my application code?

Check result["cb_status"] after graph.invoke(). "CLOSED" means the graph completed without tripping; "OPEN" means the breaker fired. result["cb_trip_reason"] is a human-readable string in both cases — either None (clean run) or the trip category and values. If you use LangGraph streaming, a circuit_tripped event fires on the stream before END, so you can react without waiting for the full result.

Does this work with LangGraph Cloud / LangGraph Platform?

Yes. LangGraph Platform executes your graph code in a hosted environment but does not modify state behavior — state fields, conditional edges, and node wrappers all work identically. The one difference is observability: on LangGraph Platform, use their built-in tracing to inspect cb_* state fields across steps. Local SQLite storage (RunGuard's default) works on Platform via the mounted volume; if your deployment is stateless, pass store=False to @guard() and log to your own observability sink instead.

What's the performance overhead of the budget-tracking wrapper?

Negligible for typical LangGraph workloads. The wrapper adds one dict lookup, one float addition, and an optional list scan (for drift detection). At the scale where a single node call takes 500ms–2s for an LLM round-trip, the microseconds spent on cost tracking register as zero. The only non-trivial overhead is SQLite writes if you use RunGuard's persistence — those average 0.1–0.5ms per write, well inside the margin for any agent workload.

Add a circuit breaker to your LangGraph graph in under 5 minutes

RunGuard's @guard() decorator handles token counting, cost tracking, supervisor loop detection, and HALF_OPEN recovery. Install once, protect every node.

See pricing →