OpenAI Realtime API Cost Control: Loop Detection and Budget Enforcement for Voice Agents

The OpenAI Realtime API is architecturally different from every other API covered in this series. You're not sending HTTP requests and receiving JSON responses. You're opening a persistent WebSocket connection, streaming audio in and audio out, and responding to a stream of server-sent events. The billing model is different too: you pay for audio tokens — roughly $0.06/min for input and $0.24/min for output with gpt-4o-realtime-preview (verify current rates at openai.com/pricing) — not text tokens per completion.

The word "output" in that pricing conceals the most important cost fact about the Realtime API: partial audio is billed. If your agent begins generating a 10-second response and the user interrupts after 3 seconds, the response is cancelled and the server stops generating. But the 3 seconds of audio output already generated are still billed. The audio was produced, streamed, and discarded, and you pay for it regardless.

For text-based agents, every token in a completion is either used or the whole request is abandoned at the network layer before cost accrues. For voice agents, cost accrues in real time, incrementally, from the moment the model starts speaking. This creates four failure modes that text agents don't have — and that the standard loop-detection patterns from OpenAI Agents SDK cost control won't catch.

This post covers all four, with Python asyncio implementations that plug into the Realtime API WebSocket event loop.

Scope. This post targets agents built directly on the OpenAI Realtime API WebSocket protocol — the wss://api.openai.com/v1/realtime endpoint using the openai Python SDK's AsyncRealtimeConnection or a raw websockets client. The failure modes apply equally to phone agents (Twilio + Realtime API) and browser-based voice assistants. For text-only agents using the standard OpenAI completions or Assistants API, see OpenAI Agents SDK Cost Control. For async Python patterns that apply across both, see Async Python AI Agent Cost Control.

How the Realtime API differs from text APIs for cost purposes

Before the failure modes, it helps to be precise about how the billing model and event model differ from text-based APIs. The gap is wider than most developers expect when first integrating the Realtime API.

| Dimension | Cloud text API (GPT-4o, Claude, Gemini) | OpenAI Realtime API (gpt-4o-realtime-preview) |
|---|---|---|
| Cost unit | Text tokens (input + output per completion) | Audio tokens (input + output, billed per second of audio) |
| Partial billing | No: a cancelled or failed request is not billed for output | Yes: audio already generated before a response.cancel is billed in full |
| Context signal | Explicit: HTTP 400 context_length_exceeded or token count in response | Session-managed server-side; no per-call context signal; grows silently |
| Loop detection mechanism | Fingerprint tool call arguments across consecutive turns | Fingerprint tool calls AND monitor event rates (cancel/VAD events per minute) |
| Interruption model | None: responses complete atomically | User barge-in fires input_audio_buffer.speech_started; server truncates response |
| Session state | Stateless per request; you control the messages list | Stateful WebSocket session; server accumulates conversation history automatically |

The partial billing and server-managed session state together mean that the standard "count tokens, enforce a budget, check for repeated tool calls" pattern from the cost control pattern reference is necessary but not sufficient. You need additional guards at the event level, not just the turn level.
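As one illustration of an event-level guard, the sketch below meters how much audio each response has generated so far, so a later cancellation can be attributed a rough cost. It assumes the default pcm16 output format (24 kHz, mono, 16-bit) and that audio chunks arrive as response.audio.delta events with a base64 "delta" field and a "response_id"; verify both against the API version you are using. GeneratedAudioMeter is a hypothetical helper name, and the seconds it reports are only a proxy for billed audio tokens.

```python
import base64

# Assumption: the session uses the default pcm16 output format
# (24 kHz, mono, 16-bit samples), so one second of audio is 48,000 bytes.
PCM16_BYTES_PER_SECOND = 24_000 * 2


class GeneratedAudioMeter:
    """Tracks how many seconds of audio each response has generated so far."""

    def __init__(self) -> None:
        self.seconds_by_response: dict[str, float] = {}

    def on_event(self, event: dict) -> None:
        # Assumption: audio chunks arrive as "response.audio.delta" events with
        # base64-encoded PCM in "delta" and the owning response in "response_id".
        if event.get("type") != "response.audio.delta":
            return
        pcm = base64.b64decode(event.get("delta", ""))
        response_id = event.get("response_id", "unknown")
        seconds = len(pcm) / PCM16_BYTES_PER_SECOND
        self.seconds_by_response[response_id] = (
            self.seconds_by_response.get(response_id, 0.0) + seconds
        )

    def total_seconds(self) -> float:
        """Seconds of audio generated across all responses seen so far."""
        return sum(self.seconds_by_response.values())
```

Feeding every server event through on_event gives a running per-response figure that the turn-level guards cannot see, because it grows while the response is still streaming.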

Failure Mode 1: Barge-in amplification loop

When a user interrupts mid-response — because the agent is too slow, repeating itself, or the user simply has something new to say — the browser or phone client sends audio to the server while the response is playing. The server detects speech energy in the input stream, fires input_audio_buffer.speech_started, and cancels the current response with response.cancel. The partial audio already generated is billed.

So far, this is expected behavior. The amplification loop starts when the application code — handling the response.cancel event — immediately calls response.create to start a new response based on the user's new input. If the user is in a noisy environment (background TV, HVAC, open office), or if the VAD sensitivity is high, the pattern becomes:

  1. Agent starts speaking (response generates audio, cost accrues)
  2. Background noise triggers input_audio_buffer.speech_started
  3. response.cancel fires — 2 seconds of audio billed, discarded
  4. Application immediately calls response.create — new response starts
  5. Same background noise triggers input_audio_buffer.speech_started again
  6. Loop repeats at 10–20 cycles per minute

At $0.24/min for output audio, with 2-second partial responses cancelled at 10 cycles per minute, you're paying for ~20 seconds of audio output per minute (roughly $0.08 per session-minute) for zero delivered value. At scale across multiple concurrent sessions, this is invisible on a per-session basis and devastating on the aggregate bill.
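The arithmetic behind that figure can be written out as a small helper. This is a back-of-the-envelope sketch: the function name is hypothetical, the default rate is the approximate $0.24/min quoted earlier, and it treats billing as linear in seconds of generated audio.

```python
def wasted_output_cost_per_minute(
    partial_seconds: float,
    cycles_per_minute: float,
    output_rate_per_minute: float = 0.24,  # approximate rate quoted above
) -> float:
    """Dollars per minute spent on audio that is generated and then discarded."""
    wasted_seconds = partial_seconds * cycles_per_minute
    return wasted_seconds / 60.0 * output_rate_per_minute


per_session = wasted_output_cost_per_minute(partial_seconds=2.0, cycles_per_minute=10)
fleet_per_hour = per_session * 60 * 100  # 100 concurrent sessions for one hour

print(f"per session-minute: ${per_session:.2f}")
print(f"100 sessions, 1 hour: ${fleet_per_hour:.2f}")
```

Under these assumptions a single looping session wastes about eight cents a minute, which is why the problem tends to surface only in aggregate.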

Detection requires tracking response.cancel events in a rolling time window. A BargeinRateGuard raises BargeinLoopError once the number of cancels in the window reaches the threshold:

import asyncio
import time
from collections import deque
from dataclasses import dataclass, field


class BargeinLoopError(RuntimeError):
    pass


@dataclass
class BargeinRateGuard:
    """
    Detects barge-in amplification loops by tracking response.cancel
    event rate in a rolling time window.

    A high cancel rate indicates the application is immediately re-starting
    responses after cancellation without a debounce delay — causing rapid
    billing cycles with zero delivered audio value.
    """
    max_cancels_per_window: int = 5
    window_seconds: float = 60.0
    min_debounce_seconds: float = 1.5  # minimum wait before re-issuing response.create

    _cancel_times: deque = field(default_factory=deque, init=False)
    _last_cancel_time: float = field(default=0.0, init=False)

    def record_cancel(self) -> None:
        """Call whenever a response.cancel event is received from the server."""
        now = time.monotonic()
        self._last_cancel_time = now

        # Prune events outside the rolling window
        cutoff = now - self.window_seconds
        while self._cancel_times and self._cancel_times[0] < cutoff:
            self._cancel_times.popleft()

        self._cancel_times.append(now)

        if len(self._cancel_times) >= self.max_cancels_per_window:
            rate = len(self._cancel_times) / self.window_seconds * 60
            raise BargeinLoopError(
                f"Barge-in amplification loop detected: {len(self._cancel_times)} response "
                f"cancellations in the last {self.window_seconds:.0f}s "
                f"({rate:.1f} cancels/min). "
                "Apply a cooldown before calling response.create after a barge-in, "
                "or reduce server_vad sensitivity in the session configuration."
            )

    def check_debounce(self) -> None:
        """
        Call before issuing response.create after a barge-in cancellation.
        Raises BargeinLoopError if insufficient time has passed since the last cancel.
        """
        if self._last_cancel_time == 0.0:
            return
        elapsed = time.monotonic() - self._last_cancel_time
        if elapsed < self.min_debounce_seconds:
            raise BargeinLoopError(
                f"response.create called {elapsed:.2f}s after last barge-in cancel "
                f"(minimum debounce: {self.min_debounce_seconds}s). "
                "Wait for the user to finish speaking before restarting the response."
            )

    @property
    def cancel_count_in_window(self) -> int:
        now = time.monotonic()
        cutoff = now - self.window_seconds
        return sum(1 for t in self._cancel_times if t >= cutoff)


# Integration with an asyncio WebSocket event handler:
#
# guard = BargeinRateGuard(max_cancels_per_window=5, window_seconds=60.0)
#
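# async def handle_event(event: dict) -> None:
#     event_type = event.get("type")
#     status = event.get("response", {}).get("status")
#     # Assumption to verify against the current API reference: a cancelled
#     # response is reported as response.done with status "cancelled", while
#     # "response.cancel" is the name of the client-side request.
#     if event_type == "response.done" and status == "cancelled":
#         guard.record_cancel()          # raises BargeinLoopError if threshold reached
#     elif event_type == "response.done":
#         # Debounce check: only restart if silence detected, not immediately
#         if should_continue_conversation(event):
#             guard.check_debounce()
#             await ws.send_json({"type": "response.create"})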

The check_debounce method enforces a mandatory pause between a cancel event and the next response.create. A 1.5-second debounce is long enough to let genuine user speech finish before interpreting silence as "ready for response" — which is what the server VAD should already do, but the debounce protects against edge cases where the application code bypasses VAD state.
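An alternative to raising on an early response.create is to wait out the remainder of the debounce window. The sketch below shows that variant with two hypothetical helpers; it assumes the caller tracks the time.monotonic() timestamp of the last cancellation and uses 0.0 to mean "no cancel yet", matching the guard above.

```python
import asyncio
import time


def remaining_debounce(last_cancel: float, now: float, min_debounce: float = 1.5) -> float:
    """Seconds still to wait before response.create is allowed (0.0 if none)."""
    if last_cancel <= 0.0:
        return 0.0  # no cancellation recorded yet
    return max(0.0, min_debounce - (now - last_cancel))


async def wait_out_debounce(last_cancel: float, min_debounce: float = 1.5) -> float:
    """Sleep for whatever is left of the debounce window; return the delay used."""
    delay = remaining_debounce(last_cancel, time.monotonic(), min_debounce)
    if delay > 0.0:
        await asyncio.sleep(delay)
    return delay
```

Waiting keeps the conversation moving after a genuine interruption, while raising makes the condition visible to monitoring. Which is preferable depends on whether the early restart is expected behavior or a bug in the calling code.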

Failure Mode 2: Server VAD false-positive storm

When you configure turn_detection.type = "server_vad" in the session, the server automatically segments the user's speech into turns: it detects when audio energy exceeds a threshold (speech_started), then detects silence after speech ends (speech_stopped), and automatically triggers a response generation. This removes the need for push-to-talk UI, which is exactly why most production voice agents use it.

In noisy environments — HVAC systems, keyboard clicks, other speakers in an open office, a phone call on speakerphone — the VAD can fire on background audio. The server-side VAD is energy-based and operates on very short windows. A repeated noise at roughly speech frequencies (90–300 Hz, which covers many mechanical sounds) can trigger input_audio_buffer.speech_started followed immediately by input_audio_buffer.speech_stopped, causing the server to issue a response to audio that contains no speech.
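For noisy deployments, the first lever is usually the session's turn detection settings. The sketch below builds a session.update payload with a less sensitive VAD. The field names (threshold, prefix_padding_ms, silence_duration_ms) reflect the session configuration as we understand it, and the specific values are illustrative assumptions rather than recommendations; check both against the current API reference before relying on them.

```python
def build_vad_session_update(
    threshold: float = 0.7,          # illustrative; higher requires louder audio
    silence_duration_ms: int = 800,  # illustrative; longer silence before turn end
    prefix_padding_ms: int = 300,    # audio kept before detected speech
) -> dict:
    """Build a session.update event that makes server VAD less trigger-happy."""
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold must be between 0.0 and 1.0")
    return {
        "type": "session.update",
        "session": {
            "turn_detection": {
                "type": "server_vad",
                "threshold": threshold,
                "prefix_padding_ms": prefix_padding_ms,
                "silence_duration_ms": silence_duration_ms,
            }
        },
    }
```

Raising the threshold trades false positives for missed quiet speech, and lengthening the silence window adds latency to every turn, so both are worth tuning per environment rather than globally.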

The signature of a VAD storm is distinct from a barge-in loop: instead of a response generating and then being cancelled, you get very short "turns" — conversation.item.created events with role: "user" arriving within milliseconds to hundreds of milliseconds of each other, far shorter than any real human speech turn. Humans speak in turns of 1–8 seconds; VAD false positives arrive in bursts of 50–400ms.
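A complementary signal is the length of each detected speech segment. The sketch below pairs speech_started and speech_stopped events and reports the segment duration. It assumes those events carry audio_start_ms, audio_end_ms, and item_id fields, which should be verified against the API reference; SpeechSegmentTimer and the 400ms cutoff are hypothetical, and the reported duration may include any configured prefix padding.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SpeechSegmentTimer:
    """Measures VAD segment length from speech_started/speech_stopped pairs."""
    min_speech_ms: int = 400  # illustrative cutoff from the 50-400ms burst range
    _starts: dict = field(default_factory=dict, init=False)

    def on_event(self, event: dict) -> Optional[int]:
        """Return the segment duration in ms on speech_stopped, else None."""
        event_type = event.get("type")
        item_id = event.get("item_id")
        if event_type == "input_audio_buffer.speech_started":
            self._starts[item_id] = event.get("audio_start_ms", 0)
            return None
        if event_type == "input_audio_buffer.speech_stopped":
            start = self._starts.pop(item_id, None)
            if start is None:
                return None  # stop event without a matching start
            return event.get("audio_end_ms", start) - start
        return None

    def is_suspicious(self, duration_ms: int) -> bool:
        """True when the segment is too short to be a plausible speech turn."""
        return duration_ms < self.min_speech_ms
```

Segment duration and inter-turn interval fail in different ways, so logging both makes it easier to tell a noisy room from a misbehaving client.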

A VADStormDetector measures the inter-turn interval and suppresses response generation when the interval is too short to be genuine speech:

import asyncio
import time
from collections import deque
from dataclasses import dataclass, field


class VADStormError(RuntimeError):
    pass


@dataclass
class VADStormDetector:
    """
    Detects server VAD false-positive storms by measuring inter-turn intervals.

    Genuine speech turns rarely complete less than ~800ms apart (an
    illustrative cutoff; tune it against the session's silence_duration_ms).
    Consecutive user turns arriving faster than this threshold indicate VAD
    misfires on background audio.
    """
    min_turn_interval_ms: float = 800.0   # below this = likely VAD false positive
    consecutive_short_turns_limit: int = 4  # trip after this many consecutive short turns
    measurement_window: int = 10           # track this many recent inter-turn intervals

    _last_turn_time: float = field(default=0.0, init=False)
    _recent_intervals_ms: deque = field(default_factory=deque, init=False)
    _consecutive_short: int = field(default=0, init=False)
    _suppression_active: bool = field(default=False, init=False)
    _suppression_until: float = field(default=0.0, init=False)

    def record_user_turn(self) -> bool:
        """
        Call when a conversation.item.created event with role='user' is received.
        Returns True if response generation should proceed, False if suppressed.
        Raises VADStormError if the storm threshold is exceeded.
        """
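        now = time.monotonic()
        # Update the timestamp up front so it stays current even when this
        # call raises VADStormError further down.
        previous, self._last_turn_time = self._last_turn_time, now

        if previous > 0.0:
            interval_ms = (now - previous) * 1000.0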

            # Maintain a rolling window of recent intervals
            self._recent_intervals_ms.append(interval_ms)
            if len(self._recent_intervals_ms) > self.measurement_window:
                self._recent_intervals_ms.popleft()

            if interval_ms < self.min_turn_interval_ms:
                self._consecutive_short += 1
            else:
                self._consecutive_short = 0  # reset on a legitimate-length turn

            if self._consecutive_short >= self.consecutive_short_turns_limit:
                short_intervals = [
                    f"{iv:.0f}ms" for iv in self._recent_intervals_ms
                    if iv < self.min_turn_interval_ms
                ]
                raise VADStormError(
                    f"Server VAD false-positive storm detected: "
                    f"{self._consecutive_short} consecutive user turns with inter-turn "
                    f"interval below {self.min_turn_interval_ms:.0f}ms. "
                    f"Recent short intervals: {', '.join(short_intervals[-5:])}. "
                    "Raise the turn_detection threshold in the session configuration "
                    "(a higher value makes the VAD less sensitive), or increase "
                    "silence_duration_ms to require longer silence before turn completion."
                )

        self._last_turn_time = now

        # If active suppression window hasn't expired, suppress response generation
        if self._suppression_active and time.monotonic() < self._suppression_until:
            return False  # tell the caller: do not call response.create
        self._suppression_active = False
        return True

    def activate_suppression(self, duration_seconds: float = 3.0) -> None:
        """
        Temporarily suppress response generation after a short-turn burst.
        Call from the exception handler of VADStormError before resetting the counter.
        """
        self._suppression_active = True
        self._suppression_until = time.monotonic() + duration_seconds
        self._consecutive_short = 0  # reset for the next measurement cycle

    @property
    def avg_recent_interval_ms(self) -> float:
        if not self._recent_intervals_ms:
            return float("inf")
        return sum(self._recent_intervals_ms) / len(self._recent_intervals_ms)


# Integration example:
#
# vad_guard = VADStormDetector(
#     min_turn_interval_ms=800.0,
#     consecutive_short_turns_limit=4,
# )
#
# async def handle_event(event: dict) -> None:
#     if event.get("type") == "conversation.item.created":
#         item = event.get("item", {})
#         if item.get("role") == "user":
#             try:
#                 should_respond = vad_guard.record_user_turn()
#             except VADStormError as exc:
#                 vad_guard.activate_suppression(duration_seconds=3.0)
#                 log.warning("VAD storm suppressed: %s", exc)
#                 return  # skip response.create during suppression
#             if should_respond:
#                 await ws.send_json({"type": "response.create"})

The suppression mechanism is the key addition over a simple threshold check. After a VAD storm is detected, calling activate_suppression() tells the detector to suppress response generation for a fixed window (3 seconds in the example), giving the noise source time to settle before responses are re-enabled. Without this, the exception handler itself becomes a source of rapid on/off cycling.

Failure Mode 3: Function call echo chamber

The Realtime API supports function calling, but the flow differs from text-API tool use. The model emits a response.function_call_arguments.done event when it wants to call a tool. Your application executes the tool, sends the result back as a conversation.item.create event whose item has type: "function_call_output", and then calls response.create to let the model continue. The model then typically generates an audio response acknowledging the tool result before deciding whether to call another tool.

That audio acknowledgement — "I've completed the file save" or "The search returned three results, let me summarize them" — is where the echo chamber begins in one specific class of agent implementations. In VAD-enabled sessions without careful stream management, the model's own audio output can be picked up by the microphone (in environments without proper acoustic echo cancellation), transcribed as user input, and re-submitted to the session as a new user turn. The turn contains the model's own words, which the model interprets as instructions to call the tool again.
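One way to catch acoustic echo before it reaches the tool layer is to compare each incoming user transcript with the assistant's most recent spoken transcript. The sketch below uses difflib from the standard library. It assumes input audio transcription is enabled so that both transcripts are available as text; the function names and the 0.85 threshold are illustrative choices, not values from the API.

```python
from difflib import SequenceMatcher


def normalize(text: str) -> str:
    """Lowercase and collapse whitespace so formatting differences don't matter."""
    return " ".join(text.lower().split())


def looks_like_echo(
    user_transcript: str,
    assistant_transcript: str,
    threshold: float = 0.85,  # illustrative similarity cutoff
) -> bool:
    """True when the 'user' turn closely matches what the assistant just said."""
    user = normalize(user_transcript)
    assistant = normalize(assistant_transcript)
    if not user or not assistant:
        return False  # nothing to compare
    return SequenceMatcher(None, user, assistant).ratio() >= threshold
```

A turn flagged this way can be dropped or logged instead of triggering response.create. Transcription errors mean the match is rarely exact, which is why a similarity ratio is used rather than string equality.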

Even without acoustic echo, the pattern appears in buggy agents that re-process the transcript of function call results as if they were new user messages. The loop: model calls tool → tool result submitted → model speaks confirmation → confirmation transcribed as user input → model re-issues the same tool call → loop repeats.

This is structurally distinct from the text-API tool call loop. In text agents, you detect repeated (function_name, arguments) pairs across consecutive assistant turns. In voice agents, the cycle includes an audio confirmation turn between each tool call — so the tool calls don't appear consecutive in the raw event stream. The fingerprint tracker needs to look across a wider window, ignoring interleaved audio turns:

import hashlib
import json
from collections import deque
from dataclasses import dataclass, field


class FunctionEchoError(RuntimeError):
    pass


@dataclass
class FunctionEchoDetector:
    """
    Detects function call echo chamber loops in Realtime API sessions.

    Tracks (function_name, argument_fingerprint) tuples across all tool calls
    in the session, ignoring interleaved audio turns. Raises FunctionEchoError
    when the same function+arguments pair appears in N consecutive tool calls
    within a sliding window.
    """
    max_consecutive_identical: int = 3
    window_size: int = 20  # track this many recent tool calls

    _call_history: deque = field(default_factory=deque, init=False)

    def record_function_call(self, function_name: str, arguments_json: str) -> None:
        """
        Call when a response.function_call_arguments.done event is received.
        arguments_json: the raw JSON string from the event's 'arguments' field.
        Raises FunctionEchoError if a repeated call pattern is detected.
        """
        try:
            args_dict = json.loads(arguments_json) if arguments_json.strip() else {}
        except json.JSONDecodeError:
            args_dict = {"_raw": arguments_json}

        fingerprint = self._fingerprint(function_name, args_dict)
        self._call_history.append((function_name, fingerprint))

        if len(self._call_history) > self.window_size:
            self._call_history.popleft()

        # Measure the run of identical fingerprints at the tail of the history.
        # Only tool calls are recorded here, so interleaved audio turns never
        # break the run.
        recent = list(self._call_history)
        consecutive = 0
        for _, fp in reversed(recent):
            if fp == fingerprint:
                consecutive += 1
            else:
                break  # stop at the first non-matching call

        if consecutive >= self.max_consecutive_identical:
            raise FunctionEchoError(
                f"Function call echo chamber detected: '{function_name}' called "
                f"with identical arguments {consecutive}x in {len(recent)} recent calls. "
                f"Fingerprint: {fingerprint}. "
                "Common causes: (1) model audio output re-entering session as user input "
                "(check acoustic echo cancellation or mute model output channel), "
                "(2) function result not being correctly appended to conversation history, "
                "(3) agent re-processing transcript of previous function_call_output items."
            )

    def _fingerprint(self, function_name: str, arguments: dict) -> str:
        canonical = f"{function_name}:{json.dumps(arguments, sort_keys=True)}"
        return hashlib.sha256(canonical.encode()).hexdigest()[:16]

    @property
    def call_count(self) -> int:
        return len(self._call_history)

    def get_recent_calls(self, n: int = 5) -> list[tuple[str, str]]:
        """Return the n most recent (function_name, fingerprint) pairs."""
        return list(self._call_history)[-n:]


# Integration example:
#
# echo_guard = FunctionEchoDetector(max_consecutive_identical=3)
#
# async def handle_event(event: dict) -> None:
#     if event.get("type") == "response.function_call_arguments.done":
#         function_name = event.get("name", "")
#         arguments = event.get("arguments", "{}")
#         echo_guard.record_function_call(function_name, arguments)
#         # Proceeds to tool execution only if no exception raised
#         result = await execute_tool(function_name, arguments)
#         await ws.send_json({
#             "type": "conversation.item.create",
#             "item": {
#                 "type": "function_call_output",
#                 "call_id": event.get("call_id"),
#                 "output": json.dumps(result),
#             }
#         })
#         await ws.send_json({"type": "response.create"})

The "scan backwards for consecutive identical fingerprints" approach handles the interleaved audio confirmation pattern because only tool calls enter the history: audio turns and other events between two identical calls never break the run. The run does reset when a different call appears, including a call to the same function with different arguments, so the guard won't falsely trip on a legitimate sequence of distinct calls.
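The two ideas the detector relies on, canonical fingerprints and a tail run length, can be checked in isolation. The standalone sketch below mirrors the logic of the class above with hypothetical free functions; it shows that key order in the arguments JSON does not change the fingerprint and that a different call resets the run.

```python
import hashlib
import json


def fingerprint(function_name: str, arguments_json: str) -> str:
    """Stable short hash of a tool call, independent of JSON key order."""
    try:
        args = json.loads(arguments_json) if arguments_json.strip() else {}
    except json.JSONDecodeError:
        args = {"_raw": arguments_json}  # fall back to the raw string
    canonical = f"{function_name}:{json.dumps(args, sort_keys=True)}"
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]


def tail_run_length(fingerprints: list[str]) -> int:
    """Length of the run of identical fingerprints at the end of the list."""
    if not fingerprints:
        return 0
    last = fingerprints[-1]
    run = 0
    for fp in reversed(fingerprints):
        if fp != last:
            break  # a different call ends the run
        run += 1
    return run


save_a = fingerprint("save_file", '{"path": "a.txt", "mode": "w"}')
save_a_reordered = fingerprint("save_file", '{"mode": "w", "path": "a.txt"}')
save_b = fingerprint("save_file", '{"path": "b.txt", "mode": "w"}')
```

Here save_a and save_a_reordered are equal while save_b differs, so a history of [save_b, save_a, save_a, save_a] has a tail run of 3 and would trip the default limit.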

Failure Mode 4: Session transcript accumulation

Unlike the text APIs — where you send the full conversation history on every request and explicitly control what's included — the Realtime API maintains conversation history server-side in the WebSocket session. Every turn (user audio, function call, function result, assistant audio + transcript) is automatically appended to the session's conversation. You don't send it; the session accumulates it.

This is convenient for short conversations but creates a compounding cost problem for long-running voice agents: kiosks, phone support lines, interactive voice response systems, or any session that stays open for many minutes. After 30 minutes of moderate conversation, the session history is substantial. Every new response generation pays for that entire accumulated context as input — and since input audio/text tokens are billed at $0.06/min equivalent, a dense 30-minute history might cost $1.80 in input tokens for a single new response.
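To see how this compounds, the sketch below sums the input cost over a whole session under a deliberately pessimistic model: every response re-bills the entire accumulated history at the flat $0.06/min-equivalent rate. It ignores any cached-input discounts and the difference between audio and text token pricing, so treat the result as an upper-bound illustration rather than a billing estimate. The function name is hypothetical.

```python
def cumulative_input_cost(
    turn_minutes: list[float],
    input_rate_per_minute: float = 0.06,  # approximate rate quoted above
) -> float:
    """Total input cost when each response pays for the full history so far."""
    history_minutes = 0.0
    total = 0.0
    for minutes in turn_minutes:
        history_minutes += minutes          # the session history only grows
        total += history_minutes * input_rate_per_minute
    return total


# 60 exchanges of 30 seconds each: a dense 30-minute session.
turns = [0.5] * 60
last_response_cost = sum(turns) * 0.06
session_total = cumulative_input_cost(turns)

print(f"final response alone: ${last_response_cost:.2f}")
print(f"whole session:        ${session_total:.2f}")
```

The final response costs about $1.80 on its own, but the session total under this model is roughly $54.90, because the cost grows with the square of the number of turns rather than linearly.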

The correct mitigation is periodic session rotation: close the current WebSocket session, open a new one, and carry forward only the minimal context needed for continuity (current task summary, user preferences). But most agent implementations don't do this — the WebSocket stays open indefinitely, accumulating context without any awareness of the growing cost baseline.

A SessionAgeGuard monitors session duration and turn count, alerting before the accumulated context reaches the point where session rotation becomes urgent:

import asyncio
import time
from dataclasses import dataclass, field


class SessionAgeError(RuntimeError):
    pass


@dataclass
class SessionAgeGuard:
    """
    Monitors Realtime API session age and conversation turn count.

    Long-running sessions accumulate conversation history server-side,
    increasing the input token cost of every new response. This guard
    raises SessionAgeError when the session should be rotated to prevent
    unbounded context accumulation cost.
    """
    max_session_minutes: float = 20.0   # rotate after 20 minutes
    max_turn_count: int = 50            # rotate after 50 conversation turns
    warn_at_fraction: float = 0.80      # warn at 80% of each limit

    _session_start_time: float = field(default_factory=time.monotonic, init=False)
    _session_id: str = field(default="", init=False)
    _turn_count: int = field(default=0, init=False)
    _warned_age: bool = field(default=False, init=False)
    _warned_turns: bool = field(default=False, init=False)

    def record_session_created(self, session_id: str) -> None:
        """Call when a session.created event is received from the server."""
        self._session_id = session_id
        self._session_start_time = time.monotonic()
        self._turn_count = 0
        self._warned_age = False
        self._warned_turns = False

    def record_turn_completed(self) -> None:
        """
        Call when a response.done event is received, indicating a full
        assistant turn has completed (audio generation finished).
        Raises SessionAgeError if rotation thresholds are exceeded.
        """
        self._turn_count += 1
        self._check_thresholds()

    def check_now(self) -> None:
        """
        Explicit check — call before any response.create to detect
        if the session has aged past the rotation threshold.
        """
        self._check_thresholds()

    def _check_thresholds(self) -> None:
        elapsed_minutes = (time.monotonic() - self._session_start_time) / 60.0

        # Hard limits
        if elapsed_minutes >= self.max_session_minutes:
            raise SessionAgeError(
                f"Session '{self._session_id}' has been open for "
                f"{elapsed_minutes:.1f} minutes (limit: {self.max_session_minutes:.0f} min). "
                f"Accumulated {self._turn_count} conversation turns. "
                "Rotate the session: close this WebSocket, open a new connection, "
                "and resume with a summarized context to prevent compounding input token cost."
            )

        if self._turn_count >= self.max_turn_count:
            raise SessionAgeError(
                f"Session '{self._session_id}' has accumulated {self._turn_count} turns "
                f"(limit: {self.max_turn_count}). "
                f"Session age: {elapsed_minutes:.1f} minutes. "
                "Rotate the session to reset the server-side conversation history "
                "and control the input token baseline for future responses."
            )

        # Soft warnings: raised once per limit as SessionAgeWarning, a non-fatal
        # exception that callers should catch and log rather than treat as a trip.
        age_fraction = elapsed_minutes / self.max_session_minutes
        turn_fraction = self._turn_count / self.max_turn_count

        if age_fraction >= self.warn_at_fraction and not self._warned_age:
            self._warned_age = True
            raise SessionAgeWarning(
                f"Session '{self._session_id}' at {age_fraction*100:.0f}% of age limit "
                f"({elapsed_minutes:.1f}/{self.max_session_minutes:.0f} min). "
                "Consider preparing a session rotation at a natural conversation break."
            )

        if turn_fraction >= self.warn_at_fraction and not self._warned_turns:
            self._warned_turns = True
            raise SessionAgeWarning(
                f"Session '{self._session_id}' at {turn_fraction*100:.0f}% of turn limit "
                f"({self._turn_count}/{self.max_turn_count} turns). "
                "Plan session rotation before the next natural pause."
            )

    @property
    def session_age_minutes(self) -> float:
        return (time.monotonic() - self._session_start_time) / 60.0

    @property
    def turn_count(self) -> int:
        return self._turn_count


class SessionAgeWarning(RuntimeError):
    """Non-fatal warning: session is approaching rotation thresholds."""
    pass


# Integration example showing session rotation:
#
# age_guard = SessionAgeGuard(max_session_minutes=20.0, max_turn_count=50)
#
# async def rotate_session(ws, summary: str):  # returns the new connection
#     """Close old session and open a new one with summarized context."""
#     await ws.close()
#     new_ws = await open_realtime_connection()
#     await new_ws.send_json({
#         "type": "session.update",
#         "session": {
#             "instructions": f"Previous conversation summary: {summary}",
#         }
#     })
#     return new_ws

Session rotation is more complex than a token trim on a text API: you're closing a WebSocket connection, opening a new one, re-authenticating, and restoring any tool configurations. Building the rotation logic once and testing it deliberately — rather than discovering you need it when sessions are 45 minutes old in production — is the practical takeaway here. The cost savings from preventing context accumulation compound over the lifetime of each session.
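The part of rotation that is easiest to test in isolation is the payload that seeds the new session. The sketch below builds a session.update event that restores the base instructions and tool definitions and appends a truncated summary. build_rotation_seed, the 2000-character cap, and the example tool are hypothetical; it assumes session.update accepts "instructions" and "tools" fields, which should be checked against the API reference.

```python
def build_rotation_seed(
    summary: str,
    base_instructions: str,
    tools: list,
    max_summary_chars: int = 2000,  # illustrative cap on carried-over context
) -> dict:
    """Build the session.update event that seeds a freshly opened session."""
    trimmed = summary.strip()[:max_summary_chars]
    instructions = base_instructions.strip()
    if trimmed:
        instructions += "\n\nPrevious conversation summary: " + trimmed
    return {
        "type": "session.update",
        "session": {"instructions": instructions, "tools": tools},
    }


# Hypothetical tool definition used only to show that tools are carried over.
example_tools = [{"type": "function", "name": "lookup_order"}]
seed = build_rotation_seed(
    summary="  User wants a refund for an order placed last week.  ",
    base_instructions="You are a support agent.",
    tools=example_tools,
)
```

Capping the summary matters because the seed is itself input context on every later response; an unbounded summary would reintroduce the accumulation problem that rotation is meant to solve.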

Combining all four guards: RealtimeGuard

Each guard targets a distinct failure mode, but all four need to be active simultaneously in a production voice agent. RealtimeGuard is a composing class that wires all four checks into a single event handler wrapper, dispatching to the right guard based on the Realtime API event type:

import asyncio
import json
import time
from dataclasses import dataclass, field
from typing import Callable, Awaitable


@dataclass
class RealtimeGuard:
    """
    Composite circuit breaker for OpenAI Realtime API voice agents.

    Wraps the WebSocket event loop and routes events to four specialized guards:
    1. BargeinRateGuard      — detects barge-in amplification loops
    2. VADStormDetector      — detects server VAD false-positive storms
    3. FunctionEchoDetector  — detects function call echo chamber loops
    4. SessionAgeGuard       — detects session transcript accumulation

    Usage: call handle_event() for every event received from the WebSocket.
    Raises typed exceptions when a failure mode is detected.
    The caller is responsible for exception handling and recovery actions.
    """
    # BargeinRateGuard config
    max_cancels_per_minute: int = 5
    cancel_window_seconds: float = 60.0
    debounce_seconds: float = 1.5

    # VADStormDetector config
    min_turn_interval_ms: float = 800.0
    consecutive_short_turns_limit: int = 4

    # FunctionEchoDetector config
    max_consecutive_identical_calls: int = 3

    # SessionAgeGuard config
    max_session_minutes: float = 20.0
    max_turn_count: int = 50

    def __post_init__(self) -> None:
        self._barge_in = BargeinRateGuard(
            max_cancels_per_window=self.max_cancels_per_minute,
            window_seconds=self.cancel_window_seconds,
            min_debounce_seconds=self.debounce_seconds,
        )
        self._vad = VADStormDetector(
            min_turn_interval_ms=self.min_turn_interval_ms,
            consecutive_short_turns_limit=self.consecutive_short_turns_limit,
        )
        self._echo = FunctionEchoDetector(
            max_consecutive_identical=self.max_consecutive_identical_calls,
        )
        self._age = SessionAgeGuard(
            max_session_minutes=self.max_session_minutes,
            max_turn_count=self.max_turn_count,
        )

    def handle_event(self, event: dict) -> bool:
        """
        Process a single Realtime API server event through all applicable guards.

        Returns True if the application should proceed normally (e.g. call response.create).
        Returns False if the event was suppressed by the VAD storm guard.

        Raises:
            BargeinLoopError    — barge-in cancel rate too high
            VADStormError       — VAD inter-turn interval too short, too many times
            FunctionEchoError   — identical function call repeated too many times
            SessionAgeError     — session age or turn count exceeds rotation threshold
            SessionAgeWarning   — session approaching rotation threshold (non-fatal)
        """
        event_type = event.get("type", "")

        if event_type == "session.created":
            self._age.record_session_created(
                event.get("session", {}).get("id", "unknown")
            )

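        elif event_type == "response.cancelled" or (
            event_type == "response.done"
            and event.get("response", {}).get("status") == "cancelled"
        ):
            # Cancellation signal. The API versions we are aware of report it as
            # response.done with status "cancelled"; the standalone event name is
            # kept as a fallback. Verify against the current API reference.
            self._barge_in.record_cancel()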

        elif event_type == "input_audio_buffer.speech_started":
            # User interrupted — check debounce before any subsequent response.create
            # The check happens at the point of re-issuing response.create, not here.
            pass

        elif event_type == "conversation.item.created":
            item = event.get("item", {})
            if item.get("role") == "user":
                should_respond = self._vad.record_user_turn()
                if not should_respond:
                    return False  # VAD suppression active

        elif event_type == "response.function_call_arguments.done":
            self._echo.record_function_call(
                function_name=event.get("name", ""),
                arguments_json=event.get("arguments", "{}"),
            )

        elif event_type == "response.done":
            self._age.record_turn_completed()

        return True

    def check_before_response_create(self) -> None:
        """
        Call immediately before sending response.create to the server.
        Checks the barge-in debounce and session age thresholds.
        """
        self._barge_in.check_debounce()
        self._age.check_now()

    @property
    def session_stats(self) -> dict:
        return {
            "session_age_minutes": round(self._age.session_age_minutes, 2),
            "turn_count": self._age.turn_count,
            "cancel_count_in_window": self._barge_in.cancel_count_in_window,
            "avg_vad_interval_ms": round(self._vad.avg_recent_interval_ms, 1),
            "function_call_count": self._echo.call_count,
        }


# Complete usage example with asyncio and websockets:
#
# import json
# import websockets
#
# async def run_voice_agent(api_key: str) -> None:
#     guard = RealtimeGuard(
#         max_cancels_per_minute=5,
#         min_turn_interval_ms=800.0,
#         max_session_minutes=20.0,
#     )
#
#     uri = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview"
#     headers = {"Authorization": f"Bearer {api_key}", "OpenAI-Beta": "realtime=v1"}
#
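#     # additional_headers is the keyword in current websockets releases;
#     # the legacy client used extra_headers instead.
#     async with websockets.connect(uri, additional_headers=headers) as ws: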
#         async for raw_msg in ws:
#             event = json.loads(raw_msg)
#             try:
#                 should_continue = guard.handle_event(event)
#             except (BargeinLoopError, VADStormError, FunctionEchoError) as exc:
#                 # Session-level failures: log and apply recovery
#                 print(f"Guard tripped: {exc}")
#                 if isinstance(exc, VADStormError):
#                     guard._vad.activate_suppression(duration_seconds=3.0)
#                 continue
#             except SessionAgeError as exc:
#                 # Session needs rotation
#                 print(f"Session rotation required: {exc}")
#                 await ws.close()
#                 return  # reconnect with new session in caller
#             except SessionAgeWarning as exc:
#                 # Non-fatal: log and continue
#                 print(f"Session rotation advisory: {exc}")
#                 should_continue = True
#
#             if not should_continue:
#                 continue  # VAD suppression active — skip response.create
#
#             # Route event to application logic
#             await route_event(ws, event, guard)

The guard raises typed exceptions so the caller can apply the right recovery action for each failure mode: VAD storms get a suppression window, barge-in loops benefit from a configuration change to VAD sensitivity, function echo chambers need the audio output channel audited for re-entry, and session age errors require a reconnect with fresh session state. Mixing all of these into a single exception would make it impossible to respond correctly. The cost estimator widget on the RunGuard homepage can help you project the dollar impact of each failure mode at your actual traffic volume before you deploy guards in production.
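
One way to keep that mapping explicit is a small lookup from exception type to recovery action. The sketch below is illustrative only: the four exception classes are empty stand-ins for the ones defined earlier in this post, and the Recovery enum and recovery_for helper are hypothetical names, not part of RunGuard or the OpenAI SDK.

```python
from enum import Enum


# Stand-ins so the sketch runs on its own; in a real project, import the
# exception classes defined earlier in this post instead of redefining them.
class BargeinLoopError(Exception): ...
class VADStormError(Exception): ...
class FunctionEchoError(Exception): ...
class SessionAgeError(Exception): ...


class Recovery(Enum):
    """Hypothetical recovery actions, one per failure mode."""
    SUPPRESS_RESPONSES = "hold response.create for a short suppression window"
    RETUNE_VAD = "adjust VAD sensitivity before continuing"
    AUDIT_AUDIO_PATH = "check whether output audio is re-entering the input"
    ROTATE_SESSION = "close the socket and reconnect with fresh session state"


RECOVERY_BY_ERROR = {
    VADStormError: Recovery.SUPPRESS_RESPONSES,
    BargeinLoopError: Recovery.RETUNE_VAD,
    FunctionEchoError: Recovery.AUDIT_AUDIO_PATH,
    SessionAgeError: Recovery.ROTATE_SESSION,
}


def recovery_for(exc: Exception) -> Recovery:
    """Return the recovery action for a guard exception; re-raise anything else."""
    for exc_type, action in RECOVERY_BY_ERROR.items():
        if isinstance(exc, exc_type):
            return action
    raise exc
```

A caller can then branch on the returned action in one place instead of scattering isinstance checks through the event loop.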

Guard threshold reference for Realtime API

Guard parameter | Recommended default | Rationale
Barge-in cancel threshold | 5 cancels / 60 s | Legitimate conversations average 1–2 barge-ins per minute at most. Five cancels per minute is the threshold where the pattern is clearly amplification, not normal conversation.
Barge-in debounce | 1.5 s | The server VAD's default silence_duration_ms is 500 ms. Adding 1 second of application-level debounce on top prevents response.create from firing before the user has truly stopped speaking.
VAD inter-turn minimum | 800 ms | Consecutive user turns less than 800 ms apart are rare in real conversation (human reaction time floor is ~200 ms; typical inter-turn gaps are 500 ms–2 s), so a run of them is far more likely to be VAD false positives than speech.
VAD consecutive short turns | 4 consecutive | One or two short turns can occur legitimately (short acknowledgements, "yes", "uh-huh"). Four consecutive short turns indicate a sustained noise source, not normal conversation.
Function echo consecutive limit | 3 identical in a window of 20 | Legitimate agents may call the same function twice with identical arguments (idempotency check, confirmation pattern). Three identical consecutive calls within any 20-call window is the loop signal.
Session rotation age | 20 minutes | At moderate conversation density (~2 turns/min), 20 minutes yields ~40 turns of accumulated history. Beyond this, the per-response input cost is material. Rotate at 20 minutes for cost control, or at natural conversation pauses.
Session rotation turn count | 50 turns | A hard count-based limit for high-cadence conversations that hit the turn threshold before the time threshold.
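
The defaults above can be kept in one place as a small config object. This is a minimal sketch, not part of the guard implementation: GuardThresholds and its problems() check are hypothetical, and the field names are taken from the attributes RealtimeGuard reads in __post_init__. The 500 ms server silence window is the default quoted in the table and is passed in as an assumption.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class GuardThresholds:
    """Hypothetical container mirroring the recommended defaults."""
    max_cancels_per_minute: int = 5
    cancel_window_seconds: float = 60.0
    debounce_seconds: float = 1.5
    min_turn_interval_ms: float = 800.0
    consecutive_short_turns_limit: int = 4
    max_consecutive_identical_calls: int = 3
    max_session_minutes: float = 20.0
    max_turn_count: int = 50

    def problems(self, server_silence_ms: float = 500.0) -> list:
        """Return human-readable issues; an empty list means the config looks sane."""
        issues = []
        for name, value in asdict(self).items():
            if value <= 0:
                issues.append(f"{name} must be positive")
        # The debounce only helps if it outlasts the server VAD silence window.
        if 0 < self.debounce_seconds * 1000.0 <= server_silence_ms:
            issues.append("debounce_seconds should exceed the server VAD silence window")
        return issues
```

If RealtimeGuard accepts these names as keyword arguments, as its __post_init__ suggests, RealtimeGuard(**asdict(GuardThresholds())) would build a guard from the validated values.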

FAQ

Does RunGuard work with the OpenAI Realtime API Python SDK?

Yes. The guards in this post are plain Python dataclasses that operate on event dictionaries — they're not tied to any specific WebSocket client library. Whether you're using the openai Python SDK's AsyncRealtimeConnection, the websockets library directly, or any other WebSocket client, you pass the parsed event dictionary to RealtimeGuard.handle_event() and it does the rest. RunGuard's SDK wraps these guards with additional telemetry and a managed dashboard, but the pattern works standalone as shown above. Check the RunGuard homepage for the current SDK integration guide for the Realtime API specifically.

Is there a way to detect loops without interrupting the user mid-conversation?

Yes — and this is an important design constraint for voice agents specifically. The BargeinRateGuard and VADStormDetector raise exceptions at the point where your application code decides whether to call response.create, not mid-stream while the user is speaking. The FunctionEchoDetector raises after receiving the response.function_call_arguments.done event, before the tool executes. The SessionAgeGuard raises at response.done or on a pre-response check. None of these interrupt audio streaming mid-utterance. The only externally-visible effect of a tripped guard is that the next agent response is delayed or suppressed — which from the user's perspective is silence rather than an interruption. The recommended UX pattern is to play a brief audio buffer ("One moment...") when a recovery action is needed, rather than letting the silence be unexplained.

How does barge-in billing work — am I really paying for cancelled partial audio?

Yes. The OpenAI Realtime API billing model charges for audio tokens as they are generated, not as they are delivered to the client. When a user barge-in triggers response.cancel, the server stops generating new audio — but any audio tokens already produced in the current response are included in the session's usage. The response.done event (sent even for cancelled responses, with status: "cancelled") includes a usage object with the token counts for the partial response. Verify current per-token rates at openai.com/pricing; the Realtime API pricing page lists both the audio input and audio output rates separately. The practical implication is that a barge-in loop at 10 cycles/minute is not free — it generates real usage on every cycle, even though the user hears nothing useful.
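
To see what cancelled responses are costing, you can total the output audio tokens reported on response.done events whose status is "cancelled". The sketch below assumes the usage object nests audio counts under output_token_details.audio_tokens; check that against the current API reference. The function name and the sample numbers are made up for illustration, and the per-token rate is deliberately left as a parameter.

```python
def cancelled_audio_tokens(events) -> int:
    """Sum output audio tokens from responses that ended with status 'cancelled'."""
    total = 0
    for event in events:
        if event.get("type") != "response.done":
            continue
        response = event.get("response") or {}
        if response.get("status") != "cancelled":
            continue
        usage = response.get("usage") or {}
        details = usage.get("output_token_details") or {}  # assumed field name
        total += details.get("audio_tokens", 0)
    return total


def estimated_cost_usd(tokens: int, usd_per_million_tokens: float) -> float:
    """Convert a token count to dollars at a rate you look up yourself."""
    return tokens / 1_000_000 * usd_per_million_tokens


# Illustrative events only; the token counts are invented.
SAMPLE_EVENTS = [
    {"type": "response.done", "response": {"status": "completed",
        "usage": {"output_token_details": {"audio_tokens": 400}}}},
    {"type": "response.done", "response": {"status": "cancelled",
        "usage": {"output_token_details": {"audio_tokens": 120}}}},
    {"type": "response.done", "response": {"status": "cancelled",
        "usage": {"output_token_details": {"audio_tokens": 95}}}},
    {"type": "input_audio_buffer.speech_started"},
]

wasted = cancelled_audio_tokens(SAMPLE_EVENTS)
```

Logging this total per session makes a barge-in loop visible as a rising count of billed tokens that no user heard to completion.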

Can I use RunGuard's LoopDetector for Realtime API function call loops?

RunGuard's core LoopDetector class (documented in the cost control pattern reference) handles the fingerprint-and-window pattern that underlies the FunctionEchoDetector above. The Realtime-specific adaptation in this post handles the additional complexity of parsing the raw JSON arguments string from response.function_call_arguments.done events and the wider window needed because audio turns interleave between tool calls. If you're already using RunGuard's SDK, the RealtimeLoopDetector integration extends the core LoopDetector with these Realtime-specific parsing and windowing behaviors — check the changelog for the specific SDK version that added Realtime API support.
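
For readers who have not seen the fingerprint-and-window pattern, here is a minimal standalone sketch. It is not RunGuard's LoopDetector or the FunctionEchoDetector from this post: EchoWindow and fingerprint are hypothetical names, and "three identical calls within the last 20" is one interpretation of the threshold. The arguments string is parsed and re-serialized with sorted keys so that key order and whitespace do not defeat the comparison.

```python
import hashlib
import json
from collections import deque


def fingerprint(name: str, arguments_json: str) -> str:
    """Hash a function call so equivalent argument payloads compare equal."""
    try:
        canonical = json.dumps(
            json.loads(arguments_json), sort_keys=True, separators=(",", ":")
        )
    except (json.JSONDecodeError, TypeError):
        # Malformed or missing arguments: fall back to the raw value.
        canonical = str(arguments_json)
    return hashlib.sha256(f"{name}:{canonical}".encode("utf-8")).hexdigest()


class EchoWindow:
    """Track recent call fingerprints and flag repeats inside a sliding window."""

    def __init__(self, window: int = 20, limit: int = 3) -> None:
        self._recent = deque(maxlen=window)  # oldest entries fall off automatically
        self._limit = limit

    def record(self, name: str, arguments_json: str) -> bool:
        """Record a call; return True once it has appeared `limit` times in the window."""
        fp = fingerprint(name, arguments_json)
        self._recent.append(fp)
        return self._recent.count(fp) >= self._limit
```

The window is counted in function calls, not events, so audio turns that interleave between tool calls do not push earlier calls out of view.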

What's a safe session duration for production voice agents?

The right session duration depends on your use case and conversation density. For high-cadence customer service agents (2–4 turns/minute), the 50-turn limit is reached after 12.5–25 minutes, so above roughly 2.5 turns/minute it trips before the 20-minute limit; plan for rotation around the 12–20 minute mark. For low-cadence assistants (one turn every few minutes), the time limit is the one that matters. "Safe" here isn't just about context window limits (the Realtime API has a large context window); it's about the compounding input token cost as history grows. A session 40 turns deep costs roughly twice as much per response in input tokens as a fresh session, even if the model could technically handle much more context. For kiosk or phone agent deployments where sessions might run for hours, rotate at every natural break point (end of a task, or a silence longer than 30 seconds) rather than only at hard time limits.
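
Which limit binds first is simple arithmetic: the turn limit is reached after max_turn_count / turns_per_minute minutes, and rotation happens at whichever comes first. A small sketch using this post's defaults (the rotation_point helper is a hypothetical name):

```python
def rotation_point(
    turns_per_minute: float,
    max_turn_count: int = 50,
    max_session_minutes: float = 20.0,
):
    """Return (minutes until rotation, which limit triggers it)."""
    if turns_per_minute <= 0:
        # No turns at all: only the time limit can trigger.
        return max_session_minutes, "time"
    minutes_to_turn_limit = max_turn_count / turns_per_minute
    if minutes_to_turn_limit < max_session_minutes:
        return minutes_to_turn_limit, "turns"
    return max_session_minutes, "time"


fast = rotation_point(4.0)      # high-cadence support call
moderate = rotation_point(2.0)  # moderate density
slow = rotation_point(0.5)      # one turn every two minutes
```

With these defaults the crossover sits at 2.5 turns per minute: faster conversations rotate on turn count, slower ones on session age.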

Guard your voice agent before the audio bill arrives

RunGuard's SDK brings barge-in rate monitoring, VAD storm detection, function echo guards, and session rotation triggers to production Realtime API deployments — with a unified dashboard across all your voice agent sessions. One integration, all four guards, real-time spend visibility.

Also in this series