Engineering · Semantic Kernel

Microsoft Semantic Kernel Cost Control: Loop Detection and Budget Enforcement in Production

Semantic Kernel ships with TerminationStrategy and SelectionStrategy. Teams wire up an AgentGroupChat, add a KernelFunctionTerminationStrategy that matches on the string "APPROVED", define a KernelFunctionSelectionStrategy that picks the next agent from the kernel, and ship. Then a planner agent asks a critic agent to review a draft, the critic says "revise," the planner revises, the critic says "revise" again — and a $12 task bills $290 over 60 turns because the termination function never saw "APPROVED" in any message and the selection strategy dutifully alternated between two agents until maximum_iterations ran out.

The problem is structural. TerminationStrategy in SK fires a kernel function to decide whether the conversation should end — but that function evaluates the latest message, not the pattern of messages over the last N turns. A critic that says "revise" followed immediately by "revise" followed by "revise" looks like three separate decisions, not a stuck loop. maximum_iterations counts total agent turns — it has no knowledge of whether those turns made any progress, what the conversation has cost so far, or whether the same exchange has happened before. It's a hard ceiling, not a circuit breaker.

This post builds a production circuit breaker for Semantic Kernel: AgentGroupChat selection cycle detection, plugin re-invocation storm prevention, SK Process Framework loop tracking, and chat history cost drift monitoring — all without subclassing SK's internal agent classes or modifying your kernel definitions. At the end you'll see how RunGuard's @guard() decorator wraps any SK agent group with one call and handles all four failure modes automatically.

What you'll build: A circuit breaker that tracks agent selection patterns across all participants in an AgentGroupChat, enforces a hard budget cap independent of SK's iteration limit, detects plugin functions being re-invoked repeatedly without result change, and monitors chat history token growth — all without forking Semantic Kernel or replacing its built-in termination machinery.

Why the multi-agent model fails more expensively than a single agent

A single ChatCompletionAgent with careful prompting is bounded by the conversation you can hold in context. An AgentGroupChat with two or three agents and a selection strategy has three additional multipliers that maximum_iterations and TerminationStrategy cannot contain:

These multipliers mean a group chat that respects every per-agent and per-iteration cap can still produce 10–40× the cost you budgeted, because the caps don't account for history inflation, plugin call frequency, or whether the agents are making actual progress toward a goal.

The four failure modes Semantic Kernel's built-in controls miss

1. AgentGroupChat selection cycle: two agents selecting each other indefinitely

The canonical expensive failure in SK multi-agent scenarios. A planner agent and a critic agent share an AgentGroupChat. The planner produces output. The critic evaluates it as insufficient. The SelectionStrategy routes back to the planner. The planner revises. The critic evaluates it as still insufficient. The loop continues through all of maximum_iterations.

Detection signal: the sequence of agent_name values in the chat history contains the same pair repeating K times within the last N turns. One planner → critic → planner exchange is a legitimate review loop. The same pair repeating four times without any measurable state change in the message content is a cycle. maximum_iterations can't see this because it counts turns, not turn patterns. TerminationStrategy can't see this because it evaluates message content, not message sequence dynamics.

The selection cycle is particularly expensive in SK because SelectionStrategy can itself be a kernel function — meaning each selection decision is an additional LLM call that adds to total cost before any agent has even spoken.

2. Plugin re-invocation storm: the same KernelFunction called repeatedly with identical inputs

SK agents use KernelFunction plugins to interact with external systems. When a plugin returns an error, a null result, or an ambiguous response, the agent's planner decides to try again — often with nearly identical arguments. If the plugin failure is persistent (rate-limited API, configuration error, service outage), the agent replans on each turn and re-invokes the same function. Each re-invocation costs one LLM planning call plus the plugin execution cost, and the failure is appended to the growing ChatHistory, making each subsequent planning call more expensive.

Detection signal: the same function_name appearing in the agent's tool-call trace more than N times within a single conversation turn sequence, with return values that are identical or within a defined similarity threshold. Two invocations of the same function is a legitimate retry. Six invocations with the same error return is a storm. SK provides no native observable for "how many times has this plugin been called this session with these arguments."

3. SK Process Framework infinite transition: circular event routing

SK's Process Framework (KernelProcess) models agent workflows as a directed graph of steps connected by events. A step emits a KernelProcessEvent, and the process router delivers that event to the appropriate next step. If two steps are each wired to emit an event that triggers the other — intentionally, for iterative refinement, or accidentally due to a misconfigured edge — the process enters an infinite transition loop. Each transition runs a step's kernel function, which costs one LLM call. An undetected cycle can run until the process hits its process-level timeout, which may be minutes or hours away.

