Amazon Bedrock Inline Agents Cost Control: Loop Detection and Budget Enforcement in Production

Standard AWS Bedrock Agents require you to pre-configure an agent in the Bedrock console — specify the foundation model, write the system instructions, attach action groups and knowledge bases, deploy, and get back an agentId + agentAliasId pair. Every invocation calls that fixed configuration. Amazon Bedrock Inline Agents work differently: you call invoke_inline_agent and supply the entire agent configuration — foundation model, instruction, action groups, knowledge bases — as parameters in the request body. The agent is ephemeral. There is no pre-configured agent ID. The configuration is defined at invocation time and discarded when the run finishes.

That flexibility is exactly what multi-agent collaboration architectures need. A supervisor agent can route a sub-task to a specialized inline agent with a task-specific instruction and a curated set of tools, rather than calling a fixed sub-agent with a generic prompt. But it creates four cost failure modes that don't exist in standard Bedrock Agents, where the configuration is frozen and identical across all invocations of the same agent alias.

This post covers invoke_inline_agent specifically. If you're using invoke_agent with a pre-configured Bedrock Agent (fixed agentId + agentAliasId), see AWS Bedrock Agents Cost Control. The failure modes, signatures, and circuit breaker code are different.

The invoke_inline_agent API surface

The Python call looks like this:

import json

import boto3

client = boto3.client("bedrock-agent-runtime", region_name="us-east-1")

# api_schema_dict is your action group's OpenAPI schema, loaded elsewhere as a dict.

response = client.invoke_inline_agent(
    sessionId="unique-session-id",
    foundationModel="anthropic.claude-3-5-sonnet-20241022-v2:0",
    instruction="You are a data analysis specialist. Given a SQL query request, "
                "write the query, execute it, interpret the results, and return "
                "a structured summary. Be concise. Do not ask clarifying questions.",
    inputText="How many orders shipped last week by region?",
    actionGroups=[
        {
            "actionGroupName": "DatabaseQuery",
            "description": "Execute SQL queries against the analytics warehouse",
            "actionGroupExecutor": {
                "lambda": "arn:aws:lambda:us-east-1:123456789012:function:db-query"
            },
            "apiSchema": {
                "payload": json.dumps(api_schema_dict)
            }
        }
    ],
    knowledgeBases=[
        {
            "knowledgeBaseId": "ABCDEF1234",
            "description": "Data dictionary and schema documentation"
        }
    ],
    enableTrace=True,
    endSession=False,
)

# Consume the streaming response
for event in response["completion"]:
    if "chunk" in event:
        text = event["chunk"]["bytes"].decode("utf-8")
        print(text, end="", flush=True)
    elif "trace" in event:
        pass  # orchestration trace events — useful for debugging
    elif "internalServerException" in event:
        raise RuntimeError(f"Bedrock error: {event['internalServerException']['message']}")

Key parameters that affect cost:

| Parameter | Cost impact |
| --- | --- |
| instruction (str, required) | This is your agent's system prompt. It's included in every internal orchestration step's input token count. A 500-token instruction across 8 orchestration steps = 4,000 extra input tokens per invocation. Dynamic instructions that grow over time compound this. |
| actionGroups (list, optional) | Each action group definition (API schema or function schema) is serialized and included in the model's tool definition context. The more action groups, the larger the tool-definition payload per turn. Each Lambda invocation triggered by the agent is billed separately from the model call. |
| knowledgeBases (list, optional) | Each knowledge base retrieval is an additional model call (the retrieval augmentation step). A single invoke_inline_agent call that queries two knowledge bases can result in 3+ model invocations total. |
| sessionId (str, required) | If you reuse the same sessionId, Bedrock accumulates the conversation history and re-sends it as context on every subsequent call. The token cost per call grows linearly with conversation length, so the total cost of the session grows quadratically. |
| enableTrace (bool, optional) | Controls whether trace events are included in the response stream; set it to True explicitly. The trace reflects reasoning the orchestration model has already produced and that has already been billed. Enabling or disabling it does not change the billed token count, but receiving the trace is essential for observability and for detecting failure modes 3 and 4 below. |

The critical difference from a standard invoke_agent call is that you cannot predict the number of orchestration steps from outside the call. A single invoke_inline_agent invocation may complete in 1 step (direct answer) or take 12 steps (complex multi-tool reasoning). You observe the step count only by counting trace events in the response stream — and only then if enableTrace=True.
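A minimal sketch of counting steps from the stream. It assumes each orchestration step surfaces one trace event whose orchestrationTrace carries a modelInvocationInput key, and it accepts both the nested event["trace"]["trace"] shape and a flat event["trace"] shape. count_orchestration_steps is an illustrative helper, not part of boto3:

```python
def count_orchestration_steps(events) -> int:
    """Count model invocations seen in a stream of inline agent events.

    Assumption: every orchestration step emits one trace event that
    contains orchestrationTrace.modelInvocationInput.
    """
    steps = 0
    for event in events:
        outer = event.get("trace", {})
        # Accept either event["trace"]["trace"] or a flat event["trace"].
        inner = outer.get("trace", outer)
        orchestration = inner.get("orchestrationTrace", {})
        if "modelInvocationInput" in orchestration:
            steps += 1
    return steps
```

Counting while the stream is being consumed, rather than afterwards, would let a caller stop reading early once a step budget is exceeded.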

Failure mode 1 — Dynamic instruction loop

Standard Bedrock Agents have a fixed, immutable instruction. Every invocation of a given agent alias uses the same system prompt. Inline agents have a dynamic instruction — your code constructs it at invocation time, often by prompting another LLM to generate a task-specific instruction for the sub-agent.

The failure pattern:

  1. Your supervisor agent calls a Claude/Nova model to generate an instruction for the sub-agent based on the current task context.
  2. The generated instruction is slightly different from the previous attempt — different phrasing, an extra constraint clause, a rephrased objective.
  3. The sub-agent's behavior changes subtly because of the instruction variation.
  4. The supervisor evaluates the result, finds it unsatisfactory, and generates a new instruction.
  5. Repeat. Each cycle: one supervisor LLM call + one inline agent invocation (with N internal orchestration steps).

This loop is invisible to maxLength (the per-invocation orchestration step limit) because each call stays within the limit. The runaway is at the orchestrator level — too many invocations, not too many steps within a single invocation.

class InstructionFingerprinter:
    """
    Detects near-duplicate instruction variations across invoke_inline_agent calls.
    Uses character n-gram Jaccard similarity — fast and allocation-friendly
    for instruction strings up to ~2000 characters.
    """

    def __init__(self, window: int = 5, threshold: float = 0.80):
        self.window = window          # sliding window of recent instructions
        self.threshold = threshold    # Jaccard similarity above this = near-duplicate
        self._history: list[set] = []

    def _ngrams(self, text: str, n: int = 4) -> set:
        text = text.lower().strip()
        return {text[i:i+n] for i in range(len(text) - n + 1)}

    def check(self, instruction: str) -> None:
        """
        Call before each invoke_inline_agent call.
        Raises RuntimeError if this instruction is near-duplicate of a recent one.
        """
        fp = self._ngrams(instruction)
        for prior in self._history[-self.window:]:
            union = prior | fp
            if not union:
                continue
            jaccard = len(prior & fp) / len(union)
            if jaccard >= self.threshold:
                raise RuntimeError(
                    f"Inline agent instruction loop detected: Jaccard similarity "
                    f"{jaccard:.2f} >= {self.threshold:.2f}. The orchestrator is "
                    f"generating near-identical instructions across calls — the "
                    f"sub-agent is unlikely to produce different results. "
                    f"Check the supervisor prompt for convergence conditions."
                )
        self._history.append(fp)
        if len(self._history) > self.window * 2:
            self._history = self._history[-self.window:]

    def reset(self) -> None:
        self._history.clear()

Use 0.80 as the default threshold rather than the 0.72 used for tool call spirals in other frameworks. Instruction text tends to be more paraphrased than tool inputs, so a threshold of 0.72 would flag too many legitimate variations where the orchestrator is genuinely trying a different approach. At 0.80 on a realistic multi-sentence instruction, you catch regenerations that change only a word or a clause while the rest of the prompt stays the same. Character n-gram similarity is sensitive to length, though: very short instructions such as "Analyze the dataset and produce a summary" vs "Analyze the data and generate a summary" share well under 80% of their 4-grams, so catching paraphrases at that scale needs a lower threshold or an embedding-based comparison.
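Before settling on a threshold, it can help to measure similarity on your own instructions. The sketch below reuses the same character 4-gram Jaccard calculation as the fingerprinter; the sample strings are hypothetical, and scores swing much more on short strings than on long ones:

```python
def char_ngrams(text: str, n: int = 4) -> set:
    """Lowercased character n-grams, matching the fingerprinter above."""
    text = text.lower().strip()
    return {text[i:i + n] for i in range(len(text) - n + 1)}


def jaccard(a: str, b: str, n: int = 4) -> float:
    """Jaccard similarity of two strings' character n-gram sets."""
    ga, gb = char_ngrams(a, n), char_ngrams(b, n)
    union = ga | gb
    return len(ga & gb) / len(union) if union else 0.0


# Hypothetical instructions, for illustration only.
LONG_A = (
    "You are a data analysis specialist. Given a SQL query request, write the "
    "query, execute it, interpret the results, and return a structured summary. "
    "Be concise. Do not ask clarifying questions."
)
LONG_B = LONG_A.replace("structured summary", "structured report")
SHORT_A = "Summarize weekly orders by region"
SHORT_B = "Report last week's orders per region"

for label, a, b in [("long", LONG_A, LONG_B), ("short", SHORT_A, SHORT_B)]:
    print(f"{label}: {jaccard(a, b):.2f}")
```

Running this over a sample of real supervisor-generated instructions shows where legitimate variations and stuck regenerations fall relative to the threshold.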

Failure mode 2 — Session context accumulation

If you supply the same sessionId across multiple invoke_inline_agent calls, Bedrock Agents persists the conversation history server-side and prepends it to every subsequent call's input context. This is useful for multi-turn conversations with human users. For automated pipelines, it creates a token cost curve that compounds with each call.

The math:

  • Call 1: base tokens (instruction + action groups + KB definitions + user message) = ~3,000 tokens
  • Call 2: same base + call-1 history = ~3,000 + ~1,500 (prior exchange) = ~4,500 tokens
  • Call 3: same base + call-1 + call-2 history = ~3,000 + ~3,000 = ~6,000 tokens
  • Call N: base + sum of all prior exchanges = cost grows linearly with N, making total session cost O(N²)
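The arithmetic above can be checked with a short sketch. It uses the illustrative figures from the bullets (3,000 base tokens, 1,500 tokens per prior exchange) and counts only one model call per invocation, so real multi-step invocations would scale these totals up:

```python
def session_input_tokens(calls: int, base: int = 3000, per_exchange: int = 1500) -> int:
    """Total input tokens across `calls` invocations sharing one sessionId.

    Call k (0-indexed) sends the base payload plus k prior exchanges, so the
    total is calls*base + per_exchange * calls*(calls-1)/2.
    """
    return sum(base + k * per_exchange for k in range(calls))


def fresh_session_input_tokens(calls: int, base: int = 3000) -> int:
    """Total input tokens when every call uses a new sessionId."""
    return calls * base


for n in (3, 10, 20):
    shared = session_input_tokens(n)
    fresh = fresh_session_input_tokens(n)
    print(f"{n:>2} calls: shared={shared:,} fresh={fresh:,} ratio={shared / fresh:.2f}")
```

The ratio between the two totals keeps growing with the number of calls, which is the quadratic term showing up.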

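Bedrock Inline Agents don't report the accumulated context size before a call, and the streamed response has no single top-level field equivalent to Anthropic's response.usage.input_tokens. Per-step usage appears only inside trace events (orchestrationTrace.modelInvocationOutput.metadata.usage) when tracing is enabled, and only after the tokens have been billed. For a pre-call guard, you must estimate the context yourself by counting turns.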

class SessionAccumulationGuard:
    """
    Tracks calls per sessionId and warns / halts when the context
    accumulation cost is projected to exceed a threshold.
    """

    def __init__(
        self,
        base_tokens_per_call: int = 3000,
        tokens_per_exchange: int = 1200,
        max_context_tokens: int = 100_000,
        price_per_input_token: float = 3.0 / 1_000_000,  # Claude 3.5 Sonnet input rate
    ):
        self.base = base_tokens_per_call
        self.exchange = tokens_per_exchange
        self.max_context = max_context_tokens
        self.price_per_token = price_per_input_token
        self._call_counts: dict[str, int] = {}

    def check(self, session_id: str) -> None:
        """
        Call before each invoke_inline_agent call.
        Raises if projected context will exceed max_context_tokens.
        """
        n = self._call_counts.get(session_id, 0)
        projected = self.base + (n * self.exchange)
        if projected > self.max_context:
            raise RuntimeError(
                f"Session context accumulation guard tripped for session '{session_id}': "
                f"projected input context {projected:,} tokens (call {n+1}) exceeds "
                f"ceiling {self.max_context:,} tokens. Start a new sessionId or "
                f"compact the session history before continuing."
            )

    def record(self, session_id: str) -> None:
        """Call after each successful invoke_inline_agent call."""
        self._call_counts[session_id] = self._call_counts.get(session_id, 0) + 1

    def reset_session(self, session_id: str) -> None:
        """Call when you start a fresh session to clear accumulated count."""
        self._call_counts.pop(session_id, None)

    def session_cost_usd(self, session_id: str) -> float:
        """Approximate total input-token cost for this session so far."""
        n = self._call_counts.get(session_id, 0)
        total_tokens = sum(self.base + (i * self.exchange) for i in range(n))
        return total_tokens * self.price_per_token

The fix is straightforward: for automated pipelines where each invoke_inline_agent call is a discrete task (not a multi-turn conversation with the same user), generate a fresh sessionId (e.g., str(uuid.uuid4())) per call. This gives you zero accumulated history and predictable per-call token counts. Only reuse a sessionId when you explicitly want the agent to have memory of a prior exchange in the same logical conversation.

Failure mode 3 — Action group thrash

Inline agents can accept multiple action groups per invocation. Each action group has a description field that tells the agent what the group does. When multiple action groups overlap semantically — or when the agent isn't converging on the right action group — you get action group thrash: the agent cycles through different action groups across orchestration steps, invoking multiple Lambda functions for the same underlying task.

The cost multiplier here is the Lambda invocation cost on top of the model call cost. Each Lambda invocation returns a result that becomes part of the next orchestration step's context. A 10-step orchestration that invokes 3 different Lambda functions costs: 10 model calls + at least 3 Lambda invocations + the input tokens for each Lambda result fed back into context.

The signature: you see multiple distinct actionGroupInvocationInput.actionGroupName values in the trace events within a single invocation, where those groups are semantically overlapping (e.g., QueryAnalyticsDB and QueryDataWarehouse being invoked sequentially for what should be a single lookup).

def count_action_group_invocations(trace_events: list) -> dict[str, int]:
    """
    Parse trace events from a completed invoke_inline_agent call and
    count invocations per action group. Returns {action_group_name: count}.
    """
    counts = {}
    for event in trace_events:
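        part = event.get("trace", {})
        # boto3 nests the payload as event["trace"]["trace"]; fall back to the
        # flat shape so already-unwrapped events still work.
        trace = part.get("trace", part).get("orchestrationTrace", {})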
        ag_input = trace.get("invocationInput", {}).get("actionGroupInvocationInput", {})
        name = ag_input.get("actionGroupName")
        if name:
            counts[name] = counts.get(name, 0) + 1
    return counts


def check_action_group_thrash(
    trace_events: list,
    max_per_group: int = 3,
    max_distinct_groups: int = 4,
) -> None:
    """
    Raises RuntimeError if the trace shows action group thrash patterns:
    - Any single action group invoked more than max_per_group times
    - More distinct action groups invoked than max_distinct_groups
    """
    counts = count_action_group_invocations(trace_events)
    for name, count in counts.items():
        if count > max_per_group:
            raise RuntimeError(
                f"Action group thrash: '{name}' invoked {count} times in a single "
                f"inline agent call (limit {max_per_group}). Check for overlapping "
                f"action group descriptions or missing convergence conditions in the instruction."
            )
    if len(counts) > max_distinct_groups:
        raise RuntimeError(
            f"Action group proliferation: {len(counts)} distinct action groups invoked "
            f"in a single call (limit {max_distinct_groups}): {list(counts.keys())}. "
            f"Consider splitting into focused sub-agents with dedicated action groups."
        )

To use this, you need to collect trace events during the streaming response. Accumulate them in a list alongside the chunk events, then run check_action_group_thrash after the stream is exhausted.
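A minimal sketch of that collection step, using synthetic events in place of a real response["completion"] stream. collect_stream is an illustrative helper name, and it handles only the "chunk" and "trace" keys:

```python
def collect_stream(completion):
    """Split an inline agent event stream into response text and trace events.

    `completion` is any iterable of event dicts shaped like
    response["completion"]; other event types are ignored here.
    """
    text_parts, trace_events = [], []
    for event in completion:
        if "chunk" in event:
            text_parts.append(event["chunk"]["bytes"].decode("utf-8"))
        elif "trace" in event:
            trace_events.append(event)
    return "".join(text_parts), trace_events


# Synthetic events standing in for a real stream (illustrative only).
fake_stream = [
    {"trace": {"trace": {"orchestrationTrace": {}}}},
    {"chunk": {"bytes": b"42 orders "}},
    {"chunk": {"bytes": b"shipped."}},
]
text, traces = collect_stream(fake_stream)
```

The returned traces list is what you would hand to check_action_group_thrash once the stream is exhausted.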

Failure mode 4 — Supervisor-subagent cascade

Amazon Bedrock supports multi-agent collaboration where a supervisor agent (standard or inline) routes subtasks to specialized sub-agents. When inline agents are used as sub-agents in this architecture, a critical property changes: each inline agent call is stateless at the agent-configuration level. The sub-agent has no memory of being called before for the same task. If the supervisor's routing logic loops — it keeps delegating the same task to the same inline sub-agent because it never marks the task as complete — the inline sub-agent has no way to detect the repetition.

The failure pattern in multi-agent collaboration:

  1. Supervisor evaluates task X. Routes to inline agent A with instruction "Solve task X."
  2. Inline agent A produces a result. Returns to supervisor.
  3. Supervisor evaluates result. Determines it's incomplete (due to its own evaluation logic, not an error).
  4. Supervisor routes task X to inline agent A again — possibly with a slightly modified instruction.
  5. Inline agent A starts from scratch (no session history if sessionId varies per call). Produces a slightly different result.
  6. Supervisor loops. Each iteration: one supervisor model call + one inline agent invocation (N steps each).

The fix is at the supervisor layer: track task IDs and inline agent invocation counts. If the same logical task has been delegated more than a threshold number of times, the supervisor should escalate or mark the task as failed rather than delegating again.

class SupervisorCascadeBreaker:
    """
    Tracks how many times each task ID has been delegated to an inline agent.
    Trip when a task ID exceeds the delegation limit.
    """

    def __init__(self, max_delegations: int = 3):
        self.max_delegations = max_delegations
        self._delegation_counts: dict[str, int] = {}
        self._delegation_history: dict[str, list] = {}

    def check_and_record(
        self,
        task_id: str,
        agent_description: str,
        instruction_preview: str = "",
    ) -> None:
        """
        Call before each invoke_inline_agent call at the supervisor layer.
        task_id: a stable identifier for the logical task (not the sessionId).
        Raises RuntimeError if task_id has been delegated too many times.
        """
        count = self._delegation_counts.get(task_id, 0)
        if count >= self.max_delegations:
            history = self._delegation_history.get(task_id, [])
            raise RuntimeError(
                f"Supervisor cascade: task '{task_id}' has been delegated "
                f"{count} times (limit {self.max_delegations}). Last {len(history)} "
                f"agent calls did not resolve it. Mark the task as failed or "
                f"escalate to a human checkpoint rather than delegating again.\n"
                f"Delegation history: {history}"
            )
        self._delegation_counts[task_id] = count + 1
        self._delegation_history.setdefault(task_id, []).append({
            "delegation": count + 1,
            "agent": agent_description,
            "instruction_preview": instruction_preview[:120],
        })

    def reset_task(self, task_id: str) -> None:
        """Call when a task completes successfully to clear its count."""
        self._delegation_counts.pop(task_id, None)
        self._delegation_history.pop(task_id, None)

    def get_task_state(self, task_id: str) -> dict:
        return {
            "task_id": task_id,
            "delegation_count": self._delegation_counts.get(task_id, 0),
            "history": self._delegation_history.get(task_id, []),
        }

The task_id is a key your supervisor maintains — it's a stable identifier for the logical work item being delegated, not the sessionId of any particular inline agent call. If your supervisor is routing based on a ticket ID, query hash, or step name in a workflow, use that as the task_id.
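If no natural identifier exists, one option is to derive the key from a query hash. The helper below is a hypothetical sketch: task_id_for and its normalization rules (lowercasing, collapsing whitespace) are assumptions to adapt, not part of any AWS API:

```python
import hashlib


def task_id_for(workflow_step: str, query: str) -> str:
    """Derive a stable task ID from a workflow step name and the query text.

    Whitespace and case are normalized so trivially different phrasings of
    the same request map to the same ID.
    """
    normalized = " ".join(query.lower().split())
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:12]
    return f"{workflow_step}:{digest}"
```

Hashing the original user query, rather than the LLM-generated instruction, keeps the ID stable even when the supervisor rewrites the instruction on each retry.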

The InlineAgentBreaker: combining all four guards

In production you want a single wrapper that manages all four failure modes together. The InlineAgentBreaker class below composes the individual guards and exposes an invoke method that replaces direct calls to client.invoke_inline_agent:

import json
import uuid
from dataclasses import dataclass
from typing import Optional

import boto3


@dataclass
class InlineAgentConfig:
    """Runtime configuration for the InlineAgentBreaker."""
    foundation_model: str = "anthropic.claude-3-5-sonnet-20241022-v2:0"
    region: str = "us-east-1"

    # Instruction fingerprinting (failure mode 1)
    instruction_window: int = 5
    instruction_threshold: float = 0.80

    # Session accumulation guard (failure mode 2)
    base_tokens_per_call: int = 3000
    tokens_per_exchange: int = 1200
    max_context_tokens: int = 100_000

    # Action group thrash (failure mode 3)
    max_action_group_invocations: int = 3
    max_distinct_action_groups: int = 4

    # Budget cap
    max_invocations_per_run: int = 10
    price_per_input_token: float = 3.0 / 1_000_000


class InlineAgentBreaker:
    """
    Circuit breaker wrapper for boto3 invoke_inline_agent.
    Manages all four cost failure modes for Amazon Bedrock Inline Agents:
      1. Dynamic instruction loops
      2. Session context accumulation
      3. Action group thrash
      4. Supervisor cascade (use SupervisorCascadeBreaker at the caller layer)

    Usage:
        breaker = InlineAgentBreaker(config)
        result = breaker.invoke(
            session_id=session_id,
            instruction=instruction,
            input_text=query,
            action_groups=action_groups,
        )
    """

    def __init__(self, config: Optional[InlineAgentConfig] = None):
        self.config = config or InlineAgentConfig()
        self._client = boto3.client(
            "bedrock-agent-runtime",
            region_name=self.config.region,
        )
        self._instruction_fingerprinter = InstructionFingerprinter(
            window=self.config.instruction_window,
            threshold=self.config.instruction_threshold,
        )
        self._session_guard = SessionAccumulationGuard(
            base_tokens_per_call=self.config.base_tokens_per_call,
            tokens_per_exchange=self.config.tokens_per_exchange,
            max_context_tokens=self.config.max_context_tokens,
            price_per_input_token=self.config.price_per_input_token,
        )
        self._invocation_count = 0

    def invoke(
        self,
        instruction: str,
        input_text: str,
        action_groups: Optional[list] = None,
        knowledge_bases: Optional[list] = None,
        session_id: Optional[str] = None,
        end_session_on_end: bool = False,
        max_length: int = 10,  # accepted but not forwarded to the API in this sketch
    ) -> dict:
        """
        Invoke an inline agent with circuit breaking on all four failure modes.

        Returns:
            {
                "text": str,               # final agent response
                "trace_events": list,      # all trace events (tracing is always on here)
                "invocation_count": int,   # total calls made by this breaker instance
                "action_group_counts": dict,  # {action_group_name: invocation_count}
                "session_id": str,         # the sessionId actually used for the call
            }

        Raises RuntimeError on any guard trip with a descriptive message.
        """
        # Per-run invocation cap
        self._invocation_count += 1
        if self._invocation_count > self.config.max_invocations_per_run:
            raise RuntimeError(
                f"InlineAgentBreaker: invocation cap reached — "
                f"{self._invocation_count} calls against limit "
                f"{self.config.max_invocations_per_run}. "
                f"This run is attempting too many inline agent calls. "
                f"Check the orchestrator for unbounded delegation loops."
            )

        # Guard 1: instruction fingerprinting
        self._instruction_fingerprinter.check(instruction)

        # Guard 2: session accumulation (use fresh session_id for non-conversational pipelines)
        effective_session_id = session_id or str(uuid.uuid4())
        self._session_guard.check(effective_session_id)

        # Build API call
        kwargs = {
            "sessionId": effective_session_id,
            "foundationModel": self.config.foundation_model,
            "instruction": instruction,
            "inputText": input_text,
            "enableTrace": True,  # required for guard 3 (action group thrash detection)
            "endSession": end_session_on_end,  # boto3 names this request field endSession
        }
        if action_groups:
            kwargs["actionGroups"] = action_groups
        if knowledge_bases:
            kwargs["knowledgeBases"] = knowledge_bases

        # Invoke
        response = self._client.invoke_inline_agent(**kwargs)

        # Consume stream
        text_parts = []
        trace_events = []
        for event in response["completion"]:
            if "chunk" in event:
                text_parts.append(event["chunk"]["bytes"].decode("utf-8"))
            elif "trace" in event:
                trace_events.append(event)
            elif "internalServerException" in event:
                raise RuntimeError(
                    f"Bedrock internal error: "
                    f"{event['internalServerException'].get('message', 'unknown')}"
                )
            elif "throttlingException" in event:
                raise RuntimeError(
                    f"Bedrock throttling: "
                    f"{event['throttlingException'].get('message', 'unknown')}"
                )
            elif "dependencyFailedException" in event:
                raise RuntimeError(
                    f"Bedrock dependency failure: "
                    f"{event['dependencyFailedException'].get('message', 'unknown')}"
                )

        # Guard 3: action group thrash — checked after stream completes
        ag_counts = count_action_group_invocations(trace_events)
        check_action_group_thrash(
            trace_events,
            max_per_group=self.config.max_action_group_invocations,
            max_distinct_groups=self.config.max_distinct_action_groups,
        )

        # Record successful call. The instruction history was already updated
        # inside check(), so only the session counter needs recording here.
        self._session_guard.record(effective_session_id)

        return {
            "text": "".join(text_parts),
            "trace_events": trace_events,
            "invocation_count": self._invocation_count,
            "action_group_counts": ag_counts,
            "session_id": effective_session_id,
        }

    def reset_instruction_history(self) -> None:
        """Call when starting a genuinely new task where instruction variation is expected."""
        self._instruction_fingerprinter.reset()

    def reset_invocation_count(self) -> None:
        """Call between independent runs to reset the per-run invocation cap."""
        self._invocation_count = 0

A complete usage example with supervisor cascade detection:

import uuid

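# Initialize breakers: one InlineAgentBreaker per agent type in your
# multi-agent collaboration architecture, one SupervisorCascadeBreaker
# for the orchestrator.
# generate_analysis_instruction, is_result_satisfactory, and DATA_ACTION_GROUPS
# are placeholders for your own instruction generator, evaluator, and schemas.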
data_agent = InlineAgentBreaker(InlineAgentConfig(
    foundation_model="anthropic.claude-3-5-sonnet-20241022-v2:0",
    max_invocations_per_run=8,
))
supervisor_breaker = SupervisorCascadeBreaker(max_delegations=3)

def run_analysis_task(task_id: str, query: str) -> str:
    instruction = generate_analysis_instruction(query)  # LLM-generated

    # Check supervisor cascade before delegating
    supervisor_breaker.check_and_record(
        task_id=task_id,
        agent_description="DataAnalysisInlineAgent",
        instruction_preview=instruction[:120],
    )

    result = data_agent.invoke(
        instruction=instruction,
        input_text=query,
        action_groups=DATA_ACTION_GROUPS,
        session_id=str(uuid.uuid4()),  # fresh session per task — no accumulation
    )

    if is_result_satisfactory(result["text"]):
        supervisor_breaker.reset_task(task_id)  # clear on success
        return result["text"]
    else:
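        # supervisor_breaker raises RuntimeError on the 4th delegation; the
        # instruction fingerprinter may trip sooner on a near-identical instruction.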
        return run_analysis_task(task_id, query)

Choosing the right maxLength for inline agents

Bedrock Inline Agents support a maxLength setting that caps the number of internal orchestration steps per invocation. Because an inline agent has no stored configuration, the limit has to be supplied with each request; confirm the exact field location against the invoke_inline_agent API reference for your SDK version. The default is AWS-managed and can be surprisingly high for complex action group setups.

The relationship between maxLength and cost:

  • Too high: A stuck agent runs until it hits the ceiling, billing for every step. At Claude 3.5 Sonnet pricing (~$0.003/1k input tokens), a 20-step invocation with 5k-token context per step costs ~$0.30 — before Lambda invocations.
  • Too low: Complex tasks that legitimately need 8-10 steps are cut off early and return incomplete results, causing your supervisor to retry.
  • The right ceiling: Profile your typical task completion step counts across 20-30 successful runs and set maxLength to 2× the 90th percentile. This leaves headroom for genuine complexity while capping runaway cases at a known cost ceiling.
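The profiling rule in the last bullet can be sketched as follows. suggest_step_ceiling is a hypothetical helper, the nearest-rank percentile is one of several reasonable definitions, and the sample step counts are made up for illustration:

```python
import math


def suggest_step_ceiling(step_counts: list[int], percentile: float = 0.90,
                         headroom: float = 2.0) -> int:
    """Return headroom x the nearest-rank percentile of observed step counts."""
    if not step_counts:
        raise ValueError("need at least one profiled run")
    ordered = sorted(step_counts)
    # Nearest-rank percentile: smallest value with at least `percentile`
    # of the observations at or below it.
    rank = max(1, math.ceil(percentile * len(ordered)))
    return math.ceil(headroom * ordered[rank - 1])


# Hypothetical step counts from 20 successful runs.
profiled = [2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5, 6, 6, 6, 7, 7, 8, 11]
ceiling = suggest_step_ceiling(profiled)
```

Using the 90th percentile rather than the maximum keeps a single outlier run (the 11 here) from inflating the ceiling.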

maxLength is a necessary but insufficient guard. It stops runaway orchestration within a single invocation. It does nothing to detect the cross-invocation loops (modes 1 and 4) or the accumulating costs within the call limit (modes 2 and 3). Use maxLength as a hard ceiling, and the guards above as the early-detection layer.

Pricing reference for inline agent foundation models

Inline agents bill at the same per-token rates as the underlying foundation model. Each internal orchestration step is one model invocation.

| Model | Input (per 1M tokens) | Output (per 1M tokens) | 10-step invocation at 4k-token context (est.) |
| --- | --- | --- | --- |
| anthropic.claude-3-5-sonnet-20241022-v2:0 | $3.00 | $15.00 | ~$0.165 |
| anthropic.claude-3-haiku-20240307-v1:0 | $0.25 | $1.25 | ~$0.014 |
| amazon.nova-pro-v1:0 | $0.80 | $3.20 | ~$0.042 |
| amazon.nova-lite-v1:0 | $0.06 | $0.24 | ~$0.0031 |
| amazon.nova-micro-v1:0 | $0.035 | $0.14 | ~$0.0018 |

The "10-step invocation" estimate assumes 4,000 input tokens per step (instruction + accumulated tool results + prior reasoning) and 300 output tokens per step (tool call or response text), which works out to 40,000 input and 3,000 output tokens per invocation. A dynamic instruction loop with 6 retries of 10-step Claude 3.5 Sonnet invocations costs ~$0.99 before any Lambda invocations, all from a single logical task that should have taken one call.
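The estimate can be reproduced with a few lines of arithmetic, which makes it easy to plug in your own step counts and token sizes. This is a sketch under the stated per-step assumptions; invocation_cost_usd is an illustrative helper and the prices are the per-1M-token rates from the table above:

```python
def invocation_cost_usd(steps: int, input_tokens_per_step: int,
                        output_tokens_per_step: int,
                        input_price_per_m: float,
                        output_price_per_m: float) -> float:
    """Model cost of one inline agent invocation, excluding Lambda charges."""
    input_cost = steps * input_tokens_per_step * input_price_per_m / 1_000_000
    output_cost = steps * output_tokens_per_step * output_price_per_m / 1_000_000
    return input_cost + output_cost


# (input, output) prices per 1M tokens, copied from the table above.
PRICES = {
    "claude-3-5-sonnet": (3.00, 15.00),
    "claude-3-haiku": (0.25, 1.25),
    "nova-pro": (0.80, 3.20),
}

for name, (inp, out) in PRICES.items():
    per_call = invocation_cost_usd(10, 4000, 300, inp, out)
    print(f"{name}: ${per_call:.4f} per invocation, ${6 * per_call:.2f} for 6 retries")
```

Input tokens dominate at these ratios, so trimming the instruction and tool schemas usually moves the total more than shortening responses.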

RunGuard managed API for inline agent fleets

The guards above require you to manage state per-breaker instance: instruction history, session call counts, invocation counters. For a fleet of inline agents running across multiple Lambda functions or ECS tasks, that state lives in separate process memories. When your Lambda cold-starts, the instruction fingerprinting history resets. When you have 10 concurrent ECS tasks running inline agents, each has its own independent breaker with no shared view of the cross-task invocation rate.

RunGuard's managed API centralizes the circuit-breaking state server-side. Each /check call carries your run_id, the instruction hash, and the session context token estimate. RunGuard aggregates across all your calling processes and trips the breaker fleet-wide when the pattern crosses threshold:

import httpx
import hashlib

RUNGUARD_API_KEY = "rg_..."
RUNGUARD_BASE = "https://api.runguard.dev/v1"

def runguard_check_inline(
    run_id: str,
    instruction: str,
    session_id: str,
    session_call_count: int,
    estimated_context_tokens: int,
) -> dict:
    """Check guards before invoke_inline_agent. Returns {allowed: bool, reason: str}"""
    resp = httpx.post(
        f"{RUNGUARD_BASE}/check",
        json={
            "run_id": run_id,
            "tool_name": "invoke_inline_agent",
            "tool_input": {
                "instruction_hash": hashlib.sha256(instruction.encode()).hexdigest()[:16],
                "session_id": session_id,
                "session_call_count": session_call_count,
            },
            "context_tokens": estimated_context_tokens,
            "spent_usd": 0.0,
        },
        headers={"Authorization": f"Bearer {RUNGUARD_API_KEY}"},
        timeout=2.0,
    )
    resp.raise_for_status()
    return resp.json()

The Solo plan at $19/month covers one app with 1M guarded invocations per month — less than the cost of a single runaway inline agent supervisor loop that runs 20 iterations of Claude 3.5 Sonnet before you notice the billing alert.

Frequently asked questions

How does maxLength differ between standard Bedrock Agents and inline agents?

Standard Bedrock Agents expose maxLength as a fixed property on the agent's orchestration configuration, set during agent creation. It applies uniformly to every invocation of that agent alias. For inline agents, the equivalent limit travels with each invoke_inline_agent request, so it is dynamic, just like everything else about inline agents; confirm the exact field location in the API reference for your SDK version. In practice, many teams don't set it explicitly and accept the AWS-managed default, which can be up to 20 orchestration steps per invocation. Set it explicitly based on your profiled step-count distribution as described above.

Can I safely reuse the same sessionId across automated pipeline calls?

Only if you want the inline agent to have memory of the prior exchange. For automated pipelines where each call is a discrete, independent task (not a multi-turn conversation with a human user), generate a fresh sessionId per call using str(uuid.uuid4()). This eliminates the session accumulation failure mode entirely. The cost of generating a UUID is zero; the cost of a 10-call automated pipeline with shared session context at Claude 3.5 Sonnet rates can reach $1.50+ from context overhead alone.

Does enableTrace=True increase my billing?

No — the trace is generated by the orchestration model as part of its reasoning process and is billed as part of the model's output tokens regardless of whether you receive trace events in the stream. Setting enableTrace=False removes trace events from the response but does not reduce the underlying model calls or their token counts. You lose observability without saving cost. Keep enableTrace=True for the action group thrash detection pattern shown in this post — the trace events are the only way to count action group invocations from outside the agent.

How does the InlineAgentBreaker interact with Bedrock's multi-agent collaboration feature?

Multi-agent collaboration in Bedrock can wire inline agents as sub-agents through the collaboratorConfigurations parameter on the supervisor. When the supervisor is itself a managed Bedrock Agent (not an inline agent), the supervisor's orchestration calls your inline sub-agent via the Bedrock Agents runtime — you don't control those calls directly. In that case, the InlineAgentBreaker wrapping pattern described here doesn't apply. Instead, use the SupervisorCascadeBreaker at the workflow level where you have visibility into task delegation counts. For architectures where your application code is the supervisor (calling invoke_inline_agent directly), the InlineAgentBreaker applies as shown.

What's the difference between action group thrash (failure mode 3) and the action group invocation spiral in standard Bedrock Agents?

The standard Bedrock Agents action group spiral is when the agent repeatedly invokes the same action group with near-identical inputs — the agent loops on one tool rather than trying different tools. Action group thrash (failure mode 3 here) is when the agent rapidly cycles across multiple different action groups in a single invocation, trying each one in sequence without converging. The detection approach is different: spiral detection uses Jaccard similarity on input fingerprints; thrash detection counts distinct group names and per-group repetition counts from the trace. Both are possible in inline agents; inline agents are more exposed to thrash because the action group set is dynamically specified and may contain semantically overlapping groups that a well-configured standard agent would never combine.

Stop debugging runaway Bedrock Inline Agent loops after the bill lands

RunGuard wraps your invoke_inline_agent calls with hosted circuit-breaking logic — instruction fingerprinting, session tracking, action group thrash detection, and supervisor cascade limits all in one managed service. Two HTTP calls per invocation. Dashboard shows trips per app and cost-per-run across your fleet. Try the 14-day free trial with no card required.
