IBM watsonx.ai Agents Cost Control: Loop Detection and Budget Enforcement in Production

IBM watsonx.ai's agent framework gives enterprise teams a familiar ReAct orchestration loop: define tools, attach a Granite or third-party LLM, and the agent calls tools iteratively until it reaches a final answer. The IBM watsonx.ai Python SDK wraps this loop cleanly, and watsonx Orchestrate layers an enterprise workflow UI on top. What neither product ships with is a real-time circuit breaker — nothing that detects a tool call spiral while it's happening, enforces a per-run token budget, or stops nested agent chains from recursing beyond a safe depth.

For enterprise teams, that gap is more costly than it sounds. watsonx.ai agents are frequently deployed in document-heavy workflows — summarizing regulatory filings, processing legal contracts, querying internal knowledge bases — where a single runaway session can burn through thousands of tokens in a matter of minutes. IBM's Granite models bill per token like any other LLM. An agent that gets confused by an ambiguous document and calls its retrieval tool 40 times before the token limit kicks in is an expensive debugging session, not a free retry.

This post covers four failure modes specific to the watsonx.ai agent architecture, with complete Python implementations of guards for each one. If you'd rather not maintain the guard layer yourself, the final section shows how to call RunGuard from any watsonx.ai agent via a single tool wrapper.

How IBM watsonx.ai's agent framework works

The IBM watsonx.ai Python SDK (version 1.x+) exposes agent capabilities through ibm_watsonx_ai. The core loop is a standard ReAct pattern:

  1. The LLM receives the system prompt, current conversation history, and tool schema definitions.
  2. The LLM outputs either a final answer or a tool invocation (function call format).
  3. The SDK executes the referenced tool and appends the result to conversation history.
  4. The loop continues until the LLM produces a final answer, a maximum iteration count is reached, or the token limit is hit.

watsonx.ai supports Granite models (IBM's own family, including granite-3-8b-instruct, granite-3-2b-instruct, and the larger granite-34b-code-instruct), Llama models via IBM's hosting, and third-party models through watsonx.ai's unified inference API. The SDK's ModelInference class handles model calls, and agents are typically constructed using the ReActAgent helper or custom tool-call loop implementations built on top of chat().

watsonx Orchestrate — IBM's higher-level enterprise platform — wraps these agents with a skills-based UI. "Skills" in Orchestrate map to tools in the underlying agent framework. When a skill fails or loops, Orchestrate surfaces an error to the user after timeout, but it does not trip a circuit breaker during the loop.

The gap: watsonx.ai's max_new_tokens parameter controls token generation per model call, not total session cost. The iteration limit (if configured) counts steps, not semantic repetition. Neither catches a tool spiral or a nested agent chain mid-flight — they fire only after the damage is done.

Failure mode 1: Tool call invocation spiral

The most common watsonx.ai agent failure, especially in RAG and document-processing workflows. The agent calls a retrieval or search tool, receives a result that partially but not fully answers the query, and calls the same tool again with a slightly different query. Each call produces marginally different results. The model never converges on a synthesized answer because the data is fragmented or the goal is underspecified — so it keeps searching.

In a watsonx.ai legal document agent, this looks like:

  • Step 1: search_knowledge_base("GDPR data retention requirements financial institutions")
  • Step 2: search_knowledge_base("data retention policy banks EU regulation")
  • Step 3: search_knowledge_base("GDPR article 5 retention limitation banking sector")
  • Step 4: search_knowledge_base("financial data retention GDPR compliance 2024")
  • …(continues until max_tokens or iteration cap)

The queries are semantically near-identical (variations on the same information need), but they're syntactically different enough that a simple exact-match dedup wouldn't catch them. The guard needs to score similarity across a rolling window of recent tool calls rather than compare strings exactly.

Here is a Python spiral guard for watsonx.ai agents using Jaccard similarity on normalized query tokens, a lexical approximation of semantic similarity:

import re
from collections import deque
from typing import Any, Callable

def normalize_query(text: str) -> set:
    """Lowercase, strip punctuation, split into tokens, drop stop words."""
    stop_words = {"the", "a", "an", "of", "in", "for", "to", "and", "or",
                  "is", "are", "was", "were", "be", "been", "being", "with"}
    tokens = re.sub(r"[^\w\s]", " ", text.lower()).split()
    return {t for t in tokens if t not in stop_words and len(t) > 2}

def jaccard_similarity(set_a: set, set_b: set) -> float:
    """Token-overlap ratio in [0, 1]; two empty sets count as identical."""
    union = set_a | set_b
    if not union:
        return 1.0
    return len(set_a & set_b) / len(union)

class SpiralGuard:
    def __init__(self, window_size: int = 4, similarity_threshold: float = 0.72,
                 min_high_similarity_pairs: int = 2):
        self.window_size = window_size
        self.similarity_threshold = similarity_threshold
        self.min_pairs = min_high_similarity_pairs
        self._histories: dict[str, dict[str, deque]] = {}

    def _get_history(self, session_id: str, tool_name: str) -> deque:
        if session_id not in self._histories:
            self._histories[session_id] = {}
        if tool_name not in self._histories[session_id]:
            self._histories[session_id][tool_name] = deque(maxlen=self.window_size)
        return self._histories[session_id][tool_name]

    def check(self, session_id: str, tool_name: str, tool_args: dict) -> None:
        """Raise RuntimeError if a spiral is detected. Call before each tool execution."""
        query_str = " ".join(str(v) for v in tool_args.values())
        query_tokens = normalize_query(query_str)
        history = self._get_history(session_id, tool_name)

        if len(history) >= 2:
            high_sim_count = sum(
                1 for past_tokens in history
                if jaccard_similarity(query_tokens, past_tokens) >= self.similarity_threshold
            )
            if high_sim_count >= self.min_pairs:
                raise RuntimeError(
                    f"[SpiralGuard] Tool call spiral detected on '{tool_name}'. "
                    f"{high_sim_count} of last {len(history)} calls have similarity "
                    f">= {self.similarity_threshold}. Session: {session_id}"
                )

        history.append(query_tokens)

    def wrap_tool(self, tool_fn: Callable, tool_name: str, session_id: str) -> Callable:
        """Return a wrapper that runs the spiral check before each call to tool_fn."""
        def guarded(*args, **kwargs):
            self.check(session_id, tool_name, kwargs or {"args": args})
            return tool_fn(*args, **kwargs)
        guarded.__name__ = tool_fn.__name__
        return guarded

Usage in a watsonx.ai agent loop:

from ibm_watsonx_ai import Credentials
from ibm_watsonx_ai.foundation_models import ModelInference

spiral_guard = SpiralGuard(window_size=4, similarity_threshold=0.72)
session_id = f"session-{request_id}"  # request_id: your own per-request identifier

# Wrap each tool function before registering with the agent
guarded_search = spiral_guard.wrap_tool(search_knowledge_base, "search_knowledge_base", session_id)
guarded_fetch = spiral_guard.wrap_tool(fetch_document, "fetch_document", session_id)

# The agent loop calls guarded_search / guarded_fetch instead of the originals
# When a spiral is detected, RuntimeError propagates up and the agent stops cleanly

Set similarity_threshold between 0.65 and 0.80. Below 0.65 you'll see false positives on legitimate exploratory queries. Above 0.80 the guard misses spirals where the model varies vocabulary enough to drop similarity below the threshold while still looping semantically.
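
One way to pick a value is to score real query sequences from your own traces before deploying the guard. The sketch below re-implements the guard's two helper functions in compact form and scores every pair of the four example queries from this section. The pairwise_scores helper is a hypothetical name introduced here for illustration.

```python
import re
from itertools import combinations

STOP_WORDS = {"the", "a", "an", "of", "in", "for", "to", "and", "or",
              "is", "are", "was", "were", "be", "been", "being", "with"}

def normalize_query(text: str) -> set:
    """Same normalization as the guard: lowercase, strip punctuation, drop stop and short words."""
    tokens = re.sub(r"[^\w\s]", " ", text.lower()).split()
    return {t for t in tokens if t not in STOP_WORDS and len(t) > 2}

def jaccard_similarity(set_a: set, set_b: set) -> float:
    union = set_a | set_b
    return 1.0 if not union else len(set_a & set_b) / len(union)

def pairwise_scores(queries: list[str]) -> dict[tuple[int, int], float]:
    """Hypothetical helper: Jaccard score for every pair of queries, keyed by index pair."""
    token_sets = [normalize_query(q) for q in queries]
    return {
        (i, j): jaccard_similarity(token_sets[i], token_sets[j])
        for i, j in combinations(range(len(queries)), 2)
    }

example_queries = [
    "GDPR data retention requirements financial institutions",
    "data retention policy banks EU regulation",
    "GDPR article 5 retention limitation banking sector",
    "financial data retention GDPR compliance 2024",
]

scores = pairwise_scores(example_queries)
for (i, j), score in sorted(scores.items()):
    print(f"step {i + 1} vs step {j + 1}: {score:.2f}")
print(f"highest pairwise score: {max(scores.values()):.2f}")
```

On these four queries the highest token-overlap score is 0.50 (steps 1 and 4), which sits below the 0.65 to 0.80 range, so a purely lexical guard would not trip on this particular sequence. If your own traces score similarly, treat the threshold as something to validate against recorded sessions rather than a fixed constant.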

Failure mode 2: Nested agent chaining without depth control

watsonx.ai and watsonx Orchestrate both support agent-as-tool patterns — one agent can invoke another agent as if it were a regular tool. This is a powerful composition mechanism for complex enterprise workflows: a routing agent dispatches to a compliance-checking agent, which dispatches to a document-retrieval agent, which dispatches to a summarization agent. Each layer is a full ReAct loop.

The failure mode emerges when the nesting is implicit rather than explicit. An agent configured to "handle any HR policy question" might route to a benefits-lookup agent, which routes to a regulatory-compliance agent, which calls back to the HR policy agent for context — creating a cycle invisible at any single layer. Without a depth counter propagated through the chain, each agent trusts that it's being called by a human or a top-level orchestrator. The chain runs until one agent hits a token limit or iteration cap, at which point the error propagates back through all the nested layers, often corrupting the state of the outer agents in the process.

import threading
from typing import Any, Callable

# Thread-local depth counter — each call chain gets its own counter
_depth = threading.local()

class DepthGuard:
    def __init__(self, max_depth: int = 4):
        self.max_depth = max_depth

    def get_current_depth(self) -> int:
        return getattr(_depth, "value", 0)

    def check_and_increment(self) -> int:
        current = self.get_current_depth()
        if current >= self.max_depth:
            raise RuntimeError(
                f"[DepthGuard] Nested agent depth {current} exceeds maximum {self.max_depth}. "
                "Possible agent call cycle detected."
            )
        _depth.value = current + 1
        return _depth.value

    def decrement(self) -> None:
        _depth.value = max(0, self.get_current_depth() - 1)

    def run_agent(self, agent_fn: Callable, *args, **kwargs) -> Any:
        """Wrap any sub-agent invocation to track depth."""
        self.check_and_increment()
        try:
            return agent_fn(*args, **kwargs)
        finally:
            self.decrement()

depth_guard = DepthGuard(max_depth=4)

# When your agent calls another agent as a tool:
def call_compliance_agent(query: str) -> str:
    return depth_guard.run_agent(_compliance_agent_impl, query)

def call_document_agent(doc_id: str) -> str:
    return depth_guard.run_agent(_document_agent_impl, doc_id)

# Register call_compliance_agent and call_document_agent as tools
# The depth guard automatically prevents recursive chains beyond depth 4

For distributed watsonx Orchestrate deployments where agents run as separate microservices, propagate the depth counter as an HTTP header (X-RG-Agent-Depth) and validate it at the entry point of each agent service. The thread-local approach above works for in-process chaining; cross-service chaining requires the header propagation pattern.
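
A minimal sketch of that header pattern, assuming each agent service can read incoming request headers as a dict and set headers on its outgoing calls. Only the X-RG-Agent-Depth header name comes from the text above; the function names, the DepthExceeded class, and the limit of 4 are illustrative, and the HTTP framework wiring is left out.

```python
DEPTH_HEADER = "X-RG-Agent-Depth"
MAX_DEPTH = 4  # assumed limit, matching the in-process DepthGuard default

class DepthExceeded(RuntimeError):
    """Raised when an incoming request is already at the maximum chain depth."""

def read_incoming_depth(headers: dict) -> int:
    """Parse the caller's depth; a missing header means a top-level call (depth 0)."""
    # HTTP header names are case-insensitive, so compare lowercased keys.
    lowered = {str(k).lower(): v for k, v in headers.items()}
    raw = lowered.get(DEPTH_HEADER.lower(), "0")
    try:
        depth = int(raw)
    except (TypeError, ValueError):
        # Fail closed: a malformed header is treated as a violation.
        raise DepthExceeded(f"Malformed {DEPTH_HEADER} header: {raw!r}") from None
    if depth < 0:
        raise DepthExceeded(f"Negative {DEPTH_HEADER} header: {depth}")
    return depth

def enter_agent(headers: dict) -> int:
    """Validate at the service entry point and return this agent's own depth."""
    incoming = read_incoming_depth(headers)
    if incoming >= MAX_DEPTH:
        raise DepthExceeded(
            f"Incoming agent depth {incoming} has reached the maximum {MAX_DEPTH}."
        )
    return incoming + 1

def outgoing_headers(current_depth: int) -> dict:
    """Headers to attach when this agent calls another agent service."""
    return {DEPTH_HEADER: str(current_depth)}

# A top-level request arrives without the header.
my_depth = enter_agent({})
print(my_depth, outgoing_headers(my_depth))
```

Rejecting malformed values instead of defaulting them to zero is a deliberate choice in this sketch: a dropped or mangled header would otherwise silently reset the chain depth.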

Failure mode 3: RAG retrieval context avalanche

watsonx.ai is heavily used for RAG (Retrieval-Augmented Generation) workflows, often with IBM's own Watson Discovery or external vector stores like Milvus, Chroma, or Pinecone. The failure mode specific to RAG agents is token budget exhaustion through retrieval accumulation: each iteration of the agent loop retrieves N documents and appends them to the conversation history. The context itself grows linearly, but because documents from previous iterations are re-transmitted on each subsequent call, the cumulative input tokens billed across the session grow quadratically with the number of iterations.

IBM's Granite models have context windows ranging from 8,192 tokens (smaller Granite 3 models) to 128,000 tokens (Granite 3 MoE). A retrieval agent that fetches five 500-token document chunks per iteration will exhaust an 8K context in 2-3 iterations of tool calls — which means the model starts receiving truncated context silently, producing lower-quality answers, and calling the retrieval tool again to get what it thinks it missed. This is a self-reinforcing loop: truncation causes more retrieval calls, which causes more truncation.
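
To make the arithmetic concrete, here is a small worked sketch. The 500-token base prompt and the 1,024 tokens reserved for the model's reply are assumptions chosen for illustration; the five 500-token chunks per iteration and the 8,192-token window come from the paragraph above. The model's own intermediate messages are ignored, so real usage would be somewhat higher.

```python
from itertools import accumulate

def context_growth(base_tokens: int, tokens_per_iteration: int, iterations: int):
    """Return (prompt size after each retrieval round, cumulative input tokens sent)."""
    sizes = [base_tokens + tokens_per_iteration * k for k in range(1, iterations + 1)]
    return sizes, list(accumulate(sizes))

def first_overflow(sizes, context_window: int, reserved_output_tokens: int):
    """1-based retrieval round after which the prompt no longer fits, or None."""
    limit = context_window - reserved_output_tokens
    for iteration, size in enumerate(sizes, start=1):
        if size > limit:
            return iteration
    return None

sizes, cumulative = context_growth(
    base_tokens=500,               # assumed system prompt + user query
    tokens_per_iteration=5 * 500,  # five 500-token chunks per retrieval round
    iterations=4,
)
overflow_at = first_overflow(sizes, context_window=8192, reserved_output_tokens=1024)

print("prompt size per round:", sizes)
print("cumulative input tokens:", cumulative)
print("prompt stops fitting after round:", overflow_at)
```

Under these assumptions the prompt stops fitting after the third retrieval round, and four rounds send 27,000 input tokens in total even though only 10,000 tokens of documents were retrieved. The guard below caps the accumulation before the session reaches that point.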

class BudgetGuard:
    def __init__(self, max_session_tokens: int = 6000,
                 max_single_result_tokens: int = 1500,
                 chars_per_token: float = 4.0):
        self.max_session_tokens = max_session_tokens
        self.max_single_result_tokens = max_single_result_tokens
        self.chars_per_token = chars_per_token
        self._session_tokens: dict[str, int] = {}

    def _estimate_tokens(self, text: str) -> int:
        return max(1, int(len(text) / self.chars_per_token))

    def check_result(self, session_id: str, tool_name: str, result: str) -> str:
        """Call after each tool returns. Truncates oversized results and
        raises RuntimeError if the session budget is exhausted."""
        result_tokens = self._estimate_tokens(result)

        # Truncate single oversized result
        if result_tokens > self.max_single_result_tokens:
            max_chars = int(self.max_single_result_tokens * self.chars_per_token)
            result = result[:max_chars] + f"\n[truncated: {result_tokens} estimated tokens, limit {self.max_single_result_tokens}]"
            result_tokens = self.max_single_result_tokens

        # Accumulate session budget
        self._session_tokens[session_id] = (
            self._session_tokens.get(session_id, 0) + result_tokens
        )
        session_total = self._session_tokens[session_id]

        if session_total >= self.max_session_tokens:
            raise RuntimeError(
                f"[BudgetGuard] Session token budget exhausted on tool '{tool_name}'. "
                f"Accumulated ~{session_total} tokens (limit: {self.max_session_tokens}). "
                f"Session: {session_id}"
            )

        return result

budget_guard = BudgetGuard(max_session_tokens=6000, max_single_result_tokens=1500)

# Wrap retrieval tools to intercept their return values
def guarded_retrieve(query: str, session_id: str) -> str:
    raw_result = search_knowledge_base(query)
    return budget_guard.check_result(session_id, "search_knowledge_base", raw_result)

def guarded_fetch_doc(doc_id: str, session_id: str) -> str:
    raw_result = fetch_document(doc_id)
    return budget_guard.check_result(session_id, "fetch_document", raw_result)

The chars_per_token divisor should be tuned to your document types. English prose: 4.0. JSON API responses: 3.0. Structured tables (CSV, TSV): 2.5. If your agents primarily process regulatory documents with dense numerical tables, use 2.5–3.0 to avoid underestimating retrieval cost.
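
As a quick illustration of how much the divisor matters, this sketch applies the three divisors above to the same 1,200-character result. The CHARS_PER_TOKEN mapping and the content-type labels are hypothetical names, and the estimate remains a heuristic rather than a tokenizer count.

```python
# Divisors taken from the guidance above; the keys are illustrative labels.
CHARS_PER_TOKEN = {"prose": 4.0, "json": 3.0, "table": 2.5}

def estimate_tokens(text: str, content_type: str = "prose") -> int:
    """Heuristic token estimate, using the same formula as BudgetGuard._estimate_tokens."""
    divisor = CHARS_PER_TOKEN[content_type]
    return max(1, int(len(text) / divisor))

sample = "x" * 1200  # stand-in for a 1,200-character tool result
estimates = {kind: estimate_tokens(sample, kind) for kind in CHARS_PER_TOKEN}
print(estimates)
```

The same result counts as 300, 400, or 480 estimated tokens depending on the divisor, so a session dominated by dense tables reaches the budget 1.6 times sooner than one dominated by prose. To apply a divisor, pass it to the guard's constructor, for example BudgetGuard(chars_per_token=2.5).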

Failure mode 4: Granite model retry storm

IBM's watsonx.ai API has rate limits and occasionally returns transient errors (HTTP 429, 503) during high-load periods. The SDK's default behavior on a failed inference call is to raise an exception, which application code typically catches and retries with exponential backoff. The failure mode arises when the application retry loop and the agent's tool retry logic stack on top of each other.

Consider: a watsonx.ai agent calls a tool that makes an HTTP request to an internal API. The internal API is degraded and returns 500 errors. The agent's tool implementation retries the API call up to 3 times. When the tool eventually fails after 3 retries, the agent calls the model for the next step, the model decides to try the same tool again, and the 3-retry sequence repeats. Meanwhile, if the model call itself is rate-limited, the SDK-level retry (or your backoff wrapper) also fires. The result: each "try this tool" decision by the model costs 3 HTTP requests, so three such decisions send 9 or more requests to the degraded downstream API, which is exactly the load your retry backoff was supposed to prevent.

from collections import defaultdict
from time import time
from typing import Any, Callable

class FailureGuard:
    def __init__(self, max_consecutive_fails: int = 3,
                 fail_rate_window: int = 5,
                 max_fail_rate: float = 0.70):
        self.max_consecutive_fails = max_consecutive_fails
        self.fail_rate_window = fail_rate_window
        self.max_fail_rate = max_fail_rate
        # per-session, per-tool tracking
        self._consecutive: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
        self._recent: dict[str, dict[str, list]] = defaultdict(lambda: defaultdict(list))

    def record(self, session_id: str, tool_name: str, success: bool) -> None:
        """Record a tool execution outcome. Call immediately after each tool attempt."""
        now = time()

        # Consecutive failure counter
        if success:
            self._consecutive[session_id][tool_name] = 0
        else:
            self._consecutive[session_id][tool_name] += 1

        # Rolling list of (timestamp, success) outcomes
        window = self._recent[session_id][tool_name]
        window.append((now, success))
        # Drop entries older than 2 minutes; check() then evaluates the last
        # fail_rate_window entries that remain
        cutoff = now - 120
        self._recent[session_id][tool_name] = [e for e in window if e[0] > cutoff]

    def check(self, session_id: str, tool_name: str) -> None:
        """Call BEFORE each tool execution. Raises RuntimeError if failure thresholds are exceeded."""
        consecutive = self._consecutive[session_id][tool_name]
        if consecutive >= self.max_consecutive_fails:
            raise RuntimeError(
                f"[FailureGuard] Tool '{tool_name}' has failed {consecutive} consecutive times. "
                f"Stopping agent to prevent retry storm. Session: {session_id}"
            )

        recent = self._recent[session_id][tool_name]
        if len(recent) >= self.fail_rate_window:
            fail_count = sum(1 for _, success in recent[-self.fail_rate_window:] if not success)
            fail_rate = fail_count / self.fail_rate_window
            if fail_rate >= self.max_fail_rate:
                raise RuntimeError(
                    f"[FailureGuard] Tool '{tool_name}' failure rate {fail_rate:.0%} over last "
                    f"{self.fail_rate_window} calls exceeds threshold {self.max_fail_rate:.0%}. "
                    f"Session: {session_id}"
                )

failure_guard = FailureGuard(max_consecutive_fails=3, fail_rate_window=5, max_fail_rate=0.70)

def guarded_tool_execute(tool_fn: Callable, tool_name: str,
                          session_id: str, *args, **kwargs) -> Any:
    failure_guard.check(session_id, tool_name)
    try:
        result = tool_fn(*args, **kwargs)
        failure_guard.record(session_id, tool_name, success=True)
        return result
    except Exception:
        failure_guard.record(session_id, tool_name, success=False)
        raise

Combining all four guards: a watsonx.ai agent orchestrator

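The four guards are independent and compose cleanly. Here is a complete example of a guarded watsonx.ai agent loop. The spiral, failure, and budget guards run inside the loop; the depth guard applies through any agent-as-tool wrappers (such as call_compliance_agent above) that you register as tools: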

import json
import uuid
from ibm_watsonx_ai import Credentials
from ibm_watsonx_ai.foundation_models import ModelInference

# Initialize all guards
spiral_guard = SpiralGuard(window_size=4, similarity_threshold=0.72)
depth_guard = DepthGuard(max_depth=4)
budget_guard = BudgetGuard(max_session_tokens=6000, max_single_result_tokens=1500)
failure_guard = FailureGuard(max_consecutive_fails=3, fail_rate_window=5, max_fail_rate=0.70)

def run_watsonx_agent(user_query: str, max_iterations: int = 20) -> str:
    session_id = str(uuid.uuid4())
    credentials = Credentials(url="https://us-south.ml.cloud.ibm.com", api_key=WATSONX_API_KEY)
    model = ModelInference(
        model_id="ibm/granite-3-8b-instruct",
        credentials=credentials,
        project_id=PROJECT_ID,
        params={"max_new_tokens": 1024, "temperature": 0.1}
    )

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_query}
    ]
    tool_schemas = build_tool_schemas()  # your tool definitions

    for iteration in range(max_iterations):
        try:
            response = model.chat(messages=messages, tools=tool_schemas)
        except Exception as e:
            return f"Model error after {iteration} iterations: {e}"

        choice = response["choices"][0]["message"]

        # Final answer — no tool call
        if not choice.get("tool_calls"):
            return choice.get("content", "")

        messages.append({"role": "assistant", **choice})

        # Execute each tool call under the spiral, failure, and budget guards;
        # the depth guard fires inside agent-as-tool wrappers in TOOL_REGISTRY
        tool_results = []
        for tool_call in choice["tool_calls"]:
            tool_name = tool_call["function"]["name"]
            tool_args = json.loads(tool_call["function"]["arguments"])

            try:
                # Guard 1: spiral detection (before execution)
                spiral_guard.check(session_id, tool_name, tool_args)
                # Guard 4: failure rate (before execution)
                failure_guard.check(session_id, tool_name)

                # Execute the actual tool
                raw_result = TOOL_REGISTRY[tool_name](**tool_args)
                failure_guard.record(session_id, tool_name, success=True)

                # Guard 3: budget enforcement (after execution, on result)
                guarded_result = budget_guard.check_result(session_id, tool_name, str(raw_result))
                tool_results.append({
                    "tool_call_id": tool_call["id"],
                    "role": "tool",
                    "content": guarded_result
                })

            except RuntimeError as guard_error:
                # A guard tripped — stop the agent cleanly and report
                send_slack_alert(str(guard_error), session_id=session_id)
                return f"Agent stopped by safety guard: {guard_error}"
            except Exception as tool_error:
                failure_guard.record(session_id, tool_name, success=False)
                tool_results.append({
                    "tool_call_id": tool_call["id"],
                    "role": "tool",
                    "content": f"Error: {tool_error}"
                })

        messages.extend(tool_results)

    return f"Reached maximum {max_iterations} iterations without final answer."
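
One caveat with the loop above: the guards raise plain RuntimeError, so a tool that itself raises RuntimeError is treated as a guard trip and its failure is never recorded. A possible refinement, sketched here with a hypothetical GuardTripped exception class (not part of the guards as written), is to give guard trips their own type and catch that type first. The classify_outcome function and the two toy tools are illustrative only.

```python
class GuardTripped(RuntimeError):
    """Hypothetical exception type reserved for guard trips."""

def classify_outcome(run_tool):
    """Run a zero-argument callable and report how the agent loop should react."""
    try:
        return ("ok", run_tool())
    except GuardTripped as trip:
        # A guard fired: stop the whole session.
        return ("stop_agent", str(trip))
    except Exception as tool_error:
        # Ordinary tool failure: record it and let the model see the error.
        return ("tool_failed", str(tool_error))

def flaky_tool():
    raise RuntimeError("upstream API returned 500")

def spiralling_tool():
    raise GuardTripped("[SpiralGuard] Tool call spiral detected")

outcomes = [
    classify_outcome(lambda: "42 documents"),
    classify_outcome(flaky_tool),
    classify_outcome(spiralling_tool),
]
for status, detail in outcomes:
    print(status, "->", detail)
```

Because GuardTripped subclasses RuntimeError in this sketch, existing except RuntimeError handlers would keep working if the guards were switched over to raise it.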

Guard configuration reference

Guard | Parameter | Default | When to adjust
Spiral | window_size | 4 | Raise to 6 for agents that legitimately call the same tool with varied queries before synthesizing (exploratory research agents). Lower to 3 for tightly-scoped Q&A agents where any repetition is a spiral.
Spiral | similarity_threshold | 0.72 | Lower to 0.65 for English-only enterprise document queries with specialized vocabulary. Raise to 0.80 for agents that operate in multiple languages or mix technical identifiers with prose.
Depth | max_depth | 4 | Raise to 6 only if you have a documented need for deeply nested watsonx Orchestrate agent chains. Three levels of nesting (top → routing → specialist → retrieval) is typical; depth 4 gives one level of headroom before the guard fires.
Budget | max_session_tokens | 6000 | Tune to 2× the expected normal-case retrieval volume. If your agents typically retrieve 4 documents of ~400 tokens each per session, set the limit to 3200–4000 to give headroom for legitimate multi-step lookups while catching runaway retrieval.
Budget | max_single_result_tokens | 1500 | Raise to 3000 for agents that process full legal or financial documents where a single document may be 10,000+ tokens. The guard truncates at this limit and continues; it doesn't stop the session on a single large result.
Failure | max_consecutive_fails | 3 | Raise to 5 for agents that call APIs known to be occasionally flaky but important enough to retry more aggressively. Set to 2 for guarded production deployments where any repeated failure indicates a systemic issue.

RunGuard integration for watsonx.ai

If you'd rather not maintain the four guard classes yourself, RunGuard provides all four checks as a managed HTTP endpoint. The integration requires a single additional HTTP call before each of your existing tool calls, plus one after it to record the result:

import os

import requests

RUNGUARD_API_KEY = os.environ["RUNGUARD_API_KEY"]
RUNGUARD_URL = "https://api.runguard.dev/v1/check"

def runguard_check(session_id: str, tool_name: str, tool_args: dict,
                    depth: int = 0) -> None:
    """Call before each tool execution. Raises RuntimeError if any guard trips."""
    resp = requests.post(
        RUNGUARD_URL,
        headers={"X-RunGuard-Key": RUNGUARD_API_KEY},
        json={
            "app_id": "watsonx-agent-prod",
            "session_id": session_id,
            "tool_name": tool_name,
            "tool_args": tool_args,
            "depth": depth
        },
        timeout=2.0
    )
    if resp.status_code == 409:
        data = resp.json()
        raise RuntimeError(f"[RunGuard] {data['reason']}: {data['detail']}")
    resp.raise_for_status()

def runguard_record_result(session_id: str, tool_name: str,
                            result: str, success: bool) -> str:
    """Post tool result to RunGuard for budget tracking and failure recording.
    Returns (possibly truncated) result string."""
    try:
        resp = requests.post(
            RUNGUARD_URL.replace("/check", "/record"),
            headers={"X-RunGuard-Key": RUNGUARD_API_KEY},
            json={
                "app_id": "watsonx-agent-prod",
                "session_id": session_id,
                "tool_name": tool_name,
                "result": result,
                "success": success
            },
            timeout=2.0
        )
    except requests.RequestException:
        return result  # fallback: RunGuard unreachable or timed out
    if resp.ok:
        return resp.json().get("result", result)
    return result  # fallback: non-2xx response, return unmodified

RunGuard persists all trip events in a dashboard with the full tool call history, similarity scores, depth traces, and token budget consumption — the same information the local guard classes would log to your application logger, but searchable across all sessions and all deployed agents. Slack alerts are included in all plans; PagerDuty integration is available on Team.

FAQ

Does this work with watsonx Orchestrate's skills, or only with the Python SDK's agent loop?

The guards described here apply to the underlying Python SDK agent loop that watsonx Orchestrate's skills run on. If you're deploying a custom skill in Orchestrate backed by a Python function, you can wrap that function with the guards exactly as shown. For out-of-the-box Orchestrate skills (pre-built IBM skills), you don't control the execution loop, so the guards can only be applied at the boundary where your custom skill calls another agent or external API. The RunGuard HTTP endpoint integration is most practical for this case: your skill makes a single HTTP check call before each sub-agent invocation, and RunGuard handles the cross-skill session tracking.

IBM's Granite models have built-in context limits. Won't the model just stop naturally when the context is full?

The model stops generating tokens when the context is full, but this is not a clean stop — it's a truncation. With smaller Granite 3 models (8,192-token context window), a retrieval agent that appends documents on every iteration will silently receive a truncated conversation history after 2-3 iterations. The model sees an incomplete view of its prior work and may call tools redundantly because it can't "remember" what it already retrieved. The RAG retrieval context avalanche guard catches this before it happens by tracking accumulated token estimates and stopping the agent with a meaningful error message rather than letting the context window impose a silent truncation mid-loop.

The depth guard uses thread-local storage. Does this work correctly with Python's async / asyncio agents?

Not reliably. Thread-local storage is the wrong scope for asyncio: all tasks on an event loop run on the same thread, so they share one threading.local() value. Concurrent agent runs increment and decrement the same counter, and the depth any one of them sees is unreliable. For async agents, replace threading.local() with a contextvars.ContextVar. Each asyncio task runs in its own copy of the context that was current when the task was created, so a sub-task started with asyncio.create_task() inherits the parent's depth automatically while sibling tasks stay isolated from each other. Specifically: _depth_var = contextvars.ContextVar("agent_depth", default=0), then use _depth_var.get() and _depth_var.set() in place of _depth.value, and restore the previous value with _depth_var.reset(token) in the finally block. The guard logic is otherwise identical.
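
A minimal sketch of an async depth guard built on contextvars.ContextVar. The run_agent coroutine mirrors DepthGuard.run_agent; the toy agents, the module-level MAX_DEPTH constant, and the main() driver are assumptions made for the example.

```python
import asyncio
import contextvars

_depth_var = contextvars.ContextVar("agent_depth", default=0)
MAX_DEPTH = 4  # assumed, matching the DepthGuard default

async def run_agent(agent_fn, *args, **kwargs):
    """Async counterpart of DepthGuard.run_agent, tracking depth in a ContextVar."""
    current = _depth_var.get()
    if current >= MAX_DEPTH:
        raise RuntimeError(
            f"[DepthGuard] Nested agent depth {current} has reached maximum {MAX_DEPTH}."
        )
    token = _depth_var.set(current + 1)
    try:
        return await agent_fn(*args, **kwargs)
    finally:
        # Restore the caller's depth even if the sub-agent raised.
        _depth_var.reset(token)

async def leaf_agent() -> int:
    """Toy agent that reports the depth it runs at."""
    return _depth_var.get()

async def recursive_agent(level: int) -> int:
    """Toy agent that keeps delegating to itself until the guard stops it."""
    return await run_agent(recursive_agent, level + 1)

async def main():
    # gather() wraps each coroutine in its own task, so each chain gets
    # an independent copy of the depth counter.
    depths = await asyncio.gather(run_agent(leaf_agent), run_agent(leaf_agent))
    try:
        await recursive_agent(0)
        tripped = False
    except RuntimeError:
        tripped = True
    return depths, tripped, _depth_var.get()

depths, tripped, final_depth = asyncio.run(main())
print(depths, tripped, final_depth)
```

Using reset(token) rather than subtracting one means the counter returns to exactly the caller's value, which keeps the bookkeeping correct when a sub-agent fails partway through.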

We use IBM Watson Discovery as our knowledge base. Are there watsonx.ai-specific retrieval patterns that need different guard thresholds?

Watson Discovery returns document passages with confidence scores, which means a sophisticated agent might legitimately re-query with different parameters to get higher-confidence passages — this is intentional variation, not a spiral. For Discovery-backed agents, raise window_size to 5 or 6 and similarity_threshold to 0.78-0.82 to give the agent more latitude for legitimate query refinement before the spiral guard fires. Combine this with a tighter max_session_tokens budget: Discovery passages can be dense (several hundred tokens each), so a 4,000-token session budget with a 5-call window gives you ~800 tokens per retrieval step, which is enough for two or three full-length passages per step. The budget guard provides the backstop that lets the spiral guard be more lenient on query variation.

Can I use the RunGuard endpoint in a watsonx Orchestrate deployment without changing the Orchestrate workflow configuration?

Yes, with one caveat: Orchestrate's out-of-the-box skills don't expose a hook for pre-execution middleware. The practical integration point is at the boundary of any custom Python skill you've built — RunGuard is called from inside your skill's run() method before the skill dispatches to sub-agents or external tools. For custom skills built with the watsonx.ai Python SDK, this means adding two lines: the runguard_check() call at the top of each tool dispatch and the runguard_record_result() call on each result. This doesn't require any changes to the Orchestrate workflow configuration or skill registry. The only configuration change is adding the RUNGUARD_API_KEY environment variable to your skill deployment.

Stop runaway watsonx.ai agents before the bill lands

RunGuard wraps all four watsonx.ai agent guards (spiral detection, nested agent depth limiting, RAG retrieval budget enforcement, and failure cascade prevention) as a managed HTTP endpoint. Two lines of Python replace four guard classes, and you get a persistent 30-day trip dashboard with Slack alerts included.

Start free 14-day trial →