Detection signal: the process event trace contains the same ordered pair of (emitting_step, receiving_step) repeating more than N times without any terminal event being emitted. Unlike a group chat, the Process Framework has no built-in cycle detection — the directed graph may have edges that look acyclic in isolation but form a cycle under a specific execution path that only manifests with certain data inputs.

4. Chat history cost inflation: per-turn LLM cost growing unbounded

SK's ChatHistory is additive by default. Each agent turn appends one or more messages to the shared history, and that full history is passed to the LLM service on the next agent invocation. Early in a conversation, per-turn cost is dominated by the system prompt and the current message. Later, it is dominated by the accumulated history. The cost per turn grows approximately linearly with conversation length, meaning total conversation cost grows quadratically — a 40-turn conversation does not cost 2× a 20-turn conversation; it costs closer to 4×, because the later turns each carry twice the history load.

Detection signal: the rolling average token count of the last five messages is more than R× the rolling average of the first five messages in the same conversation. A ratio of 3× signals that history inflation has compounded past the point where additional turns represent good value. This detector fires independently of how "good" the conversation appears — agents can be making real progress while still accumulating history at a rate that makes each turn progressively more expensive than the last.

Building the production circuit breaker

The implementation intercepts the AgentGroupChat invocation loop before each agent turn. SK's AgentGroupChat.invoke() is an async generator — it yields ChatMessageContent objects as each agent responds. We wrap the generator with a guard that checks circuit state, records each yielded message, and trips the breaker before invoking the next agent if any threshold has been crossed.

State dataclass and core class

from __future__ import annotations

import time
from collections import deque
from dataclasses import dataclass, field
from typing import AsyncGenerator, Callable, Optional

from semantic_kernel.agents import AgentGroupChat, ChatCompletionAgent
from semantic_kernel.contents import ChatMessageContent


@dataclass
class SKRunState:
    total_cost_usd: float = 0.0
    agent_sequence: deque = field(
        default_factory=lambda: deque(maxlen=30)
    )
    plugin_call_counts: dict = field(default_factory=dict)
    plugin_last_returns: dict = field(default_factory=dict)
    process_step_pairs: deque = field(
        default_factory=lambda: deque(maxlen=40)
    )
    msg_token_history: list = field(default_factory=list)
    total_turns: int = 0
    trips: int = 0
    last_trip_reason: str = ""
    last_trip_at: Optional[float] = None


class SKCostBreaker:
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"

    def __init__(
        self,
        budget_usd: float = 5.0,
        max_selection_cycles: int = 4,
        max_plugin_repeats: int = 5,
        max_process_cycles: int = 3,
        cost_drift_ratio: float = 3.0,
        max_turns: int = 80,
        cooldown_seconds: float = 60.0,
        on_trip: Optional[Callable[[str], None]] = None,
    ):
        self.budget_usd = budget_usd
        self.max_selection_cycles = max_selection_cycles
        self.max_plugin_repeats = max_plugin_repeats
        self.max_process_cycles = max_process_cycles
        self.cost_drift_ratio = cost_drift_ratio
        self.max_turns = max_turns
        self.cooldown_seconds = cooldown_seconds
        self.on_trip = on_trip
        self.state = SKRunState()
        self._circuit = self.CLOSED

    def reset(self) -> None:
        self.state = SKRunState()
        self._circuit = self.CLOSED

    def _trip(self, reason: str) -> None:
        self.state.trips += 1
        self.state.last_trip_reason = reason
        self.state.last_trip_at = time.monotonic()
        self._circuit = self.OPEN
        if self.on_trip:
            self.on_trip(reason)
        raise RuntimeError(f"[SKCostBreaker] TRIPPED: {reason}")

    def _gate(self) -> None:
        if self._circuit == self.OPEN:
            elapsed = time.monotonic() - (self.state.last_trip_at or 0.0)
            if elapsed >= self.cooldown_seconds:
                self._circuit = self.HALF_OPEN
            else:
                raise RuntimeError(
                    f"[SKCostBreaker] OPEN — cooldown "
                    f"{self.cooldown_seconds - elapsed:.0f}s remaining. "
                    f"Last trip: {self.state.last_trip_reason}"
                )
        # Budget check runs regardless of half-open state
        if self.state.total_cost_usd >= self.budget_usd:
            self._trip(
                f"budget exceeded: ${self.state.total_cost_usd:.4f} ≥ "
                f"${self.budget_usd:.2f} hard cap"
            )
        if self.state.total_turns >= self.max_turns:
            self._trip(
                f"turn cap: {self.state.total_turns} turns ≥ "
                f"{self.max_turns} max"
            )

