OpenAI Realtime API Cost Control: Loop Detection and Budget Enforcement for Voice Agents
The OpenAI Realtime API is architecturally different from every other API covered in this series. You're not sending HTTP requests and receiving JSON responses. You're opening a persistent WebSocket connection, streaming audio in and audio out, and responding to a stream of server-sent events. The billing model is different too: you pay for audio tokens — roughly $0.06/min for input and $0.24/min for output with gpt-4o-realtime-preview (verify current rates at openai.com/pricing) — not text tokens per completion.
That last word, "output," conceals the most important cost fact about the Realtime API: partial audio is billed. If your agent begins generating a 10-second response and the user interrupts after 3 seconds, the response is cancelled and generation stops. But the 3 seconds of audio output already generated are still billed. The audio was produced, streamed, and discarded, and you pay for it regardless.
For text-based agents, every token in a completion is either used or the whole request is abandoned at the network layer before cost accrues. For voice agents, cost accrues in real time, incrementally, from the moment the model starts speaking. This creates four failure modes that text agents don't have — and that the standard loop-detection patterns from OpenAI Agents SDK cost control won't catch.
This post covers all four, with Python asyncio implementations that plug into the Realtime API WebSocket event loop.
Scope. This post targets agents built directly on the OpenAI Realtime API WebSocket protocol — the wss://api.openai.com/v1/realtime endpoint using the openai Python SDK's AsyncRealtimeConnection or a raw websockets client. The failure modes apply equally to phone agents (Twilio + Realtime API) and browser-based voice assistants. For text-only agents using the standard OpenAI completions or Assistants API, see OpenAI Agents SDK Cost Control. For async Python patterns that apply across both, see Async Python AI Agent Cost Control.
How the Realtime API differs from text APIs for cost purposes
Before the failure modes, it helps to be precise about how the billing model and event model differ from text-based APIs. The gap is wider than most developers expect when first integrating the Realtime API.
| Dimension | Cloud text API (GPT-4o, Claude, Gemini) | OpenAI Realtime API (gpt-4o-realtime-preview) |
|---|---|---|
| Cost unit | Text tokens (input + output per completion) | Audio tokens (input + output, billed per second of audio) |
| Partial billing | No — a cancelled or failed request is not billed for output | Yes — audio already generated before a response.cancel is billed in full |
| Context signal | Explicit: HTTP 400 context_length_exceeded or token count in response | Session-managed server-side; no per-call context signal; grows silently |
| Loop detection mechanism | Fingerprint tool call arguments across consecutive turns | Fingerprint tool calls AND monitor event rates (cancel/VAD events per minute) |
| Interruption model | None — responses complete atomically | User barge-in fires input_audio_buffer.speech_started; server truncates response |
| Session state | Stateless per request; you control the messages list | Stateful WebSocket session; server accumulates conversation history automatically |
The partial billing and server-managed session state together mean that the standard "count tokens, enforce a budget, check for repeated tool calls" pattern from the cost control pattern reference is necessary but not sufficient. You need additional guards at the event level, not just the turn level.
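As one illustration of an event-level guard, the sketch below keeps a running total of output audio tokens, including tokens from cancelled responses. It assumes that each response.done event carries a response.usage.output_token_details.audio_tokens field and a response.status field; that layout is an assumption to verify against the current API reference. AudioTokenBudget and AudioBudgetExceeded are hypothetical names, not part of any SDK.

```python
from dataclasses import dataclass


class AudioBudgetExceeded(RuntimeError):
    pass


@dataclass
class AudioTokenBudget:
    """Running budget for output audio tokens (hypothetical helper)."""

    max_output_audio_tokens: int
    used_output_audio_tokens: int = 0
    cancelled_responses: int = 0

    def record_response_done(self, event: dict) -> None:
        # Assumed event layout: event["response"]["usage"]["output_token_details"]["audio_tokens"]
        response = event.get("response") or {}
        usage = response.get("usage") or {}
        details = usage.get("output_token_details") or {}
        self.used_output_audio_tokens += int(details.get("audio_tokens", 0))
        # Cancelled responses still add to the total: partial audio is billed.
        if response.get("status") == "cancelled":
            self.cancelled_responses += 1
        if self.used_output_audio_tokens > self.max_output_audio_tokens:
            raise AudioBudgetExceeded(
                f"{self.used_output_audio_tokens} output audio tokens used "
                f"(budget: {self.max_output_audio_tokens}); "
                f"{self.cancelled_responses} responses were cancelled."
            )
```

A count-based budget like this complements the rate-based guards: it catches slow overspend that never trips a rate threshold.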
Failure Mode 1: Barge-in amplification loop
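When a user interrupts mid-response (because the agent is too slow, is repeating itself, or the user simply has something new to say), the browser or phone client sends audio to the server while the response is playing. The server detects speech energy in the input stream, fires input_audio_buffer.speech_started, and cancels the current response. (response.cancel is the event a client sends to cancel a response explicitly; depending on the API version, the server reports a cancellation as response.cancelled or as response.done with status "cancelled", so check the current API reference.) The partial audio already generated is billed.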
So far, this is expected behavior. The amplification loop starts when the application code, on handling the cancellation, immediately calls response.create to start a new response based on the user's new input. If the user is in a noisy environment (background TV, HVAC, open office), or if the VAD sensitivity is high, the pattern becomes:
- Agent starts speaking (response generates audio, cost accrues)
- Background noise triggers input_audio_buffer.speech_started
- The response is cancelled; 2 seconds of audio are billed and discarded
- Application immediately calls response.create and a new response starts
- Same background noise triggers input_audio_buffer.speech_started again
- Loop repeats at 10–20 cycles per minute
At $0.24/min for output audio, with 2-second partial responses cancelled at 10 cycles per minute, you're paying for ~20 seconds of audio output per minute — for zero delivered value. At scale across multiple concurrent sessions, this is invisible on a per-session basis and devastating on the aggregate bill.
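The arithmetic behind that estimate can be written out as a small helper. This is a rough sketch that assumes linear per-minute pricing at the approximate $0.24/min output rate quoted above; the function name and the 500-session scenario are illustrative, not measured data.

```python
def wasted_output_cost_per_minute(
    cycles_per_minute: float,
    seconds_billed_per_cycle: float,
    output_price_per_minute: float = 0.24,  # approximate rate quoted in this post
) -> float:
    """Dollars per wall-clock minute spent on audio that is cancelled and discarded."""
    wasted_seconds = cycles_per_minute * seconds_billed_per_cycle
    return wasted_seconds / 60.0 * output_price_per_minute


per_session = wasted_output_cost_per_minute(cycles_per_minute=10, seconds_billed_per_cycle=2.0)
print(f"one looping session: ${per_session:.2f}/min")
print(f"500 looping sessions for one hour: ${per_session * 500 * 60:,.0f}")
```

Under these assumptions a single looping session wastes about $0.08 per minute, which is why the problem only becomes visible once many sessions loop at the same time.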
Detection requires tracking response cancellations in a rolling time window. A BargeinRateGuard raises BargeinLoopError when the number of cancels in the window reaches the threshold:
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field


class BargeinLoopError(RuntimeError):
    pass


@dataclass
class BargeinRateGuard:
    """
    Detects barge-in amplification loops by tracking the rate of cancelled
    responses in a rolling time window.

    A high cancel rate indicates the application is immediately re-starting
    responses after cancellation without a debounce delay, causing rapid
    billing cycles with zero delivered audio value.
    """

    max_cancels_per_window: int = 5
    window_seconds: float = 60.0
    min_debounce_seconds: float = 1.5  # minimum wait before re-issuing response.create
    _cancel_times: deque = field(default_factory=deque, init=False)
    _last_cancel_time: float = field(default=0.0, init=False)

    def record_cancel(self) -> None:
        """Call whenever the server reports that a response was cancelled."""
        now = time.monotonic()
        self._last_cancel_time = now
        # Prune events outside the rolling window
        cutoff = now - self.window_seconds
        while self._cancel_times and self._cancel_times[0] < cutoff:
            self._cancel_times.popleft()
        self._cancel_times.append(now)
        if len(self._cancel_times) >= self.max_cancels_per_window:
            rate = len(self._cancel_times) / self.window_seconds * 60
            raise BargeinLoopError(
                f"Barge-in amplification loop detected: {len(self._cancel_times)} response "
                f"cancellations in the last {self.window_seconds:.0f}s "
                f"({rate:.1f} cancels/min). "
                "Apply a cooldown before calling response.create after a barge-in, "
                "or reduce server_vad sensitivity in the session configuration."
            )

    def check_debounce(self) -> None:
        """
        Call before issuing response.create after a barge-in cancellation.
        Raises BargeinLoopError if insufficient time has passed since the last cancel.
        """
        if self._last_cancel_time == 0.0:
            return
        elapsed = time.monotonic() - self._last_cancel_time
        if elapsed < self.min_debounce_seconds:
            raise BargeinLoopError(
                f"response.create called {elapsed:.2f}s after last barge-in cancel "
                f"(minimum debounce: {self.min_debounce_seconds}s). "
                "Wait for the user to finish speaking before restarting the response."
            )

    @property
    def cancel_count_in_window(self) -> int:
        now = time.monotonic()
        cutoff = now - self.window_seconds
        return sum(1 for t in self._cancel_times if t >= cutoff)


# Integration with an asyncio WebSocket event handler (requires `import json`;
# should_continue_conversation is an application-defined helper):
#
# guard = BargeinRateGuard(max_cancels_per_window=5, window_seconds=60.0)
#
# async def handle_event(event: dict) -> None:
#     event_type = event.get("type")
#     # Depending on the API version, a cancellation arrives as response.cancelled
#     # or as response.done with status "cancelled"; handle both.
#     cancelled_done = (
#         event_type == "response.done"
#         and event.get("response", {}).get("status") == "cancelled"
#     )
#     if event_type == "response.cancelled" or cancelled_done:
#         guard.record_cancel()  # raises BargeinLoopError if threshold exceeded
#     elif event_type == "response.done":
#         # Debounce check: only restart if silence detected, not immediately
#         if should_continue_conversation(event):
#             guard.check_debounce()
#             await ws.send(json.dumps({"type": "response.create"}))
The check_debounce method enforces a mandatory pause between a cancel event and the next response.create. A 1.5-second debounce is long enough to let genuine user speech finish before interpreting silence as "ready for response" — which is what the server VAD should already do, but the debounce protects against edge cases where the application code bypasses VAD state.
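If you would rather wait out the debounce than handle an exception, a small coroutine can sleep for whatever remains of the window. This is a sketch, not part of the guard above: wait_out_debounce is a hypothetical helper, and it assumes the timestamp it receives came from time.monotonic().

```python
import asyncio
import time


async def wait_out_debounce(last_cancel_time: float, min_debounce_seconds: float = 1.5) -> float:
    """Sleep until min_debounce_seconds have passed since last_cancel_time.

    last_cancel_time is a time.monotonic() timestamp; 0.0 means "no cancel yet".
    Returns the number of seconds slept (0.0 if no wait was needed).
    """
    if last_cancel_time == 0.0:
        return 0.0
    remaining = min_debounce_seconds - (time.monotonic() - last_cancel_time)
    if remaining <= 0:
        return 0.0
    await asyncio.sleep(remaining)
    return remaining
```

Called as `await wait_out_debounce(guard._last_cancel_time, guard.min_debounce_seconds)` just before check_debounce(), it turns the hard failure into a short delay. The rate limit in record_cancel() still applies, so a genuine loop is still caught.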
Failure Mode 2: Server VAD false-positive storm
When you configure turn_detection.type = "server_vad" in the session, the server automatically segments the user's speech into turns: it detects when audio energy exceeds a threshold (speech_started), then detects silence after speech ends (speech_stopped), and automatically triggers a response generation. This removes the need for push-to-talk UI, which is exactly why most production voice agents use it.
In noisy environments — HVAC systems, keyboard clicks, other speakers in an open office, a phone call on speakerphone — the VAD can fire on background audio. The server-side VAD is energy-based and operates on very short windows. A repeated noise at roughly speech frequencies (90–300 Hz, which covers many mechanical sounds) can trigger input_audio_buffer.speech_started followed immediately by input_audio_buffer.speech_stopped, causing the server to issue a response to audio that contains no speech.
The signature of a VAD storm is distinct from a barge-in loop: instead of a response generating and then being cancelled, you get very short "turns" — conversation.item.created events with role: "user" arriving within milliseconds to hundreds of milliseconds of each other, far shorter than any real human speech turn. Humans speak in turns of 1–8 seconds; VAD false positives arrive in bursts of 50–400ms.
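Before reaching for a detector, it is worth checking the VAD settings themselves. The sketch below builds a session.update event with a stricter turn_detection block. The field names (threshold, prefix_padding_ms, silence_duration_ms) follow the Realtime API session configuration as I understand it; the specific values and the build_vad_session_update name are illustrative assumptions, not recommendations.

```python
import json


def build_vad_session_update(
    threshold: float = 0.7,          # higher = louder audio needed to count as speech
    silence_duration_ms: int = 700,  # longer = more silence needed to end a turn
    prefix_padding_ms: int = 300,
) -> str:
    """Serialize a session.update event that makes server VAD less sensitive."""
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold must be between 0.0 and 1.0")
    event = {
        "type": "session.update",
        "session": {
            "turn_detection": {
                "type": "server_vad",
                "threshold": threshold,
                "prefix_padding_ms": prefix_padding_ms,
                "silence_duration_ms": silence_duration_ms,
            }
        },
    }
    return json.dumps(event)
```

Sending the returned string over the WebSocket right after session.created applies the stricter settings for the rest of the session. A higher threshold trades some responsiveness to quiet speakers for fewer false triggers.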
A VADStormDetector measures the inter-turn interval and suppresses response generation when the interval is too short to be genuine speech:
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field


class VADStormError(RuntimeError):
    pass


@dataclass
class VADStormDetector:
    """
    Detects server VAD false-positive storms by measuring inter-turn intervals.

    Consecutive user turns arriving less than min_turn_interval_ms apart leave
    too little time for a real utterance plus the server VAD's silence window,
    so they are treated as likely VAD misfires on background audio.
    """

    min_turn_interval_ms: float = 800.0  # below this = likely VAD false positive
    consecutive_short_turns_limit: int = 4  # trip after this many consecutive short turns
    measurement_window: int = 10  # track this many recent inter-turn intervals
    _last_turn_time: float = field(default=0.0, init=False)
    _recent_intervals_ms: deque = field(default_factory=deque, init=False)
    _consecutive_short: int = field(default=0, init=False)
    _suppression_active: bool = field(default=False, init=False)
    _suppression_until: float = field(default=0.0, init=False)

    def record_user_turn(self) -> bool:
        """
        Call when a conversation.item.created event with role='user' is received.
        Returns True if response generation should proceed, False if suppressed.
        Raises VADStormError if the storm threshold is exceeded.
        """
        now = time.monotonic()
        previous = self._last_turn_time
        # Update the timestamp first so a raised error does not leave it stale
        self._last_turn_time = now
        if previous > 0.0:
            interval_ms = (now - previous) * 1000.0
            # Maintain a rolling window of recent intervals
            self._recent_intervals_ms.append(interval_ms)
            if len(self._recent_intervals_ms) > self.measurement_window:
                self._recent_intervals_ms.popleft()
            if interval_ms < self.min_turn_interval_ms:
                self._consecutive_short += 1
            else:
                self._consecutive_short = 0  # reset on a legitimate-length turn
            if self._consecutive_short >= self.consecutive_short_turns_limit:
                short_intervals = [
                    f"{iv:.0f}ms" for iv in self._recent_intervals_ms
                    if iv < self.min_turn_interval_ms
                ]
                raise VADStormError(
                    f"Server VAD false-positive storm detected: "
                    f"{self._consecutive_short} consecutive user turns with inter-turn "
                    f"interval below {self.min_turn_interval_ms:.0f}ms. "
                    f"Recent short intervals: {', '.join(short_intervals[-5:])}. "
                    "Raise turn_detection.threshold in the session configuration, or increase "
                    "silence_duration_ms to require longer silence before turn completion."
                )
        # If the suppression window hasn't expired, suppress response generation
        if self._suppression_active and now < self._suppression_until:
            return False  # tell the caller: do not call response.create
        self._suppression_active = False
        return True

    def activate_suppression(self, duration_seconds: float = 3.0) -> None:
        """
        Temporarily suppress response generation after a short-turn burst.
        Call from the VADStormError exception handler; this also resets the counter.
        """
        self._suppression_active = True
        self._suppression_until = time.monotonic() + duration_seconds
        self._consecutive_short = 0  # reset for the next measurement cycle

    @property
    def avg_recent_interval_ms(self) -> float:
        if not self._recent_intervals_ms:
            return float("inf")
        return sum(self._recent_intervals_ms) / len(self._recent_intervals_ms)


# Integration example (requires `import json` and a configured `log` logger):
#
# vad_guard = VADStormDetector(
#     min_turn_interval_ms=800.0,
#     consecutive_short_turns_limit=4,
# )
#
# async def handle_event(event: dict) -> None:
#     if event.get("type") == "conversation.item.created":
#         item = event.get("item", {})
#         if item.get("role") == "user":
#             try:
#                 should_respond = vad_guard.record_user_turn()
#             except VADStormError as exc:
#                 vad_guard.activate_suppression(duration_seconds=3.0)
#                 log.warning("VAD storm suppressed: %s", exc)
#                 return  # skip response.create during suppression
#             if should_respond:
#                 await ws.send(json.dumps({"type": "response.create"}))
The suppression mechanism is the key addition over a simple threshold check. After a VAD storm is detected, calling activate_suppression() makes record_user_turn() return False for a fixed window (3 seconds by default), giving the noise source time to settle before response generation is re-enabled. Without this, the exception handler itself becomes a source of rapid on/off cycling.
Failure Mode 3: Function call echo chamber
The Realtime API supports function calling, but the event flow differs from text-API tool use. The model emits a response.function_call_arguments.done event when it wants to call a tool. Your application executes the tool, sends the result back as a conversation.item.create event whose item has type: "function_call_output", and then calls response.create to let the model continue. The model then generates an audio response acknowledging the tool result before deciding whether to call another tool.
That audio acknowledgement — "I've completed the file save" or "The search returned three results, let me summarize them" — is where the echo chamber begins in one specific class of agent implementations. In VAD-enabled sessions without careful stream management, the model's own audio output can be picked up by the microphone (in environments without proper acoustic echo cancellation), transcribed as user input, and re-submitted to the session as a new user turn. The turn contains the model's own words, which the model interprets as instructions to call the tool again.
Even without acoustic echo, the pattern appears in buggy agents that re-process the transcript of function call results as if they were new user messages. The loop: model calls tool → tool result submitted → model speaks confirmation → confirmation transcribed as user input → model re-issues the same tool call → loop repeats.
This is structurally distinct from the text-API tool call loop. In text agents, you detect repeated (function_name, arguments) pairs across consecutive assistant turns. In voice agents, the cycle includes an audio confirmation turn between each tool call — so the tool calls don't appear consecutive in the raw event stream. The fingerprint tracker needs to look across a wider window, ignoring interleaved audio turns:
import hashlib
import json
from collections import deque
from dataclasses import dataclass, field


class FunctionEchoError(RuntimeError):
    pass


@dataclass
class FunctionEchoDetector:
    """
    Detects function call echo chamber loops in Realtime API sessions.

    Tracks (function_name, argument_fingerprint) tuples for the most recent
    tool calls in the session, ignoring interleaved audio turns. Raises
    FunctionEchoError when the same function+arguments pair makes up the
    last N tool calls in a row.
    """

    max_consecutive_identical: int = 3
    window_size: int = 20  # track this many recent tool calls
    _call_history: deque = field(default_factory=deque, init=False)

    def record_function_call(self, function_name: str, arguments_json: str) -> None:
        """
        Call when a response.function_call_arguments.done event is received.
        arguments_json: the raw JSON string from the event's 'arguments' field.
        Raises FunctionEchoError if a repeated call pattern is detected.
        """
        try:
            args_dict = json.loads(arguments_json) if arguments_json.strip() else {}
        except json.JSONDecodeError:
            args_dict = {"_raw": arguments_json}
        fingerprint = self._fingerprint(function_name, args_dict)
        self._call_history.append((function_name, fingerprint))
        if len(self._call_history) > self.window_size:
            self._call_history.popleft()
        recent = list(self._call_history)
        # Scan backwards for consecutive identical fingerprints at the tail
        consecutive = 0
        for _, fp in reversed(recent):
            if fp == fingerprint:
                consecutive += 1
            else:
                break  # stop at the first non-matching call
        if consecutive >= self.max_consecutive_identical:
            raise FunctionEchoError(
                f"Function call echo chamber detected: '{function_name}' called "
                f"with identical arguments {consecutive}x in {len(recent)} recent calls. "
                f"Fingerprint: {fingerprint}. "
                "Common causes: (1) model audio output re-entering session as user input "
                "(check acoustic echo cancellation or mute model output channel), "
                "(2) function result not being correctly appended to conversation history, "
                "(3) agent re-processing transcript of previous function_call_output items."
            )

    def _fingerprint(self, function_name: str, arguments: dict) -> str:
        # sort_keys makes the fingerprint independent of argument key order
        canonical = f"{function_name}:{json.dumps(arguments, sort_keys=True)}"
        return hashlib.sha256(canonical.encode()).hexdigest()[:16]

    @property
    def call_count(self) -> int:
        return len(self._call_history)

    def get_recent_calls(self, n: int = 5) -> list[tuple[str, str]]:
        """Return the n most recent (function_name, fingerprint) pairs."""
        return list(self._call_history)[-n:]


# Integration example (execute_tool is an application-defined coroutine):
#
# echo_guard = FunctionEchoDetector(max_consecutive_identical=3)
#
# async def handle_event(event: dict) -> None:
#     if event.get("type") == "response.function_call_arguments.done":
#         function_name = event.get("name", "")
#         arguments = event.get("arguments", "{}")
#         echo_guard.record_function_call(function_name, arguments)
#         # Proceeds to tool execution only if no exception raised
#         result = await execute_tool(function_name, arguments)
#         await ws.send(json.dumps({
#             "type": "conversation.item.create",
#             "item": {
#                 "type": "function_call_output",
#                 "call_id": event.get("call_id"),
#                 "output": json.dumps(result),
#             }
#         }))
#         await ws.send(json.dumps({"type": "response.create"}))
Because the detector records only tool calls, interleaved audio confirmation turns never enter its history: identical tool calls separated by two or three non-tool-call events still count as consecutive. The backwards scan stops at the first call with a different fingerprint, so the guard won't falsely trip on a legitimate sequence of calls to the same function with different arguments.
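The fingerprint depends on parsing the arguments and re-serializing them with sorted keys, because the same logical call can arrive with different key order or whitespace. Here is a standalone sketch of that canonicalization; the save_file function name and its arguments are made up for illustration.

```python
import hashlib
import json


def fingerprint(function_name: str, arguments_json: str) -> str:
    """Canonical fingerprint: parse the JSON, then re-serialize with sorted keys."""
    try:
        args = json.loads(arguments_json) if arguments_json.strip() else {}
    except json.JSONDecodeError:
        args = {"_raw": arguments_json}  # fall back to the raw string
    canonical = f"{function_name}:{json.dumps(args, sort_keys=True)}"
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]


a = fingerprint("save_file", '{"path": "a.txt", "mode": "w"}')
b = fingerprint("save_file", '{"mode":"w","path":"a.txt"}')    # same call, reordered keys
c = fingerprint("save_file", '{"path": "b.txt", "mode": "w"}')  # different argument value
print(a == b, a == c)  # prints: True False
```

Hashing the raw string instead would treat a and b as different calls, and a loop whose serialization varies from turn to turn would slip past the guard.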
Failure Mode 4: Session transcript accumulation
Unlike the text APIs — where you send the full conversation history on every request and explicitly control what's included — the Realtime API maintains conversation history server-side in the WebSocket session. Every turn (user audio, function call, function result, assistant audio + transcript) is automatically appended to the session's conversation. You don't send it; the session accumulates it.
This is convenient for short conversations but creates a compounding cost problem for long-running voice agents: kiosks, phone support lines, interactive voice response systems, or any session that stays open for many minutes. After 30 minutes of moderate conversation, the session history is substantial. Every new response generation pays for that entire accumulated context as input — and since input audio/text tokens are billed at $0.06/min equivalent, a dense 30-minute history might cost $1.80 in input tokens for a single new response.
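To see how that per-response figure compounds, here is a deliberately crude model. It assumes every minute of session time adds one minute of billable audio history, that each response re-reads the whole history at the approximate $0.06/min input rate, and that nothing is cached or carried over after a rotation. Real sessions contain silence and may benefit from caching, so treat the output as an upper-bound illustration. All names here are hypothetical.

```python
from typing import Optional


def per_response_input_cost(history_minutes: float, price_per_minute: float = 0.06) -> float:
    """Input cost of one response that re-reads history_minutes of audio context."""
    return history_minutes * price_per_minute


def session_input_cost(
    total_minutes: int,
    responses_per_minute: float = 2.0,
    price_per_minute: float = 0.06,
    rotate_every: Optional[int] = None,  # minutes between rotations; None = never rotate
) -> float:
    """Total input cost across a session under the simplified model above."""
    total = 0.0
    for minute in range(1, total_minutes + 1):
        # With rotation, history restarts from zero every rotate_every minutes
        history = minute if rotate_every is None else (minute - 1) % rotate_every + 1
        total += responses_per_minute * per_response_input_cost(history, price_per_minute)
    return total


no_rotation = session_input_cost(60)
with_rotation = session_input_cost(60, rotate_every=20)
print(f"rotation cuts modelled input cost by {1 - with_rotation / no_rotation:.0%}")
```

The absolute dollar amounts from a model this simple are not meaningful. What it does show is that without rotation the cost grows roughly with the square of session length, while with rotation it grows linearly.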
The correct mitigation is periodic session rotation: close the current WebSocket session, open a new one, and carry forward only the minimal context needed for continuity (current task summary, user preferences). But most agent implementations don't do this — the WebSocket stays open indefinitely, accumulating context without any awareness of the growing cost baseline.
A SessionAgeGuard monitors session duration and turn count, alerting before the accumulated context reaches the point where session rotation becomes urgent:
import asyncio
import time
from dataclasses import dataclass, field


class SessionAgeError(RuntimeError):
    pass


class SessionAgeWarning(RuntimeError):
    """Non-fatal warning: session is approaching rotation thresholds."""


@dataclass
class SessionAgeGuard:
    """
    Monitors Realtime API session age and conversation turn count.

    Long-running sessions accumulate conversation history server-side,
    increasing the input token cost of every new response. This guard
    raises SessionAgeError when the session should be rotated to prevent
    unbounded context accumulation cost.
    """

    max_session_minutes: float = 20.0  # rotate after 20 minutes
    max_turn_count: int = 50  # rotate after 50 conversation turns
    warn_at_fraction: float = 0.80  # warn at 80% of each limit
    _session_start_time: float = field(default_factory=time.monotonic, init=False)
    _session_id: str = field(default="", init=False)
    _turn_count: int = field(default=0, init=False)
    _warned_age: bool = field(default=False, init=False)
    _warned_turns: bool = field(default=False, init=False)

    def record_session_created(self, session_id: str) -> None:
        """Call when a session.created event is received from the server."""
        self._session_id = session_id
        self._session_start_time = time.monotonic()
        self._turn_count = 0
        self._warned_age = False
        self._warned_turns = False

    def record_turn_completed(self) -> None:
        """
        Call when a response.done event is received, indicating an
        assistant turn has ended (audio generation finished or was cancelled).
        Raises SessionAgeError if rotation thresholds are exceeded.
        """
        self._turn_count += 1
        self._check_thresholds()

    def check_now(self) -> None:
        """
        Explicit check: call before any response.create to detect
        if the session has aged past the rotation threshold.
        """
        self._check_thresholds()

    def _check_thresholds(self) -> None:
        elapsed_minutes = (time.monotonic() - self._session_start_time) / 60.0
        # Hard limits
        if elapsed_minutes >= self.max_session_minutes:
            raise SessionAgeError(
                f"Session '{self._session_id}' has been open for "
                f"{elapsed_minutes:.1f} minutes (limit: {self.max_session_minutes:.0f} min). "
                f"Accumulated {self._turn_count} conversation turns. "
                "Rotate the session: close this WebSocket, open a new connection, "
                "and resume with a summarized context to prevent compounding input token cost."
            )
        if self._turn_count >= self.max_turn_count:
            raise SessionAgeError(
                f"Session '{self._session_id}' has accumulated {self._turn_count} turns "
                f"(limit: {self.max_turn_count}). "
                f"Session age: {elapsed_minutes:.1f} minutes. "
                "Rotate the session to reset the server-side conversation history "
                "and control the input token baseline for future responses."
            )
        # Soft warnings: raised once per limit as SessionAgeWarning, which
        # callers should catch, log, and then continue normally.
        age_fraction = elapsed_minutes / self.max_session_minutes
        turn_fraction = self._turn_count / self.max_turn_count
        if age_fraction >= self.warn_at_fraction and not self._warned_age:
            self._warned_age = True
            raise SessionAgeWarning(
                f"Session '{self._session_id}' at {age_fraction*100:.0f}% of age limit "
                f"({elapsed_minutes:.1f}/{self.max_session_minutes:.0f} min). "
                "Consider preparing a session rotation at a natural conversation break."
            )
        if turn_fraction >= self.warn_at_fraction and not self._warned_turns:
            self._warned_turns = True
            raise SessionAgeWarning(
                f"Session '{self._session_id}' at {turn_fraction*100:.0f}% of turn limit "
                f"({self._turn_count}/{self.max_turn_count} turns). "
                "Plan session rotation before the next natural pause."
            )

    @property
    def session_age_minutes(self) -> float:
        return (time.monotonic() - self._session_start_time) / 60.0

    @property
    def turn_count(self) -> int:
        return self._turn_count


# Integration example showing session rotation (requires `import json`;
# open_realtime_connection is an application-defined coroutine):
#
# age_guard = SessionAgeGuard(max_session_minutes=20.0, max_turn_count=50)
#
# async def rotate_session(ws, summary: str):
#     """Close old session and open a new one with summarized context."""
#     await ws.close()
#     new_ws = await open_realtime_connection()
#     await new_ws.send(json.dumps({
#         "type": "session.update",
#         "session": {
#             "instructions": f"Previous conversation summary: {summary}",
#         }
#     }))
#     return new_ws  # caller continues the event loop on the new connection
Session rotation is more complex than a token trim on a text API: you're closing a WebSocket connection, opening a new one, re-authenticating, and restoring any tool configurations. Building the rotation logic once and testing it deliberately — rather than discovering you need it when sessions are 45 minutes old in production — is the practical takeaway here. The cost savings from preventing context accumulation compound over the lifetime of each session.
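One way to keep rotation testable is to put the reconnect loop in its own coroutine and inject the session runner. The sketch below assumes a run_one_session(summary) coroutine that returns when the conversation ends and raises SessionAgeError when rotation is due, plus a make_summary() coroutine that produces the context to carry forward. Both are hypothetical application hooks, and the exception class is redeclared only so the sketch runs standalone.

```python
import asyncio
from typing import Awaitable, Callable


class SessionAgeError(RuntimeError):
    """Stand-in for the SessionAgeError defined above, so this sketch runs alone."""


async def run_with_rotation(
    run_one_session: Callable[[str], Awaitable[None]],
    make_summary: Callable[[], Awaitable[str]],
    max_rotations: int = 5,
) -> int:
    """Run sessions back to back, rotating on SessionAgeError.

    Returns the number of rotations performed. Re-raises SessionAgeError
    once max_rotations is reached, so a misbehaving session cannot
    reconnect forever.
    """
    summary = ""  # the first session starts with no carried-over context
    rotations = 0
    while True:
        try:
            await run_one_session(summary)
            return rotations  # conversation ended normally
        except SessionAgeError:
            if rotations >= max_rotations:
                raise
            rotations += 1
            summary = await make_summary()
```

Because both hooks are injected, the loop can be exercised with fake coroutines in a unit test long before a 45-minute production session forces the issue.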
Combining all four guards: RealtimeGuard
Each guard targets a distinct failure mode, but all four need to be active simultaneously in a production voice agent. RealtimeGuard is a composing class that wires all four checks into a single event handler wrapper, dispatching to the right guard based on the Realtime API event type:
import asyncio
import json
import time
from dataclasses import dataclass, field
from typing import Callable, Awaitable


@dataclass
class RealtimeGuard:
    """
    Composite circuit breaker for OpenAI Realtime API voice agents.

    Wraps the WebSocket event loop and routes events to four specialized guards:
      1. BargeinRateGuard: detects barge-in amplification loops
      2. VADStormDetector: detects server VAD false-positive storms
      3. FunctionEchoDetector: detects function call echo chamber loops
      4. SessionAgeGuard: detects session transcript accumulation

    Usage: call handle_event() for every event received from the WebSocket.
    Raises typed exceptions when a failure mode is detected.
    The caller is responsible for exception handling and recovery actions.
    """

    # BargeinRateGuard config (the limit applies per cancel_window_seconds)
    max_cancels_per_minute: int = 5
    cancel_window_seconds: float = 60.0
    debounce_seconds: float = 1.5
    # VADStormDetector config
    min_turn_interval_ms: float = 800.0
    consecutive_short_turns_limit: int = 4
    # FunctionEchoDetector config
    max_consecutive_identical_calls: int = 3
    # SessionAgeGuard config
    max_session_minutes: float = 20.0
    max_turn_count: int = 50

    def __post_init__(self) -> None:
        self._barge_in = BargeinRateGuard(
            max_cancels_per_window=self.max_cancels_per_minute,
            window_seconds=self.cancel_window_seconds,
            min_debounce_seconds=self.debounce_seconds,
        )
        self._vad = VADStormDetector(
            min_turn_interval_ms=self.min_turn_interval_ms,
            consecutive_short_turns_limit=self.consecutive_short_turns_limit,
        )
        self._echo = FunctionEchoDetector(
            max_consecutive_identical=self.max_consecutive_identical_calls,
        )
        self._age = SessionAgeGuard(
            max_session_minutes=self.max_session_minutes,
            max_turn_count=self.max_turn_count,
        )

    def handle_event(self, event: dict) -> bool:
        """
        Process a single Realtime API server event through all applicable guards.
        Returns True if the application should proceed normally (e.g. call response.create).
        Returns False if the event was suppressed by the VAD storm guard.

        Raises:
            BargeinLoopError: barge-in cancel rate too high
            VADStormError: VAD inter-turn interval too short, too many times
            FunctionEchoError: identical function call repeated too many times
            SessionAgeError: session age or turn count exceeds rotation threshold
            SessionAgeWarning: session approaching rotation threshold (non-fatal)
        """
        event_type = event.get("type", "")
        if event_type == "session.created":
            self._age.record_session_created(
                event.get("session", {}).get("id", "unknown")
            )
        elif event_type == "response.cancelled":
            # Some API versions emit a dedicated event when a response is cancelled
            self._barge_in.record_cancel()
        elif event_type == "input_audio_buffer.speech_started":
            # User interrupted. The debounce check happens at the point of
            # re-issuing response.create (check_before_response_create), not here.
            pass
        elif event_type == "conversation.item.created":
            item = event.get("item", {})
            if item.get("role") == "user":
                should_respond = self._vad.record_user_turn()
                if not should_respond:
                    return False  # VAD suppression active
        elif event_type == "response.function_call_arguments.done":
            self._echo.record_function_call(
                function_name=event.get("name", ""),
                arguments_json=event.get("arguments", "{}"),
            )
        elif event_type == "response.done":
            # Other API versions report a cancellation here, as status "cancelled"
            if event.get("response", {}).get("status") == "cancelled":
                self._barge_in.record_cancel()
            self._age.record_turn_completed()
        return True

    def check_before_response_create(self) -> None:
        """
        Call immediately before sending response.create to the server.
        Checks the barge-in debounce and session age thresholds.
        """
        self._barge_in.check_debounce()
        self._age.check_now()

    @property
    def session_stats(self) -> dict:
        return {
            "session_age_minutes": round(self._age.session_age_minutes, 2),
            "turn_count": self._age.turn_count,
            "cancel_count_in_window": self._barge_in.cancel_count_in_window,
            "avg_vad_interval_ms": round(self._vad.avg_recent_interval_ms, 1),
            "function_call_count": self._echo.call_count,
        }


# Complete usage example with asyncio and websockets
# (route_event is an application-defined coroutine):
#
# import websockets
#
# async def run_voice_agent(api_key: str) -> None:
#     guard = RealtimeGuard(
#         max_cancels_per_minute=5,
#         min_turn_interval_ms=800.0,
#         max_session_minutes=20.0,
#     )
#
#     uri = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview"
#     headers = {"Authorization": f"Bearer {api_key}", "OpenAI-Beta": "realtime=v1"}
#
#     # additional_headers is the keyword in recent websockets releases;
#     # older releases use extra_headers instead.
#     async with websockets.connect(uri, additional_headers=headers) as ws:
#         async for raw_msg in ws:
#             event = json.loads(raw_msg)
#             try:
#                 should_continue = guard.handle_event(event)
#             except (BargeinLoopError, VADStormError, FunctionEchoError) as exc:
#                 # Session-level failures: log and apply recovery
#                 print(f"Guard tripped: {exc}")
#                 if isinstance(exc, VADStormError):
#                     guard._vad.activate_suppression(duration_seconds=3.0)
#                 continue
#             except SessionAgeError as exc:
#                 # Session needs rotation
#                 print(f"Session rotation required: {exc}")
#                 await ws.close()
#                 return  # reconnect with new session in caller
#             except SessionAgeWarning as exc:
#                 # Non-fatal: log and continue
#                 print(f"Session rotation advisory: {exc}")
#                 should_continue = True
#
#             if not should_continue:
#                 continue  # VAD suppression active, skip response.create
#
#             # Route event to application logic
#             await route_event(ws, event, guard)
The guard raises typed exceptions so the caller can apply the right recovery action for each failure mode: VAD storms get a suppression window, barge-in loops benefit from a configuration change to VAD sensitivity, function echo chambers need the audio output channel audited for re-entry, and session age errors require a reconnect with fresh session state. Mixing all of these into a single exception would make it impossible to respond correctly. The cost estimator widget on the RunGuard homepage can help you project the dollar impact of each failure mode at your actual traffic volume before you deploy guards in production.
Guard threshold reference for Realtime API
| Guard parameter | Recommended default | Rationale |
|---|---|---|
| Barge-in cancel threshold | 5 cancels / 60 s | Legitimate conversations average 1–2 barge-ins per minute at most. Five cancels per minute is the threshold where the pattern is clearly amplification, not normal conversation. |
| Barge-in debounce | 1.5 seconds | The server VAD's default silence_duration_ms is 500ms. Adding 1 second of application-level debounce on top prevents response.create from firing before the user has truly stopped speaking. |
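| VAD inter-turn minimum | 800 ms | Genuine user turns in a voice-agent conversation are rarely separated by less than 800ms: the human reaction-time floor is ~200ms, and typical inter-turn gaps run 500ms–2s. A sub-800ms gap is therefore a likely VAD false positive, which is why the guard trips only on several consecutive short turns rather than on a single one. |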
| VAD consecutive short turns | 4 consecutive | One or two short turns can occur legitimately (short acknowledgements, "yes", "uh-huh"). Four consecutive short turns indicates a sustained noise source, not normal conversation. |
| Function echo consecutive limit | 3 identical in window of 20 | Legitimate agents may call the same function twice with identical arguments (idempotency check, confirmation pattern). Three identical consecutive calls within any 20-call window is the loop signal. |
| Session rotation age | 20 minutes | At moderate conversation density (~2 turns/min), 20 minutes yields ~40 turns of accumulated history. Beyond this, the per-response input cost is material. Rotate at 20 min for cost control, or at natural conversation pauses. |
| Session rotation turn count | 50 turns | 50 turns provides a hard count-based limit for high-cadence conversations that hit the turn threshold before the time threshold. |
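The defaults in the table can be collected into a single configuration object so they are tuned in one place. This is a minimal sketch: `GuardThresholds` and most of its field names are hypothetical (only `max_cancels_per_minute`, `min_turn_interval_ms`, and `max_session_minutes` mirror the `RealtimeGuard` arguments shown earlier), and the validation rules are assumptions about what counts as a sane setting.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class GuardThresholds:
    """Defaults copied from the threshold table; field names are hypothetical."""
    max_cancels_per_minute: int = 5
    bargein_debounce_s: float = 1.5
    min_turn_interval_ms: float = 800.0
    max_consecutive_short_turns: int = 4
    max_identical_calls: int = 3
    function_call_window: int = 20
    max_session_minutes: float = 20.0
    max_session_turns: int = 50

    def validate(self) -> None:
        """Reject obviously inconsistent settings before a session starts."""
        if self.max_identical_calls > self.function_call_window:
            raise ValueError("identical-call limit cannot exceed the window size")
        if min(self.max_cancels_per_minute, self.max_session_turns) <= 0:
            raise ValueError("count thresholds must be positive")

DEFAULTS = GuardThresholds()
DEFAULTS.validate()
```

Freezing the dataclass keeps thresholds from drifting mid-session; a deployment that needs different values can construct a second instance, for example `GuardThresholds(max_session_minutes=10.0)`.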
FAQ
Does RunGuard work with the OpenAI Realtime API Python SDK?
Yes. The guards in this post are plain Python dataclasses that operate on event dictionaries — they're not tied to any specific WebSocket client library. Whether you're using the openai Python SDK's AsyncRealtimeConnection, the websockets library directly, or any other WebSocket client, you pass the parsed event dictionary to RealtimeGuard.handle_event() and it does the rest. RunGuard's SDK wraps these guards with additional telemetry and a managed dashboard, but the pattern works standalone as shown above. Check the RunGuard homepage for the current SDK integration guide for the Realtime API specifically.
Is there a way to detect loops without interrupting the user mid-conversation?
Yes — and this is an important design constraint for voice agents specifically. The BargeinRateGuard and VADStormDetector raise exceptions at the point where your application code decides whether to call response.create, not mid-stream while the user is speaking. The FunctionEchoDetector raises after receiving the response.function_call_arguments.done event, before the tool executes. The SessionAgeGuard raises at response.done or on a pre-response check. None of these interrupt audio streaming mid-utterance. The only externally-visible effect of a tripped guard is that the next agent response is delayed or suppressed — which from the user's perspective is silence rather than an interruption. The recommended UX pattern is to play a brief audio buffer ("One moment...") when a recovery action is needed, rather than letting the silence be unexplained.
How does barge-in billing work — am I really paying for cancelled partial audio?
Yes. The OpenAI Realtime API billing model charges for audio tokens as they are generated, not as they are delivered to the client. When a user barge-in triggers response.cancel, the server stops generating new audio — but any audio tokens already produced in the current response are included in the session's usage. The response.done event (sent even for cancelled responses, with status: "cancelled") includes a usage object with the token counts for the partial response. Verify current per-token rates at openai.com/pricing; the Realtime API pricing page lists both the audio input and audio output rates separately. The practical implication is that a barge-in loop at 10 cycles/minute is not free — it generates real usage on every cycle, even though the user hears nothing useful.
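To see what cancelled responses are costing, you can total the output audio tokens reported on cancelled `response.done` events. The sketch below assumes the usage object nests audio counts under `output_token_details.audio_tokens`; treat that field path as an assumption and check it against the current API reference. The function name is hypothetical.

```python
from typing import Any, Dict

def cancelled_output_audio_tokens(event: Dict[str, Any]) -> int:
    """Return output audio tokens reported for a cancelled response, else 0.

    Assumed event shape: response.done -> response.status and
    response.usage.output_token_details.audio_tokens.
    """
    if event.get("type") != "response.done":
        return 0
    response = event.get("response") or {}
    if response.get("status") != "cancelled":
        return 0
    usage = response.get("usage") or {}
    details = usage.get("output_token_details") or {}
    return int(details.get("audio_tokens") or 0)

# Illustrative event with made-up token counts.
sample_event = {
    "type": "response.done",
    "response": {
        "status": "cancelled",
        "usage": {"output_token_details": {"text_tokens": 12, "audio_tokens": 140}},
    },
}
wasted_tokens = cancelled_output_audio_tokens(sample_event)
```

Summing this value across a session gives a direct measure of how much output spend went to audio the user never heard in full.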
Can I use RunGuard's LoopDetector for Realtime API function call loops?
RunGuard's core LoopDetector class (documented in the cost control pattern reference) handles the fingerprint-and-window pattern that underlies the FunctionEchoDetector above. The Realtime-specific adaptation in this post handles the additional complexity of parsing the raw JSON arguments string from response.function_call_arguments.done events and the wider window needed because audio turns interleave between tool calls. If you're already using RunGuard's SDK, the RealtimeLoopDetector integration extends the core LoopDetector with these Realtime-specific parsing and windowing behaviors — check the changelog for the specific SDK version that added Realtime API support.
What's a safe session duration for production voice agents?
The right session duration depends on your use case and conversation density. For high-cadence customer service agents (2–4 turns/minute), the 50-turn limit is often hit before the 20-minute limit: at 4 turns/minute it arrives at 12.5 minutes, at 2.5 turns/minute the two limits coincide at 20 minutes, and below that rate the time limit fires first. Plan for rotation around the 12–20 minute mark. For low-cadence assistants (one turn every few minutes), the time limit is more relevant. The key insight is that "safe" isn't just about context window limits (the Realtime API has a large context window); it's about the compounding input token cost as history grows. A session at 40 turns deep costs roughly twice as much per response in input tokens as a fresh session, even if the model could technically handle much more context. For kiosk or phone agent deployments where sessions might run for hours, implement rotation at every natural break point (end of a task, brief silence of >30 seconds) rather than only at hard time limits.
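The interaction between the two rotation limits is simple arithmetic, sketched here for the 20-minute and 50-turn defaults from the threshold table. The function name and return format are hypothetical.

```python
from typing import Tuple

def rotation_point(turns_per_minute: float,
                   max_minutes: float = 20.0,
                   max_turns: int = 50) -> Tuple[float, str]:
    """Return (minutes until rotation, which limit fires first)."""
    if turns_per_minute <= 0:
        # No turns ever accumulate, so only the time limit can fire.
        return max_minutes, "time"
    minutes_to_turn_limit = max_turns / turns_per_minute
    if minutes_to_turn_limit < max_minutes:
        return minutes_to_turn_limit, "turns"
    return max_minutes, "time"

high_cadence = rotation_point(4.0)   # 50 turns / 4 per minute = 12.5 minutes
moderate = rotation_point(2.0)       # 50 turns would take 25 minutes, so time wins
```

With these defaults the turn limit only governs above 2.5 turns per minute; slower conversations always rotate on the time limit.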