Haystack 2.x ships with max_agent_steps on the Agent component. Teams set it and move on, believing their cost exposure is bounded. Then a document-retrieval agent calls a web_search tool that returns a 404, decides to retry with a reformulated query, gets another 404, and calls the tool again with the original query — cycling between slight phrasings of the same broken request until max_agent_steps fires. Or an iterative refinement pipeline — LLM → Validator → Rewriter → LLM — finds a query the validator's scoring function consistently rejects as "insufficient quality," rewrites it eight times in a loop, and exhausts the max_runs_per_component cap set globally across every component in the pipeline. Both patterns run to completion while billing for the full iteration count. Neither is caught earlier because the safety mechanism counts steps, not progress.
Haystack's pipeline model introduces loop risk that pure agent frameworks don't have: explicit back-edges. When you call pipeline.connect("validator.passed", "llm.messages") alongside pipeline.connect("validator.failed", "rewriter.messages"), you've defined a feedback loop in graph form. Haystack supports this deliberately — it's the right pattern for iterative refinement, self-correction, and retry-with-reflection. The problem is that the framework's only guard against unbounded iteration is max_runs_per_component, a per-component integer that caps how many times any single component can execute in a single pipeline run. It's a blunt instrument: too low and legitimate retries get truncated, too high and a non-converging loop runs 15 or 20 cycles before stopping. It doesn't know whether the loop made progress between iterations — only that it happened N times.
This post builds a production circuit breaker for Haystack 2.x: pipeline back-edge convergence detection, tool repetition storm prevention, chat history token inflation monitoring, and cross-pipeline delegation depth tracking — all implemented as a lightweight Component wrapper that slots into any Haystack pipeline without modifying your existing components or connections. At the end you'll see how RunGuard's one-call install replaces this hand-rolled breaker with a managed version that sends Slack alerts when the breaker trips and provides a 30-day incidents dashboard.
What you'll build: A circuit breaker that detects when a pipeline's iterative loop fails to converge (same validator rejection signal more than N cycles), catches the same tool called with the same arguments in the agent's function-calling loop, monitors per-step prompt token growth for super-linear cost escalation, and tracks nested pipeline.run() depth across orchestrator-to-specialist delegation — all without touching your existing pipeline definition.
Why Haystack's step cap fails differently than other frameworks
Most agent frameworks apply safety caps at the agent loop level — a single counter that increments with each reasoning step. Haystack has two separate mechanisms that interact in non-obvious ways: max_agent_steps on the Agent component and max_runs_per_component on the pipeline itself. Understanding how they interact explains why production teams keep hitting runaway costs even after setting both.
max_agent_steps lives inside the Agent component and caps the agent's internal tool-calling loop — the cycle of: generate tool call → ToolInvoker executes → result fed back to generator → decide next action. This counter is accurate for the agent's own reasoning steps. What it doesn't see is anything happening in the pipeline around the agent: any back-edges defined outside the Agent component, any wrapper components that call the agent multiple times, or any cross-pipeline calls where the agent dispatches to another pipeline as a "tool."
max_runs_per_component is a pipeline-level global that caps how many times each component in the pipeline can execute within a single pipeline.run() call. It applies identically to every component — the LLM, the validator, the rewriter, the output collector. This creates a hidden cost amplifier: if you set max_runs_per_component=10, and your pipeline has an LLM, a validator, and a rewriter in a feedback loop, the pipeline can run 10 LLM calls, 10 validator calls, and 10 rewriter calls in a single run() — 30 component executions. Each LLM call accumulates context from prior iterations. By iteration 10, your prompt might be 5× the size it was at iteration 1 because the rewriter keeps appending its reformulation rationale to the message history.
The real failure mode is a non-converging refinement loop where the validator's rejection criterion is never satisfied. The pipeline runs to max_runs_per_component, calling the LLM at quadratically increasing prompt costs on each iteration, and produces no useful output — it just stops at the cap with a final partial result. You don't learn that the loop failed to converge until you read the output and see the last reformulation is as poor as the first. max_runs_per_component told you nothing about whether progress was made.
The four failure modes Haystack's built-in controls miss
1. Non-converging iterative pipeline loops
Haystack's back-edge pipeline pattern — validator rejects → rewriter improves → LLM regenerates → validator scores again — is a legitimate production architecture for tasks where output quality is measurable. Document summarization with a coherence scorer, code generation with a syntax checker, query expansion with a relevance filter. The pattern works when the rewriter makes real progress on each iteration: the validator's score improves and eventually crosses the acceptance threshold.
The failure mode occurs when the rewriter doesn't have the information to meaningfully improve the output, or when the validator's criterion is poorly calibrated and no output can satisfy it. In these cases every iteration produces a different reformulation of the same quality, the validator rejects each one, and the loop runs until max_runs_per_component fires. The cost of this failure is exactly the cost of running the full pipeline max_runs_per_component times — often 10–15 expensive LLM calls on a prompt that grows with each accumulated rewrite.
Detection signal: the validator's rejection output (its "failed" branch) fires more than N consecutive times without the "passed" branch firing. You don't need to inspect the content of the rejection — just track the binary signal. Three consecutive rejections with no improvement in the validator's numeric score (if it emits one) is a non-convergence signal. Five consecutive rejections regardless of score is a hard trip threshold: if the loop hasn't converged in five attempts, adding more attempts costs money without changing the outcome. Route to the "passed" branch with the best-scoring output seen so far rather than looping forever.
2. Tool repetition storm in the agent's function-calling loop
Haystack's Agent component coordinates tool calling via ToolInvoker. The agent generates a tool call, the ToolInvoker executes the corresponding Tool or ComponentTool, the result is appended to the ChatMessage history as a tool response, and the generator decides the next action. When a tool fails, raises an exception, or returns a result the agent judges as insufficient, the agent typically tries the same tool again — sometimes with a slightly different argument, sometimes with identical arguments.
Haystack's ToolInvoker is intentionally forgiving: it catches tool exceptions, formats them as tool response messages with the error text, and lets the agent decide whether to retry. This is correct behavior for transient failures. The failure mode is a tool that consistently fails on a specific input class — a web search tool that returns empty results for a domain the agent keeps querying, a database lookup tool that returns zero rows for an entity the agent insists exists, a file reader tool that can't find a path the agent keeps generating. The agent sees the error, reasons about it, produces a slightly different tool call, hits the same error, reasons again — repeating up to max_agent_steps times while accumulating both the failed tool calls and the agent's reasoning about each failure in the growing ChatMessage history.
Detection signal: the same (tool_name, args_fingerprint) pair appearing in consecutive tool calls within a single agent session. A fingerprint is an MD5 of the normalized argument dict. Two consecutive identical calls could be an intentional retry. Four identical calls in a session is a storm. This is measurable by wrapping the ToolInvoker's run() method or by subclassing Tool to track invocation history.
3. Chat history token inflation across agent steps
Haystack's Agent maintains a messages list — a List[ChatMessage] that accumulates the full conversation history including tool calls, tool responses, and agent reasoning. Every reasoning step prepends this history to the next LLM call as context. A session that starts with a 800-token prompt can reach 6,000 tokens per step by step 10 if each step adds substantial tool output to the history: web search results, database rows, file contents, stack traces from failed tool calls.
The non-obvious failure mode is a combination of large tool outputs and failed retries. When a tool returns a long error message — a full Python traceback, a verbose API error response with request headers — and the agent retries the same call multiple times, each retry appends another copy of the same long error message to the history. By step 8, the history might contain 6 copies of a 400-token stack trace, adding 2,400 tokens to every subsequent prompt for no informational value. The agent's reasoning quality degrades as the context fills with repeated errors, and the per-step cost escalates beyond anything a step-count cap can predict.
Detection signal: the ratio of (current step's estimated input token count) to (median input token count across the first three steps) exceeds a threshold. A 3× growth ratio means something is inflating the history abnormally — either large tool outputs or repeated failures. This doesn't require intercepting the actual LLM API call: you can estimate input token count from the length of the serialized messages list before each step using a simple character-based proxy (len(json.dumps([m.to_dict() for m in messages])) / 4 is a reasonable approximation for non-CJK text).
4. Cross-pipeline delegation depth: orchestrators dispatching to sub-pipelines
Haystack's ComponentTool class lets you wrap any Haystack component — including another Pipeline — as a callable tool available to an Agent. This is the natural way to build multi-agent orchestration in Haystack: the orchestrator is an Agent with tools that are ComponentTool wrappers around specialist pipelines. Each specialist pipeline can itself contain an Agent with its own tools.
The delegation loop failure mode: Specialist A receives a query it can't answer confidently. It has a tool that routes uncertain queries back to the orchestrator for clarification. The orchestrator receives the clarification request, decides the specialist is the right handler for the underlying task, and dispatches again. Neither agent's max_agent_steps counter sees more than one step per round-trip — the delegation is a single tool call from each agent's perspective. The loop runs until something external stops it: a process timeout, an OOM error as the combined message histories of both agents fill memory, or an operator kill.
Detection signal: nested pipeline.run() call depth exceeding a threshold. Track with a contextvars.ContextVar that increments before each pipeline.run() call and decrements after. A depth of 2 (orchestrator dispatches to specialist) is expected. A depth of 4 or more in a system designed with one orchestrator and one specialist layer is a cycle. This is cross-cutting state that no individual pipeline or agent can observe on its own — it requires a process-level shared variable.
Building the circuit breaker
Haystack 2.x's Component protocol is the right integration layer. A component is any class decorated with @component that exposes typed input and output sockets. You can wrap any existing component by creating a new component whose run() method instantiates the inner component, calls its run(), and applies circuit breaker logic before and after. The wrapper slots into the pipeline in place of the original component — no changes to pipeline connections or other component definitions.
For agent-level protection, the cleanest pattern is to subclass Agent and override run() to apply breaker checks between steps. For pipeline-level back-edge loop detection, wrap the pipeline's execution in a guard function that tracks the validator output history. For delegation depth, use a module-level ContextVar.
from __future__ import annotations
import contextvars
import hashlib
import json
import time
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional
from haystack import component, Pipeline
from haystack.components.agents import Agent
from haystack.components.invokers import ToolInvoker
from haystack.dataclasses import ChatMessage
class BreakerState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
# Module-level context variable — tracks nested pipeline.run() depth.
_PIPELINE_DEPTH: contextvars.ContextVar[int] = contextvars.ContextVar(
"runguard_pipeline_depth", default=0
)
@dataclass
class HaystackBreaker:
"""
Circuit breaker state and detection logic for Haystack agents and pipelines.
Attach to an AgentGuard component or call check_*() methods directly from
a pipeline wrapper function.
"""
budget_usd: float = 5.0
max_tool_repeats: int = 4
max_delegation_depth: int = 4
cost_drift_ratio: float = 3.0
max_consecutive_validator_rejections: int = 5
cooldown_seconds: float = 60.0
cost_per_1k_input_tokens: float = 0.002
cost_per_1k_output_tokens: float = 0.006
state: BreakerState = field(default=BreakerState.CLOSED, init=False)
total_cost_usd: float = field(default=0.0, init=False)
trips: int = field(default=0, init=False)
_opened_at: Optional[float] = field(default=None, init=False)
_trip_reason: str = field(default="", init=False)
# Tracking state
_tool_history: deque = field(default_factory=lambda: deque(maxlen=20), init=False)
_step_token_estimates: List[int] = field(default_factory=list, init=False)
_consecutive_rejections: int = field(default=0, init=False)
def gate(self) -> None:
"""Raise if the breaker is OPEN and cooldown has not expired."""
if self.state == BreakerState.OPEN:
elapsed = time.monotonic() - (self._opened_at or 0.0)
if elapsed < self.cooldown_seconds:
raise RuntimeError(
f"[RunGuard] circuit OPEN — {self._trip_reason} "
f"(cooldown: {self.cooldown_seconds - elapsed:.0f}s remaining)"
)
self.state = BreakerState.HALF_OPEN
def _trip(self, reason: str) -> None:
self.state = BreakerState.OPEN
self._opened_at = time.monotonic()
self._trip_reason = reason
self.trips += 1
raise RuntimeError(f"[RunGuard] circuit tripped — {reason}")
def record_tool_call(self, tool_name: str, tool_args: Dict[str, Any]) -> None:
"""Call before each ToolInvoker execution."""
self.gate()
fingerprint = hashlib.md5(
json.dumps({"n": tool_name, "a": tool_args}, sort_keys=True).encode()
).hexdigest()[:12]
self._tool_history.append(fingerprint)
recent = list(self._tool_history)[-self.max_tool_repeats:]
if len(recent) == self.max_tool_repeats and len(set(recent)) == 1:
self._trip(
f"tool repetition storm — '{tool_name}' called {self.max_tool_repeats}× "
f"with identical args"
)
def record_messages_snapshot(self, messages: List[ChatMessage]) -> None:
"""Call at the start of each agent step with the current message list."""
self.gate()
char_count = sum(
len(m.content or "") + len(json.dumps(m.tool_call.arguments) if m.tool_call else "")
for m in messages
)
estimated_tokens = char_count // 4
self._step_token_estimates.append(estimated_tokens)
if len(self._step_token_estimates) >= 6:
baseline = sorted(self._step_token_estimates[:3])[1] # median of first 3
current = self._step_token_estimates[-1]
if baseline > 0 and current / baseline > self.cost_drift_ratio:
self._trip(
f"chat history token inflation — current step ~{current} tokens, "
f"baseline ~{baseline} tokens ({current/baseline:.1f}× drift)"
)
def record_cost(self, input_tokens: int, output_tokens: int) -> None:
"""Accumulate cost and trip if budget exceeded."""
step_cost = (
input_tokens / 1000 * self.cost_per_1k_input_tokens
+ output_tokens / 1000 * self.cost_per_1k_output_tokens
)
self.total_cost_usd += step_cost
if self.total_cost_usd > self.budget_usd:
self._trip(
f"budget exceeded — ${self.total_cost_usd:.4f} > "
f"${self.budget_usd:.2f} limit"
)
def record_validator_result(self, passed: bool) -> None:
"""Call after each validator step in a back-edge pipeline."""
if passed:
self._consecutive_rejections = 0
else:
self._consecutive_rejections += 1
if self._consecutive_rejections >= self.max_consecutive_validator_rejections:
self._trip(
f"non-converging pipeline loop — validator rejected "
f"{self._consecutive_rejections} consecutive outputs"
)
def check_delegation_depth(self) -> None:
"""Call before any nested pipeline.run() dispatch."""
depth = _PIPELINE_DEPTH.get()
if depth >= self.max_delegation_depth:
self._trip(
f"cross-pipeline delegation depth {depth} exceeded limit "
f"{self.max_delegation_depth} — likely delegation cycle"
)
With the breaker dataclass in place, apply it at two levels: the Agent wrapper for step-level checks and a pipeline-level decorator for back-edge loop detection.
Wrapping the Agent component
Subclass Agent to intercept each step. Haystack's Agent.run() in 2.x iterates over a step loop internally — you can override the method and call super().run() after applying per-step checks, or you can wrap the ToolInvoker that the agent uses:
class GuardedAgent(Agent):
"""
Drop-in Agent replacement that applies a HaystackBreaker on each step.
Pass breaker= at construction; all other kwargs pass through to Agent.
"""
def __init__(self, *args: Any, breaker: Optional[HaystackBreaker] = None, **kwargs: Any):
super().__init__(*args, **kwargs)
self._breaker = breaker or HaystackBreaker()
def run(
self,
messages: List[ChatMessage],
streaming_callback: Any = None,
) -> Dict[str, Any]:
self._breaker.gate()
self._breaker.record_messages_snapshot(messages)
# Patch ToolInvoker to record tool calls before execution
original_invoker_run = self.tool_invoker.run
def guarded_invoker_run(messages: List[ChatMessage], **kwargs: Any) -> Any:
# Find the most recent tool call in messages
for msg in reversed(messages):
if msg.tool_call:
self._breaker.record_tool_call(
msg.tool_call.tool_name,
msg.tool_call.arguments or {},
)
break
return original_invoker_run(messages, **kwargs)
self.tool_invoker.run = guarded_invoker_run # type: ignore[method-assign]
try:
result = super().run(messages=messages, streaming_callback=streaming_callback)
finally:
self.tool_invoker.run = original_invoker_run # type: ignore[method-assign]
return result
Wrapping pipeline.run() for depth tracking and back-edge detection
For delegation depth and pipeline-level back-edge loops, wrap pipeline.run() in a context manager that manages the depth counter and, if the pipeline includes a validation branch, intercepts the validator's output socket to feed record_validator_result():
from contextlib import contextmanager
from typing import Generator
@contextmanager
def guarded_pipeline_run(
pipeline: Pipeline,
breaker: HaystackBreaker,
) -> Generator[None, None, None]:
"""
Context manager that tracks delegation depth and validates the breaker
before executing pipeline.run().
Usage:
with guarded_pipeline_run(my_pipeline, breaker):
result = my_pipeline.run(inputs)
"""
breaker.check_delegation_depth()
depth_token = _PIPELINE_DEPTH.set(_PIPELINE_DEPTH.get() + 1)
try:
yield
finally:
_PIPELINE_DEPTH.reset(depth_token)
def make_validator_hook(breaker: HaystackBreaker, passed_key: str = "passed"):
"""
Returns a post-run hook for the validator component that records its result.
pipeline.add_component("validator", MyValidator())
# After running: call breaker.record_validator_result(output["validator"]["passed"])
"""
def hook(outputs: Dict[str, Any]) -> None:
result = outputs.get(passed_key, False)
breaker.record_validator_result(bool(result))
return hook
Wiring it together
Here's a complete iterative refinement pipeline that uses both guards — the GuardedAgent for step-level protection and guarded_pipeline_run for back-edge convergence detection:
from haystack import Pipeline
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.routers import ConditionalRouter
from haystack.tools import Tool
# 1. Define breaker with explicit budget and convergence limits
breaker = HaystackBreaker(
budget_usd=3.0,
max_tool_repeats=3,
max_consecutive_validator_rejections=4,
cost_drift_ratio=2.5,
max_delegation_depth=3,
)
# 2. Build the agent with the guarded subclass
search_tool = Tool(
name="web_search",
description="Search the web for information",
function=my_web_search_fn,
)
agent = GuardedAgent(
chat_generator=OpenAIChatGenerator(model="gpt-4o-mini"),
tools=[search_tool],
max_agent_steps=8,
breaker=breaker,
)
# 3. Build an iterative refinement pipeline around the agent
pipeline = Pipeline(max_runs_per_component=6)
pipeline.add_component("agent", agent)
pipeline.add_component("validator", QualityValidator(threshold=0.75))
pipeline.add_component("rewriter", QueryRewriter())
pipeline.add_component("router", ConditionalRouter(
routes=[
{"condition": "{{ passes }}", "output": "{{ agent_output }}", "output_type": str, "output_name": "final"},
{"condition": "{{ not passes }}", "output": "{{ rewritten }}", "output_type": list, "output_name": "retry"},
]
))
pipeline.connect("agent.messages", "validator.messages")
pipeline.connect("validator.score", "router.passes") # score >= threshold → passes = True
pipeline.connect("validator.messages", "router.agent_output")
pipeline.connect("router.retry", "rewriter.messages")
pipeline.connect("rewriter.messages", "agent.messages") # back-edge
# 4. Run with depth guard and validator hook
validator_hook = make_validator_hook(breaker, passed_key="passes")
try:
with guarded_pipeline_run(pipeline, breaker):
result = pipeline.run({"agent": {"messages": initial_messages}})
# After each pipeline run, feed the validator result to the breaker
# In practice, patch validator or post-process pipeline outputs
passed = result.get("router", {}).get("final") is not None
validator_hook({"passed": passed})
except RuntimeError as exc:
if "[RunGuard]" in str(exc):
print(f"Breaker tripped: {exc}")
# Return best partial result or raise to caller
else:
raise
For the validator hook to fire on each loop iteration (not just the final output), wrap the validator's run() method similarly to the ToolInvoker wrapping above. The pattern is the same: save the original run, replace with a wrapper that calls the hook and restores the original afterward.
Circuit breaker state machine
The breaker follows the standard three-state model: CLOSED (normal operation, counting costs and patterns), OPEN (tripped, all operations blocked, cooldown timer running), and HALF_OPEN (cooldown expired, next operation is a probe — if it succeeds, transitions back to CLOSED; if it trips again, resets cooldown).
| State | Behavior | Transition |
|---|---|---|
CLOSEDnormal |
All operations pass. Breaker accumulates cost, tool call history, and token estimates. | → OPEN when any threshold is exceeded |
OPENtripped |
All gate() calls raise immediately with the trip reason. No LLM calls reach the generator. |
→ HALF_OPEN after cooldown_seconds |
HALF_OPENprobing |
One operation is allowed through. If it completes without tripping, state → CLOSED. If it trips, state → OPEN with reset cooldown. | → CLOSED on success; → OPEN on trip |
For Haystack pipelines specifically, HALF_OPEN is most useful in batch processing scenarios where the same pipeline runs repeatedly on different inputs. A transient failure (a flaky API that caused a trip) should clear after cooldown so subsequent documents in the batch are processed normally. Without HALF_OPEN, a single trip would block the entire batch for cooldown_seconds even if the root cause cleared within seconds.
Detection thresholds for Haystack in practice
The right thresholds depend on your pipeline's topology. Some starting points from production Haystack deployments:
- Iterative refinement pipelines (validator → rewriter → LLM back-edge):
max_consecutive_validator_rejections=4. Legitimate convergence typically happens within 2–3 iterations for well-calibrated quality criteria. Beyond 4 consecutive rejections, the loop is structurally non-converging. - Research / retrieval agents (web search, database lookups):
max_tool_repeats=3,budget_usd=1.0–3.0. Retrieval failures are common on specific inputs; 3 repeats gives the agent a chance to rephrase. Budget cap catches the case where the agent keeps searching across many tool types without finding a result. - Document processing agents (file reading, extraction, transformation):
cost_drift_ratio=2.5,max_agent_steps=6. Document agents often handle inputs of varying size — a drift ratio catches the case where a single large document inflates the context disproportionately. - Multi-pipeline orchestration (ComponentTool wrapping specialist pipelines):
max_delegation_depth=3. In a two-tier system (orchestrator + specialists), depth 3 means the orchestrator dispatched to a specialist which dispatched to a sub-specialist — the maximum expected depth. Depth 4 always indicates a cycle.
Using RunGuard instead of hand-rolling this
The implementation above handles the four core failure modes but leaves several operational concerns unaddressed: Where do breaker trip events go so the on-call team knows within 5 minutes? How do you track trip rate over time to know if a specific pipeline is structurally flawed versus occasionally flaky? How do you tune thresholds based on real cost data rather than guesswork?
RunGuard wraps this breaker logic in a managed SDK that sends Slack or PagerDuty alerts when a breaker trips, exposes a 30-day incidents dashboard with cost-saved-per-trip metrics, and persists per-pipeline threshold calibration based on your actual invocation history. The install for a Haystack 2.x pipeline is:
import runguard
# Replace the GuardedAgent + guarded_pipeline_run setup above with:
rg = runguard.install(
app="my-haystack-pipeline",
budget_usd=3.0,
framework="haystack",
)
# Then use rg.agent() in place of Agent() and rg.run(pipeline, inputs)
# in place of pipeline.run(inputs).
agent = rg.agent(
chat_generator=OpenAIChatGenerator(model="gpt-4o-mini"),
tools=[search_tool],
max_agent_steps=8,
)
result = rg.run(pipeline, {"agent": {"messages": initial_messages}})
rg.run() handles depth tracking, back-edge convergence monitoring, and cost accumulation automatically. rg.agent() returns a GuardedAgent-equivalent with the shared breaker instance pre-wired. The RunGuard Haystack integration docs cover multi-pipeline orchestration setups and how to attach the validator hook in pipelines with custom routing logic.
Frequently asked questions
Does this work with Haystack 1.x or only 2.x?
This implementation targets Haystack 2.x, which introduced the Component protocol and the new Pipeline execution model. Haystack 1.x used a different node/edge graph abstraction and the Agent class didn't exist as a first-class component. If you're on 1.x, the tool-call wrapping pattern still applies via the BaseComponent.run() override, but the max_runs_per_component back-edge protection is not needed — 1.x pipelines don't support explicit cycles the same way.
Will wrapping ToolInvoker.run() break if Haystack updates its internal implementation?
Any method-level monkey-patching is fragile against internal implementation changes. The safer long-term approach is to subclass ToolInvoker directly and override its run() method, then pass the subclass instance to Agent(tool_invoker=GuardedToolInvoker(breaker=breaker)). Haystack's Agent accepts a custom tool_invoker at construction time in 2.x, which makes subclassing the correct extension point. The temporary patching approach in this post is simpler for demonstration but should be replaced with the subclass pattern in production code.
How do I handle the case where the validator legitimately needs more than 4–5 iterations for a difficult input?
Track validator scores, not just pass/fail. If the validator emits a numeric quality score, use an improvement threshold instead of a fixed rejection count: trip only if the best score seen in the last N iterations hasn't improved by more than δ. A validator that rejected 6 times but whose score went from 0.4 → 0.6 → 0.71 → 0.74 is making progress. A validator that rejected 6 times with scores 0.41 → 0.40 → 0.42 → 0.40 is not. The record_validator_result method in this implementation accepts any boolean — you can replace it with a record_validator_score(score: float) version that uses the improvement criterion instead.
Does the delegation depth ContextVar work correctly across async Haystack pipelines?
contextvars.ContextVar is coroutine-safe in Python's asyncio model: each task gets its own copy of context variables, and ContextVar.set() is local to the current task. This means delegation depth tracking works correctly for async pipelines — each concurrent pipeline execution has an independent depth counter. The only case where this breaks is if you manually share context between tasks using contextvars.copy_context(). For thread-based parallelism, ContextVar is also thread-safe because Python threads each get a copy of the context at thread creation time. Both concurrency models are handled correctly.
Should I set max_runs_per_component on the pipeline or rely entirely on the circuit breaker?
Both. max_runs_per_component is a hard stop from Haystack's runtime — if the circuit breaker somehow fails to trip (a bug in your hook wiring, a component running outside the breaker's visibility), max_runs_per_component is the last line of defense. Set it as a backstop at 2–3× your expected maximum iteration count. The circuit breaker adds semantics that max_runs_per_component can't: it trips early when it detects a structural failure pattern, preserves the best partial result, and emits an actionable reason string. Together they give you both a smart early-exit and a hard ceiling. Never remove max_runs_per_component because the breaker is installed.