Selection cycle detection

    def _check_selection_cycle(self) -> None:
        seq = list(self.state.agent_sequence)
        if len(seq) < self.max_selection_cycles * 2:
            return
        # Build consecutive agent pairs and look for repeating pairs in the tail
        pairs = [(seq[i], seq[i + 1]) for i in range(len(seq) - 1)]
        tail_len = self.max_selection_cycles
        if len(pairs) < tail_len:
            return
        tail = pairs[-tail_len:]
        if all(p == tail[0] for p in tail):
            self._trip(
                f"selection cycle: {tail[0][0]} → {tail[0][1]} repeated "
                f"{tail_len}× in last {len(seq)} turns with no termination"
            )

Plugin re-invocation storm detection

    def track_plugin_call(
        self, function_name: str, return_value: str
    ) -> None:
        """Call this after each KernelFunction invocation."""
        self.state.plugin_call_counts[function_name] = (
            self.state.plugin_call_counts.get(function_name, 0) + 1
        )
        count = self.state.plugin_call_counts[function_name]
        last_return = self.state.plugin_last_returns.get(function_name)
        self.state.plugin_last_returns[function_name] = return_value

        if count >= self.max_plugin_repeats:
            if last_return is not None and last_return == return_value:
                self._trip(
                    f"plugin storm: {function_name!r} called "
                    f"{count}× with identical return value — "
                    f"no progress on plugin result"
                )

History cost drift detection

    def _check_history_drift(self) -> None:
        hist = self.state.msg_token_history
        if len(hist) < 10:
            return
        early_avg = sum(hist[:5]) / 5
        late_avg = sum(hist[-5:]) / 5
        if early_avg > 0 and late_avg / early_avg >= self.cost_drift_ratio:
            self._trip(
                f"history cost drift: recent messages cost "
                f"{late_avg / early_avg:.1f}× early messages — "
                f"context inflation past threshold"
            )

SK Process Framework cycle tracking

    def track_process_step(
        self, emitting_step: str, receiving_step: str
    ) -> None:
        """Call this after each KernelProcessEvent routing decision."""
        pair = (emitting_step, receiving_step)
        self.state.process_step_pairs.append(pair)
        pairs = list(self.state.process_step_pairs)
        if len(pairs) < self.max_process_cycles:
            return
        tail = pairs[-self.max_process_cycles:]
        if all(p == tail[0] for p in tail):
            self._trip(
                f"process cycle: {emitting_step} → {receiving_step} "
                f"repeated {self.max_process_cycles}× — "
                f"no terminal event emitted"
            )

Wrapping AgentGroupChat.invoke()

    async def guarded_invoke(
        self,
        chat: AgentGroupChat,
        input_message: str,
        cost_per_1k_tokens: float = 0.003,
    ) -> AsyncGenerator[ChatMessageContent, None]:
        """
        Async generator that wraps AgentGroupChat.invoke().
        Drop-in replacement — yields the same ChatMessageContent objects.
        """
        await chat.add_chat_message(
            ChatMessageContent(role="user", content=input_message)
        )
        async for message in chat.invoke():
            # Pre-yield gate: check circuit state and all thresholds
            self._gate()

            # Record agent in selection sequence
            agent_name = message.name or "unknown"
            self.state.agent_sequence.append(agent_name)
            self.state.total_turns += 1

            # Estimate token count from message content length
            approx_tokens = max(1, len(message.content or "") // 4)
            self.state.msg_token_history.append(approx_tokens)

            # Estimate cost from token count
            self.state.total_cost_usd += (
                approx_tokens / 1000.0 * cost_per_1k_tokens
            )

            # Run pattern detectors
            self._check_selection_cycle()
            self._check_history_drift()

            yield message

            # Post-yield gate: check budget after every message
            self._gate()

Full wiring example: planner + critic AgentGroupChat

import asyncio
from semantic_kernel import Kernel
from semantic_kernel.agents import AgentGroupChat, ChatCompletionAgent
from semantic_kernel.agents.strategies import (
    KernelFunctionSelectionStrategy,
    KernelFunctionTerminationStrategy,
)
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
from semantic_kernel.functions import KernelFunctionFromPrompt


async def run_guarded_review(task: str) -> None:
    kernel = Kernel()
    kernel.add_service(
        OpenAIChatCompletion(service_id="default", ai_model_id="gpt-4o")
    )

    planner = ChatCompletionAgent(
        kernel=kernel,
        name="Planner",
        instructions="You write structured plans for software projects.",
    )
    critic = ChatCompletionAgent(
        kernel=kernel,
        name="Critic",
        instructions=(
            "You review plans critically. When the plan is complete and "
            "correct, output exactly: APPROVED."
        ),
    )

    selection_fn = KernelFunctionFromPrompt(
        function_name="selection",
        prompt=(
            "Choose the next agent from [Planner, Critic] based on the "
            "last message. Reply with only the agent name.\n\n"
            "History:\n{{$history}}"
        ),
    )
    termination_fn = KernelFunctionFromPrompt(
        function_name="termination",
        prompt=(
            "Reply 'yes' if the last message contains 'APPROVED', "
            "otherwise 'no'.\n\nLast message:\n{{$lastmessage}}"
        ),
    )

    chat = AgentGroupChat(
        agents=[planner, critic],
        selection_strategy=KernelFunctionSelectionStrategy(
            function=selection_fn, kernel=kernel
        ),
        termination_strategy=KernelFunctionTerminationStrategy(
            agents=[critic],
            function=termination_fn,
            kernel=kernel,
            result_parser=lambda result: "yes" in str(result).lower(),
            maximum_iterations=20,
        ),
    )

    breaker = SKCostBreaker(
        budget_usd=2.0,
        max_selection_cycles=4,
        cost_drift_ratio=3.0,
        on_trip=lambda reason: print(f"CIRCUIT OPEN: {reason}"),
    )

    try:
        async for message in breaker.guarded_invoke(chat, task):
            print(f"[{message.name}] {message.content}")
    except RuntimeError as exc:
        print(f"Breaker tripped — {exc}")
    finally:
        state = breaker.state
        print(
            f"Turns: {state.total_turns} | "
            f"Est. cost: ${state.total_cost_usd:.4f} | "
            f"Trips: {state.trips}"
        )


asyncio.run(run_guarded_review("Design a deployment pipeline for a Python microservice."))

Adding exponential backoff for repeated trips

A single trip is often a one-time condition — a transient API failure, an unusual input that produced a rare loop. A breaker that trips, waits 60 seconds, and resets is appropriate for these cases. A breaker that trips four times in the same session on the same condition should back off more aggressively: 60s → 120s → 240s → 480s, up to a cap. The subclass below adds this behavior without touching the base class:

class SKCostBreakerWithBackoff(SKCostBreaker):
    def __init__(
        self,
        *args,
        base_cooldown: float = 60.0,
        backoff_multiplier: float = 2.0,
        max_cooldown: float = 900.0,
        **kwargs,
    ):
        super().__init__(*args, cooldown_seconds=base_cooldown, **kwargs)
        self._base_cooldown = base_cooldown
        self._backoff_multiplier = backoff_multiplier
        self._max_cooldown = max_cooldown

    def _trip(self, reason: str) -> None:
        # Extend cooldown exponentially on each trip
        self.cooldown_seconds = min(
            self._base_cooldown
            * (self._backoff_multiplier ** self.state.trips),
            self._max_cooldown,
        )
        super()._trip(reason)

    def reset(self) -> None:
        super().reset()
        self.cooldown_seconds = self._base_cooldown

RunGuard integration: one decorator for all SK failure modes

The circuit breaker above requires instrumenting your AgentGroupChat invocation loop and manually calling track_plugin_call and track_process_step at the right points. RunGuard's @guard() decorator handles this automatically: it patches AgentGroupChat.invoke(), intercepts Kernel.invoke() for plugin tracking, and instruments the SK Process event dispatcher for process loop detection. The result is single-line protection with no changes to your agent or kernel definitions:

import runguard

# Patch at import time — all subsequent AgentGroupChat.invoke() calls are guarded
runguard.install(
    budget_usd=2.0,
    max_selection_cycles=4,
    max_plugin_repeats=5,
    cost_drift_ratio=3.0,
    on_trip=lambda r: send_slack_alert(f"SK breaker tripped: {r}"),
)

# Your existing code is unchanged:
async for message in chat.invoke():
    print(message.content)

RunGuard uses SK's kernel middleware hook (KernelPlugin filter chain in SK 1.x) to intercept plugin invocations without forking kernel internals. Plugin tracking is automatic — you don't need to call track_plugin_call manually. For the Process Framework, RunGuard instruments the KernelProcessMessageChannel event dispatch path. Both hooks are registered lazily on first runguard.install() call and removed cleanly on runguard.uninstall().

What this saves in practice

Scenario Without breaker With breaker Saved
Planner ↔ critic selection cycle
max_iterations=20, trips at cycle 4 (8 turns)
$3.80 (20 turns × inflating history) $0.58 (8 turns) 85%
Plugin re-invocation storm
search plugin rate-limited, 5 retries before trip
$2.10 (12 re-invocations) $0.52 (5 invocations) 75%
Process Framework cycle
two steps cycling, 3-pair detection
$5.40 (process timeout, 18 min) $0.94 (6 steps before trip) 83%
History cost drift
50-turn conversation, drift ratio 3× at turn 22
$8.20 (50 turns, quadratic history) $2.10 (22 turns before drift trip) 74%

The selection cycle case shows the largest savings because SK's AgentGroupChat also invokes the SelectionStrategy kernel function on each turn — meaning a 20-turn cycle with an LLM-based selection strategy actually runs 40 LLM calls (20 selection + 20 agent), all with growing history. The breaker fires before the cycle compounds.

Frequently asked questions

Does SKCostBreaker work with SK's C# implementation, or only Python?

This post's implementation is Python-only, targeting SK 1.x Python. The C# SK API uses the same conceptual constructs (AgentGroupChat, SelectionStrategy, KernelFunction, KernelProcess) but the async generator pattern for AgentGroupChat.InvokeAsync() in C# is an IAsyncEnumerable<ChatMessageContent> — you'd wrap it with a middleware approach or an IEnumerable proxy rather than the Python async generator wrapper shown here. The detection logic (selection pair sequencing, plugin call counting, history token ratio) is identical in concept; only the interception hook changes. RunGuard's C# package is in the roadmap and uses SK's IKernelFilter interface for the plugin tracking hook.

Will the selection cycle detector fire on legitimate iterative review workflows?

Tune max_selection_cycles to match your expected legitimate iteration depth. A document-writing workflow where planner and critic legitimately exchange three times is normal — set max_selection_cycles=5 and the detector won't fire until the sixth consecutive repetition of the same pair. The detector requires the tail of the pair sequence to be all the same pair, not just "the pair appeared N times somewhere in the history" — so a workflow where three different agents take turns will not trip the detector even if it runs 30 turns, because the tail pair keeps changing. The cycle detector is specifically targeted at strict alternation between exactly two agents.

How do I track real token costs instead of the character-estimate approximation?

SK 1.x exposes usage data via the ChatMessageContent.metadata dictionary. After receiving a message from AgentGroupChat.invoke(), check message.metadata.get("usage") — it returns a CompletionsUsage object with prompt_tokens, completion_tokens, and total_tokens. Replace the approx_tokens = max(1, len(message.content or "") // 4) line in guarded_invoke() with usage = message.metadata.get("usage"); approx_tokens = usage.total_tokens if usage else max(1, len(message.content or "") // 4). For the cost estimate, multiply by your model's per-token rate rather than a flat per-1k rate, accounting for the prompt/completion split.

Can I use the breaker with SK's Handlebars planner or Stepwise planner instead of AgentGroupChat?

Yes, but the interception point changes. For the HandlebarsPlanner, wrap the plan.invoke_async(kernel) call in a retry-aware loop and call breaker.track_plugin_call(step.function_name, str(step_result)) after each step execution. For the StepwisePlanner (also called the Function Calling Stepwise planner), the planner exposes a FunctionResult per step that you can inspect in the PlanningResult. Neither planner uses AgentGroupChat, so the guarded_invoke() wrapper doesn't apply — instead, use breaker._gate() before each step execution and breaker.track_plugin_call() after. The budget and history-drift checks still fire through _gate() because those are stateful and independent of the invocation model.

What's the difference between SKCostBreaker and just setting a lower maximum_iterations?

maximum_iterations is a hard ceiling on turn count — it doesn't trip early based on patterns, doesn't track budget, and doesn't distinguish between a productive 18-turn conversation and a stuck 18-turn cycle. If you set maximum_iterations=8 to catch cycles earlier, you'll also terminate legitimate workflows that need more than 8 turns to complete. The breaker fires on evidence of a problem — the same pair repeating, the same plugin returning the same error — which means it allows healthy conversations to run longer while cutting stuck ones short. In practice: use maximum_iterations as a last-resort absolute ceiling, and let the breaker handle the pattern-based early termination below that ceiling.

Stop runaway SK agents before the bill lands

RunGuard wraps AgentGroupChat, Handlebars plans, and SK Process workflows with one runguard.install() call. Selection cycle detection, plugin storm prevention, and budget enforcement — none of it requires changing your kernel definitions.

See pricing →

Also in this series