Google Gemini API Cost Control: Loop Detection and Budget Enforcement in Production
Building agentic loops directly on the Google Gemini API — using google-genai (the current SDK, replacing the older google-generativeai) — gives you maximum flexibility and zero framework overhead. You control every token, every tool call, and every retry. That control comes with a cost: there are no ADK guardrails, no framework-level loop counters, no built-in circuit breakers between your code and the billing meter.
The Gemini API's function calling flow is simple to describe and expensive to get wrong. You define tools as types.FunctionDeclaration objects, pass them to client.models.generate_content(), and the model returns either a text response or one or more FunctionCall parts. You execute the called function, package the result as a FunctionResponse, and call the model again. That loop continues until the model issues a final text response. Nothing in the SDK stops the loop from running indefinitely. There is no max_turns argument on generate_content, no built-in similarity check on function arguments, no token budget enforcement, and no Slack notification when your bill crosses a threshold.
This post covers four failure modes specific to raw Gemini API agents and provides complete Python implementations for each. The guards use only the standard library plus the google-genai package — no additional dependencies. The final section explains RunGuard's managed API as an alternative to maintaining your own guard infrastructure. If you're not familiar with the general principles behind agent cost control, the AI agent cost engineering production guide is useful background before diving in.
How the Gemini API function calling loop works
The google-genai SDK (v1.0+, installed as pip install google-genai) exposes a client-based interface:
from google import genai
from google.genai import types

client = genai.Client(api_key="GEMINI_API_KEY")

tools = [
    types.Tool(function_declarations=[
        types.FunctionDeclaration(
            name="web_search",
            description="Search the web for information",
            parameters=types.Schema(
                type=types.Type.OBJECT,
                properties={
                    "query": types.Schema(type=types.Type.STRING)
                },
                required=["query"],
            ),
        )
    ])
]
config = types.GenerateContentConfig(tools=tools)
contents = [types.Content(role="user", parts=[types.Part(text="What is the current AI agent market size?")])]

while True:
    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=contents,
        config=config,
    )
    candidate = response.candidates[0]

    # Check if model wants to call a function
    function_calls = [p for p in candidate.content.parts if p.function_call]
    if not function_calls:
        # Model returned a final text response
        print(candidate.content.parts[0].text)
        break

    # Execute each function call and append results
    contents.append(candidate.content)
    for fc in function_calls:
        result = execute_tool(fc.function_call.name, dict(fc.function_call.args))
        contents.append(types.Content(
            role="user",
            parts=[types.Part(function_response=types.FunctionResponse(
                name=fc.function_call.name,
                response={"result": result}
            ))]
        ))
The loop above is correct and complete. It is also unbounded. The model controls when it stops — and in the four failure modes below, the model does not stop cleanly.
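The simplest bound is a turn cap wrapped around the loop. The sketch below is illustrative only: `run_bounded`, `step`, and `max_turns` are hypothetical names, not part of the google-genai SDK, and `step` stands in for one model call plus any tool execution.

```python
from typing import Callable, Optional


def run_bounded(step: Callable[[int], Optional[str]], max_turns: int = 12) -> str:
    """Run an agent loop for at most max_turns turns.

    step(turn) performs one model call (and any tool execution) and returns
    the final text, or None if the model asked for more tool calls.
    """
    for turn in range(max_turns):
        final_text = step(turn)
        if final_text is not None:
            return final_text
    raise RuntimeError(f"MAX_TURNS: no final answer after {max_turns} turns")


# Stub step for illustration: "answers" on the third turn.
def fake_step(turn: int) -> Optional[str]:
    return "done" if turn == 2 else None


print(run_bounded(fake_step))  # prints "done"
```

A turn cap is blunt: it stops a runaway loop but says nothing about why the loop ran long, which is what the failure-mode guards in the rest of this post address.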
Two key behavioral properties of the Gemini API that create cost exposure:
- Parallel function calls: Gemini can return multiple FunctionCall parts in a single response, calling several tools simultaneously. This is a feature when it works correctly. When the model enters a search spiral in parallel mode, it multiplies the per-turn cost by the number of simultaneous calls.
- Context grows with every turn: The contents list you pass to generate_content accumulates every turn: user messages, model responses, function call records, and function response records. Per-turn input token cost rises monotonically as a session continues.
The gap: The Gemini API bills per token, not per call. Every tool call in a spiral adds the full contents list to the next request's input tokens. If each turn adds 5,000 tokens to that list, a 10-turn spiral costs not 50,000 input tokens but (1 + 2 + 3 + ... + 10) × 5,000 = 275,000 input tokens: 55 times the single-turn cost, or 5.5 times the naive estimate. The gap grows quadratically with the number of turns, which is faster than most teams expect.
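The arithmetic behind the triangular sum can be checked in a few lines. This is a simplified model that assumes every turn adds the same number of tokens to the history; `cumulative_input_tokens` is a hypothetical helper name.

```python
def cumulative_input_tokens(turns: int, tokens_added_per_turn: int) -> int:
    """Total input tokens billed when the full history is resent every turn."""
    # Turn k sends k * tokens_added_per_turn tokens, so the total is
    # tokens_added_per_turn * (1 + 2 + ... + turns).
    return tokens_added_per_turn * turns * (turns + 1) // 2


naive = 10 * 5_000
actual = cumulative_input_tokens(10, 5_000)
print(naive, actual, actual / naive)  # 50000 275000 5.5
```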
Failure mode 1: Function call spiral
The Gemini model calls web_search with "AI agent cost control 2026", receives partial results, then calls web_search again with "AI agent cost control best practices 2026", then again with "AI agent budget enforcement techniques 2026". Each query is lexically distinct — the model genuinely believes it is making progress. The underlying search space doesn't produce a satisfying answer, so the cycle continues until your application crashes, your quota is exhausted, or your bill becomes a problem.
The detection approach is Jaccard similarity on normalized, tokenized function argument strings, evaluated over a sliding window of the last four calls to each tool. When three or more pairs within that window reach a similarity of 0.72 or higher, the function is treated as being in a spiral and the guard returns a violation so the caller can stop before the next model call.
# gemini_spiral_guard.py
import json
import re
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional

JACCARD_THRESHOLD = 0.72
WINDOW_SIZE = 4
MIN_HIGH_SIM_PAIRS = 3


def _normalize(args: dict) -> str:
    """Stable fingerprint: lowercase JSON, strip punctuation, sort tokens."""
    raw = json.dumps(args, sort_keys=True)
    tokens = re.sub(r'[^a-z0-9\s]', ' ', raw.lower()).split()
    return ' '.join(sorted(set(tokens)))


def _jaccard(a: str, b: str) -> float:
    sa, sb = set(a.split()), set(b.split())
    if not sa and not sb:
        return 0.0
    return len(sa & sb) / len(sa | sb)


@dataclass
class SpiralGuard:
    threshold: float = JACCARD_THRESHOLD
    window_size: int = WINDOW_SIZE
    min_pairs: int = MIN_HIGH_SIM_PAIRS
    _history: dict = field(default_factory=lambda: defaultdict(list))

    def check(self, function_name: str, args: dict, session_id: str = "default") -> Optional[str]:
        """
        Call before executing each Gemini function call.
        Returns None if safe, or a violation message if a spiral is detected.
        Instantiate one SpiralGuard per agent run; do not share across runs.
        """
        key = f"{session_id}:{function_name}"
        fp = _normalize(args)
        window = self._history[key]
        window.append(fp)
        if len(window) > self.window_size:
            self._history[key] = window[-self.window_size:]
            window = self._history[key]
        if len(window) < 2:
            return None
        pairs = [(i, j) for i in range(len(window)) for j in range(i + 1, len(window))]
        high_sim = sum(1 for i, j in pairs if _jaccard(window[i], window[j]) >= self.threshold)
        if high_sim >= self.min_pairs:
            return (
                f"SPIRAL_DETECTED: '{function_name}' called {len(window)} times "
                f"with {high_sim} near-identical argument pairs (Jaccard ≥ {self.threshold}). "
                f"Last args: {json.dumps(args)}"
            )
        return None
Usage wraps the inner execute_tool call in your agent loop:
spiral_guard = SpiralGuard()  # one instance per agent run

for fc in function_calls:
    fn_name = fc.function_call.name
    fn_args = dict(fc.function_call.args)
    violation = spiral_guard.check(fn_name, fn_args, session_id=session_id)
    if violation:
        raise RuntimeError(violation)  # or return a graceful error response
    result = execute_tool(fn_name, fn_args)
    # ... append function response to contents
The guard is a plain Python class with no I/O — it lives entirely in memory for the lifetime of one agent run. Do not share a SpiralGuard instance across multiple concurrent agent runs; each run gets its own instance so spiral history from one run cannot contaminate another.
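To see what scores the fingerprinting actually produces, the sketch below re-implements the normalization and Jaccard steps as standalone helpers (`normalize` and `jaccard` are illustrative names that work on sets directly) and scores the three example queries from the start of this section pairwise.

```python
import json
import re
from itertools import combinations


def normalize(args: dict) -> set:
    """Token set of the lowercased, punctuation-stripped JSON arguments."""
    raw = json.dumps(args, sort_keys=True).lower()
    return set(re.sub(r"[^a-z0-9\s]", " ", raw).split())


def jaccard(a: set, b: set) -> float:
    return len(a & b) / len(a | b) if (a or b) else 0.0


queries = [
    "AI agent cost control 2026",
    "AI agent cost control best practices 2026",
    "AI agent budget enforcement techniques 2026",
]
fingerprints = [normalize({"query": q}) for q in queries]
scores = [round(jaccard(a, b), 3) for a, b in combinations(fingerprints, 2)]
print(scores)  # [0.75, 0.444, 0.364]
```

Only one of the three pairs reaches 0.72 on these exact queries, so the default requirement of three high-similarity pairs would not fire here. That is a reason to log real scores from your own tools and tune the threshold and pair count before relying on the defaults.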
Failure mode 2: Chat history token accumulation
The contents list you pass to every generate_content call is the model's memory. Every turn — user message, model response with function calls, function response — gets appended. After 15 turns on a multi-step research agent, that list can represent 40,000+ tokens. Every subsequent model call sends all 40,000 tokens as input, even when most of them are no longer relevant to the current step.
The Gemini API does not raise an error when contents exceeds the model's effective context window. Instead, the model silently prioritizes recent content and discards earlier turns. You continue paying for every token in the input list, but the model behaves as if it has forgotten the beginning of the session. This leads to quality degradation (the agent re-researches things it already found) compounding the cost problem (more turns needed to reach the goal the agent lost track of).
The guard uses the Gemini API's count_tokens endpoint — which runs fast and costs nothing — to check the accumulated token count before each model call. At 70% of the model's context limit, a warning is logged. At 85%, the guard raises before spending tokens on a truncated call.
# gemini_context_guard.py
from google import genai
from google.genai import types
from typing import Optional

# Model context window limits (input tokens)
CONTEXT_LIMITS = {
    "gemini-2.5-flash": 1_000_000,
    "gemini-2.5-pro": 1_000_000,
    "gemini-2.0-flash": 1_000_000,
    "gemini-1.5-flash": 1_000_000,
    "gemini-1.5-pro": 2_000_000,
}
WARN_RATIO = 0.70
HARD_STOP_RATIO = 0.85


class ContextGuard:
    def __init__(self, model: str, client: genai.Client):
        self.limit = CONTEXT_LIMITS.get(model, 1_000_000)
        self.model = model
        self.client = client
        self.warn_at = int(self.limit * WARN_RATIO)
        self.stop_at = int(self.limit * HARD_STOP_RATIO)

    def check(self, contents: list) -> Optional[str]:
        """
        Call before each generate_content call.
        Returns None if within budget, a warning string at 70%,
        or raises RuntimeError at 85%.
        """
        count_response = self.client.models.count_tokens(
            model=self.model,
            contents=contents,
        )
        tokens = count_response.total_tokens
        if tokens >= self.stop_at:
            raise RuntimeError(
                f"CONTEXT_LIMIT: {tokens:,} tokens in contents list "
                f"({tokens/self.limit*100:.1f}% of {self.limit:,} limit). "
                f"Hard stop at {HARD_STOP_RATIO*100:.0f}%. Trim contents or summarize."
            )
        if tokens >= self.warn_at:
            return (
                f"CONTEXT_WARNING: {tokens:,} tokens "
                f"({tokens/self.limit*100:.1f}% of limit). "
                f"Consider summarizing earlier turns."
            )
        return None
Usage in the agent loop, placed before each model call:
context_guard = ContextGuard(model="gemini-2.5-flash", client=client)

while True:
    warning = context_guard.check(contents)
    if warning:
        print(f"[RunGuard] {warning}")
    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=contents,
        config=config,
    )
    # ... process response
The count_tokens call adds a few milliseconds to each turn. For agents making decisions that cost dollars per run, this is negligible. If you need to avoid any additional API calls, an approximate word-based estimator (multiply word count by 1.35) gives a usable proxy, though it undercounts structured content like JSON function call records. The count_tokens approach is exact and preferred.
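A minimal sketch of the word-based fallback, using the 1.35 multiplier mentioned above; `estimate_tokens` is a hypothetical helper, and the estimate is only a rough proxy for plain prose.

```python
def estimate_tokens(text: str) -> int:
    """Rough token estimate: whitespace-separated word count times 1.35, rounded up."""
    words = len(text.split())
    # Integer arithmetic avoids floating-point surprises when rounding up.
    return (words * 135 + 99) // 100


print(estimate_tokens("What is the current AI agent market size?"))  # 8 words -> 11
```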
Failure mode 3: Parallel function call cost multiplication
Gemini's parallel function calling is one of its most productive features for well-behaved agents — the model can call three tools simultaneously and reduce wall-clock latency by 3×. But in a degraded state, it becomes a cost multiplier. When a search-heavy agent enters a spiral while also having access to multiple similar tools (e.g., web_search, news_search, and academic_search), the model can issue three near-identical parallel calls — one to each tool — before your spiral guard's single-tool window reaches the detection threshold.
The guard tracks calls across all tools in aggregate, not just per tool, and also monitors the per-turn cost of parallel call batches. A single model turn that returns five parallel function calls is not inherently a problem, but a turn that returns five calls to semantically equivalent tools doing the same lookup is almost always a spiral in parallel mode.
# gemini_parallel_guard.py
import json
import re
from dataclasses import dataclass, field
from typing import Optional


def _normalize(args: dict) -> str:
    raw = json.dumps(args, sort_keys=True)
    tokens = re.sub(r'[^a-z0-9\s]', ' ', raw.lower()).split()
    return ' '.join(sorted(set(tokens)))


def _jaccard(a: str, b: str) -> float:
    sa, sb = set(a.split()), set(b.split())
    if not sa and not sb:
        return 0.0
    return len(sa & sb) / len(sa | sb)


@dataclass
class ParallelCallGuard:
    max_parallel_calls: int = 5
    cross_tool_sim_threshold: float = 0.80
    _turn_history: list = field(default_factory=list)  # list of (tool, fp) pairs per recent turn

    def check_batch(self, function_calls: list) -> Optional[str]:
        """
        Call with the full list of FunctionCall parts from one model response.
        function_calls: list of dicts with keys 'name' and 'args'.
        Returns None if safe, or a violation message.
        """
        if len(function_calls) > self.max_parallel_calls:
            return (
                f"PARALLEL_LIMIT: {len(function_calls)} parallel function calls "
                f"in one turn exceeds max {self.max_parallel_calls}. "
                f"Tools: {[fc['name'] for fc in function_calls]}"
            )

        # Check cross-tool similarity within this batch
        fps = [(fc['name'], _normalize(fc['args'])) for fc in function_calls]
        for i in range(len(fps)):
            for j in range(i + 1, len(fps)):
                sim = _jaccard(fps[i][1], fps[j][1])
                if sim >= self.cross_tool_sim_threshold:
                    return (
                        f"PARALLEL_SPIRAL: '{fps[i][0]}' and '{fps[j][0]}' "
                        f"called in parallel with argument similarity {sim:.2f} "
                        f"(threshold {self.cross_tool_sim_threshold}). "
                        f"Likely duplicate lookup across tools."
                    )

        # Check if this batch is nearly identical to the previous turn's batch
        if self._turn_history:
            prev_fps = [fp for _, fp in self._turn_history]
            curr_fps = [fp for _, fp in fps]
            cross_sim_count = sum(
                1 for p in prev_fps for c in curr_fps
                if _jaccard(p, c) >= self.cross_tool_sim_threshold
            )
            if cross_sim_count >= min(len(prev_fps), len(curr_fps)):
                return (
                    f"TURN_REPETITION: Current parallel batch is near-identical "
                    f"to previous turn's batch ({cross_sim_count} matching pairs). "
                    f"Agent is repeating the same multi-tool lookup."
                )

        self._turn_history = fps
        return None
Usage wraps the per-turn function call list before any tool execution:
parallel_guard = ParallelCallGuard()

if function_calls:
    batch = [{"name": fc.function_call.name, "args": dict(fc.function_call.args)}
             for fc in function_calls]
    violation = parallel_guard.check_batch(batch)
    if violation:
        raise RuntimeError(violation)
    # proceed to execute each function call
Failure mode 4: Retry cascade amplification
The google-genai SDK performs automatic retries on transient errors: 429 (quota exceeded), 503 (service unavailable), and certain 500 errors. By default, it retries up to 3 times with exponential backoff. If your application code also implements retries, a reasonable pattern for production resilience, the two retry layers multiply. Counting attempts, 3 at the SDK layer × 3 at the application layer is up to 9 actual API calls per logical request; if each layer makes 3 retries on top of its first attempt, the worst case is 4 × 4 = 16. On quota-limited accounts with a busy agent, this can cause a single failing turn to exhaust the remaining quota entirely.
A circuit breaker at the application layer opens after a configurable number of consecutive turn failures and stays open for a cooldown period. This prevents the application retry layer from compounding SDK retries during a quota-hit or service degradation event.
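The multiplication is easy to underestimate, so here is the worst-case bound as a tiny sketch. `worst_case_calls` is a hypothetical helper, and the attempt counts are inputs you would read from your own SDK and application retry settings.

```python
def worst_case_calls(sdk_attempts: int, app_attempts: int) -> int:
    """Upper bound on API calls for one logical request when retry layers nest."""
    # Each application-level attempt can trigger a full round of SDK attempts.
    return sdk_attempts * app_attempts


print(worst_case_calls(3, 3))  # 9
print(worst_case_calls(4, 4))  # 16: three retries on top of the first attempt, at both layers
```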
# gemini_circuit_breaker.py
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class GeminiCircuitBreaker:
    failure_threshold: int = 2
    cooldown_seconds: int = 60
    _consecutive_failures: int = field(default=0, init=False)
    _open_until: float = field(default=0.0, init=False)

    def check(self) -> Optional[str]:
        """
        Call before each generate_content call.
        Returns None if closed (safe to call), or a message if open.
        """
        if time.time() < self._open_until:
            remaining = int(self._open_until - time.time())
            return (
                f"CIRCUIT_OPEN: breaker open for {remaining}s more. "
                f"Do not retry until cooldown expires."
            )
        return None

    def record_success(self):
        """Call after a successful generate_content response."""
        self._consecutive_failures = 0
        self._open_until = 0.0

    def record_failure(self, error: Exception) -> Optional[str]:
        """
        Call when generate_content raises. Returns a trip message if breaker opens.
        Instantiate one GeminiCircuitBreaker per agent run.
        """
        self._consecutive_failures += 1
        if self._consecutive_failures >= self.failure_threshold:
            self._open_until = time.time() + self.cooldown_seconds
            return (
                f"CIRCUIT_TRIPPED: {self._consecutive_failures} consecutive "
                f"failures (last: {type(error).__name__}: {error}). "
                f"Breaker open for {self.cooldown_seconds}s. "
                f"Do not re-invoke this agent run; start a new session."
            )
        return None
Full integration pattern with all four guards in the same agent loop:
import time

from google import genai
from google.genai import types

from gemini_spiral_guard import SpiralGuard
from gemini_context_guard import ContextGuard
from gemini_parallel_guard import ParallelCallGuard
from gemini_circuit_breaker import GeminiCircuitBreaker

client = genai.Client(api_key="GEMINI_API_KEY")
model = "gemini-2.5-flash"

# One instance per agent run; never share across runs
spiral_guard = SpiralGuard()
context_guard = ContextGuard(model=model, client=client)
parallel_guard = ParallelCallGuard()
circuit_breaker = GeminiCircuitBreaker()

# config and execute_tool are defined as in the first example; user_prompt is your input text
contents = [types.Content(role="user", parts=[types.Part(text=user_prompt)])]

while True:
    # 1. Circuit breaker check
    cb_block = circuit_breaker.check()
    if cb_block:
        print(f"[RunGuard] {cb_block}")
        break

    # 2. Context accumulation check
    ctx_warning = context_guard.check(contents)
    if ctx_warning:
        print(f"[RunGuard] {ctx_warning}")

    try:
        response = client.models.generate_content(
            model=model,
            contents=contents,
            config=config,
        )
        circuit_breaker.record_success()
    except Exception as e:
        trip_msg = circuit_breaker.record_failure(e)
        if trip_msg:
            print(f"[RunGuard] {trip_msg}")
            break
        # Below the failure threshold: pause briefly and retry the turn, so the
        # breaker can count consecutive failures (re-raising here would exit on
        # the first failure and the breaker could never trip)
        time.sleep(1)
        continue

    candidate = response.candidates[0]
    function_calls = [p for p in candidate.content.parts if p.function_call]
    if not function_calls:
        print(candidate.content.parts[0].text)
        break

    # 3. Parallel call guard
    batch = [{"name": p.function_call.name, "args": dict(p.function_call.args)}
             for p in function_calls]
    parallel_violation = parallel_guard.check_batch(batch)
    if parallel_violation:
        print(f"[RunGuard] {parallel_violation}")
        break

    # 4. Spiral check per call + execute
    contents.append(candidate.content)
    fn_results = []
    for fc_part in function_calls:
        fn_name = fc_part.function_call.name
        fn_args = dict(fc_part.function_call.args)
        spiral_violation = spiral_guard.check(fn_name, fn_args)
        if spiral_violation:
            print(f"[RunGuard] {spiral_violation}")
            break
        result = execute_tool(fn_name, fn_args)
        fn_results.append((fn_name, result))
    else:
        # Only continue if no spiral detected
        for fn_name, result in fn_results:
            contents.append(types.Content(
                role="user",
                parts=[types.Part(function_response=types.FunctionResponse(
                    name=fn_name,
                    response={"result": result}
                ))]
            ))
        continue
    break  # spiral detected: exit loop
Adding a budget ceiling
The four guards above prevent runaway loops. A fifth, optional layer enforces a hard dollar ceiling per agent run. Gemini pricing is per million tokens; the guard tracks cumulative input and output tokens across all turns and raises when a configurable USD ceiling is hit.
# gemini_budget_guard.py
from dataclasses import dataclass, field
from typing import Optional

# Gemini pricing by model (USD per 1M tokens); update as pricing changes
GEMINI_PRICING = {
    "gemini-2.5-flash": {"input": 0.30, "output": 2.50},
    "gemini-2.5-pro": {"input": 1.25, "output": 10.00},
    "gemini-2.0-flash": {"input": 0.10, "output": 0.40},
    "gemini-1.5-flash": {"input": 0.075, "output": 0.30},
    "gemini-1.5-pro": {"input": 1.25, "output": 5.00},
}


@dataclass
class BudgetGuard:
    model: str
    ceiling_usd: float = 1.00
    _total_cost: float = field(default=0.0, init=False)
    _total_input_tokens: int = field(default=0, init=False)
    _total_output_tokens: int = field(default=0, init=False)

    def record(self, response) -> Optional[str]:
        """
        Call after each successful generate_content response.
        Reads token counts from response.usage_metadata.
        Returns a budget violation message if ceiling is breached.
        """
        # Unknown models fall back to a conservative (higher) price
        pricing = GEMINI_PRICING.get(self.model, {"input": 1.25, "output": 5.00})
        meta = response.usage_metadata
        in_tokens = getattr(meta, 'prompt_token_count', 0) or 0
        out_tokens = getattr(meta, 'candidates_token_count', 0) or 0
        # Thinking tokens, when the model reports them, are billed at the output rate
        out_tokens += getattr(meta, 'thoughts_token_count', 0) or 0
        turn_cost = (
            (in_tokens / 1_000_000) * pricing["input"] +
            (out_tokens / 1_000_000) * pricing["output"]
        )
        self._total_cost += turn_cost
        self._total_input_tokens += in_tokens
        self._total_output_tokens += out_tokens
        if self._total_cost >= self.ceiling_usd:
            return (
                f"BUDGET_EXCEEDED: ${self._total_cost:.4f} spent "
                f"({self._total_input_tokens:,} input + {self._total_output_tokens:,} output tokens) "
                f"exceeds ${self.ceiling_usd:.2f} ceiling. Stopping run."
            )
        return None
The five guards (spiral, context, parallel, circuit breaker, budget) form a complete cost control layer for raw Gemini API agents. The application order recommended in the pattern reference is: budget check → circuit breaker check → parallel call check → spiral check → context check → record spend. Budget and circuit breaker checks are fast (no I/O); run them before paying for the network round trip of a count_tokens call.
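As a worked example of the per-turn cost formula, the sketch below prices a single turn with a 40,000-token input and a 1,000-token output, using the gemini-2.5-flash prices listed above. `turn_cost_usd` is a hypothetical helper, and the token counts are illustrative rather than measured.

```python
def turn_cost_usd(input_tokens: int, output_tokens: int,
                  input_price: float, output_price: float) -> float:
    """Cost of one turn; prices are USD per 1M tokens."""
    return (input_tokens / 1_000_000) * input_price + (output_tokens / 1_000_000) * output_price


# Prices are the gemini-2.5-flash entries from the pricing table in this post.
cost = turn_cost_usd(40_000, 1_000, input_price=0.30, output_price=2.50)
print(f"${cost:.4f} per turn")  # $0.0145 per turn
print(int(1.00 // cost), "such turns fit under a $1.00 ceiling")  # 68
```

The count of turns assumes a constant input size; with a growing contents list, each later turn costs more and the ceiling is reached sooner.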
Handling the older google-generativeai SDK
If your codebase still uses the older google-generativeai package (import path import google.generativeai as genai), the guard logic is identical — only the API surface differs. The key differences:
- Chat sessions use model.start_chat() and chat.send_message() instead of the stateless client.models.generate_content() loop. The chat.history attribute holds the accumulated contents list.
- Function calls appear as response.candidates[0].content.parts with part.function_call populated, same as the new SDK.
- Token counting uses model.count_tokens(chat.history) instead of client.models.count_tokens().
- Usage metadata is at response.usage_metadata.prompt_token_count and response.usage_metadata.candidates_token_count, same field names.
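The guard logic carries over to either SDK. SpiralGuard, ParallelCallGuard, GeminiCircuitBreaker, and BudgetGuard work unchanged, because they only see tool names, arguments, exceptions, and usage metadata. ContextGuard needs one adaptation: its check() method calls client.models.count_tokens(), so for the old SDK replace that call with model.count_tokens() and pass chat.history instead of the contents list.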
RunGuard managed API integration
The guards above require you to instantiate, wire, and maintain five Python classes per agent entry point. RunGuard's managed API replaces all five with a single HTTP POST before each tool call and a single POST after each model response. Your code remains readable; the guard state lives in RunGuard's infrastructure; you get a Slack alert and a dashboard entry when any breaker trips.
# RunGuard managed API: drop-in for raw Gemini API agents
import httpx

RUNGUARD_API_KEY = "rg_live_..."
RUNGUARD_URL = "https://api.runguard.dev/v1/check"
RUN_ID = "gemini-research-agent-run-abc123"  # unique per agent run


def rg_check(event_type: str, payload: dict) -> None:
    """Raises RuntimeError if RunGuard determines the run should stop."""
    try:
        resp = httpx.post(
            RUNGUARD_URL,
            json={"run_id": RUN_ID, "event": event_type, "data": payload},
            headers={"Authorization": f"Bearer {RUNGUARD_API_KEY}"},
            timeout=2.0,
        )
    except httpx.HTTPError:
        # Fail open: a timeout or network error must not stop the agent
        return
    if resp.status_code == 200 and resp.json().get("action") == "stop":
        raise RuntimeError(f"[RunGuard] {resp.json()['reason']}")


# Before each tool call:
rg_check("tool_call", {"tool": fn_name, "args": fn_args, "model": model})

# After each generate_content response:
rg_check("model_response", {
    "input_tokens": response.usage_metadata.prompt_token_count,
    "output_tokens": response.usage_metadata.candidates_token_count,
    "model": model,
})
RunGuard's API runs under 2ms p99 at the guard endpoint and fails open on network error — your agent continues even if RunGuard's API is unreachable, and the missed check is logged for review.
Guard application order and thresholds
| Guard | When to apply | Default threshold | Tuning signal |
|---|---|---|---|
| BudgetGuard: record(response) | After each model response | $1.00 per run | Raise until false positives drop below 1% of runs |
| GeminiCircuitBreaker: check() / record_failure() | Before and after each model call | 2 consecutive failures, 60s cooldown | Lower threshold for external-API-heavy agents; raise for slow-network environments |
| ParallelCallGuard: check_batch(batch) | After model response, before tool execution | 5 max parallel calls, 0.80 cross-tool sim | Lower sim threshold if tools are near-synonymous |
| SpiralGuard: check(name, args) | Before each individual tool execution | Jaccard 0.72, window 4, min 3 pairs | Raise Jaccard for narrow-vocabulary domains, where legitimate calls already score high; see pattern reference for calibration |
| ContextGuard: check(contents) | Before each model call | Warn 70%, stop 85% of model limit | Lower stop threshold for cost-critical deployments; raise for multi-step planning agents that legitimately need deep context |
TypeScript (@google/generative-ai) equivalents
If you're using the TypeScript @google/generative-ai or @google/genai package, the guard patterns translate directly. The spiral guard becomes a class with a Map<string, string[]> history, the context guard uses model.countTokens(), and the circuit breaker and budget guard are straightforward port-for-port equivalents. The key integration point — wrapping each function call dispatch and each generateContent call — is the same in TypeScript.
// TypeScript spiral guard (abbreviated)
class SpiralGuard {
  private history = new Map<string, string[]>();
  private readonly threshold = 0.72;
  private readonly windowSize = 4;
  private readonly minPairs = 3;

  check(toolName: string, args: Record<string, unknown>): string | null {
    const fp = this.normalize(args);
    const key = toolName;
    const window = this.history.get(key) ?? [];
    window.push(fp);
    if (window.length > this.windowSize) window.splice(0, window.length - this.windowSize);
    this.history.set(key, window);
    if (window.length < 2) return null;

    let highSimPairs = 0;
    for (let i = 0; i < window.length; i++) {
      for (let j = i + 1; j < window.length; j++) {
        if (this.jaccard(window[i], window[j]) >= this.threshold) highSimPairs++;
      }
    }
    return highSimPairs >= this.minPairs
      ? `SPIRAL_DETECTED: '${toolName}' called with ${highSimPairs} near-identical arg pairs`
      : null;
  }

  private normalize(args: Record<string, unknown>): string {
    const raw = JSON.stringify(args, Object.keys(args).sort());
    const tokens = raw.toLowerCase().replace(/[^a-z0-9\s]/g, ' ').split(/\s+/).filter(Boolean);
    return [...new Set(tokens)].sort().join(' ');
  }

  private jaccard(a: string, b: string): number {
    const sa = new Set(a.split(' '));
    const sb = new Set(b.split(' '));
    const intersection = new Set([...sa].filter(x => sb.has(x)));
    const union = new Set([...sa, ...sb]);
    return union.size === 0 ? 0 : intersection.size / union.size;
  }
}
Frequently asked questions
Gemini 2.5 has a 1M token context window — does context accumulation really matter at that scale?
Yes, for two reasons. First, you pay for every token in the input list on every call, so a 200,000-token contents list on a 20-turn agent means you've paid for roughly 2,100,000 input tokens total (1+2+3+...+20 proportionally), not 200,000. Second, Gemini 2.5's advertised limit is the maximum the model will accept — it is not a guarantee of quality across the full range. Empirically, factual recall degrades significantly past 500,000 tokens of dense tool call history, because the model's attention is diluted across a massive context. The ContextGuard's 85% hard stop at 850,000 tokens is a cost guard and a quality guard.
Does Gemini have its own safety stop that would prevent truly runaway loops?
The API will reject requests that exceed the model's maximum token limit, and per-minute / per-day quota limits will eventually throttle or block your account. These are billing-adjacent controls, not cost controls — by the time the API refuses a call for quota exhaustion, you've already spent the money that triggered the limit. There is no API-level loop detection, and quota refusals trigger SDK retries that can themselves compound costs. Your application code is responsible for stopping runaway loops before they reach quota.
How does this interact with Gemini's automatic function calling feature?
The google-genai SDK supports an automatic_function_calling mode where you pass Python callables directly and the SDK manages the tool-call loop automatically. This is convenient but removes the inspection point where you'd place guards — the SDK calls your functions without any hook between model response and tool execution. For production agents where cost control matters, manual function calling (the loop pattern shown above) is recommended so you can wire guards at each turn. If you're using automatic function calling, RunGuard's managed API with a webhook interceptor is the appropriate integration path.
The Jaccard 0.72 threshold seems arbitrary. How do I calibrate it for my tool set?
Start by logging normalized fingerprints for your tools in a staging environment without the guard active. For each pair of consecutive calls to the same tool, compute the Jaccard score and record it. Legitimate consecutive calls from a well-behaved agent typically score below 0.45; genuine spirals score above 0.75. The 0.72 threshold sits in a gap that exists in most search-oriented agents. For tools with narrow vocabulary (medical coding lookups, stock ticker queries), legitimate consecutive calls can score higher, so raise the threshold (for example to 0.80) to avoid false positives. For free-text tools (essay generation, summarization), legitimate calls are far more distinct, so you can lower the threshold (for example to 0.65) and catch looser spirals without flagging normal traffic. The AI agent cost control pattern reference covers threshold calibration methodology in depth.
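One simple way to turn logged scores into a threshold is to take the midpoint of the gap between the highest legitimate score and the lowest spiral score. The sketch below assumes you have already labeled two samples of scores by hand; `suggest_threshold` is a hypothetical helper, and the numbers are illustrative, not measurements.

```python
from typing import Optional, Sequence


def suggest_threshold(legit_scores: Sequence[float],
                      spiral_scores: Sequence[float]) -> Optional[float]:
    """Midpoint of the gap between legitimate and spiral Jaccard scores.

    Returns None when the two samples overlap, meaning no single
    threshold separates them cleanly.
    """
    highest_legit = max(legit_scores)
    lowest_spiral = min(spiral_scores)
    if highest_legit >= lowest_spiral:
        return None
    return round((highest_legit + lowest_spiral) / 2, 2)


# Illustrative numbers only, not measurements.
print(suggest_threshold([0.20, 0.31, 0.45], [0.75, 0.82, 0.90]))  # 0.6
print(suggest_threshold([0.50, 0.80], [0.75, 0.90]))              # None
```

A None result is itself a useful signal: if the samples overlap, similarity on argument tokens alone may not separate spirals from normal traffic for that tool.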
Can I use these guards with Gemini's streaming mode?
Yes, with one adjustment: streaming responses deliver token counts after the stream completes, not upfront. For the ContextGuard, call count_tokens(contents) before starting the stream (checks the input side). For the BudgetGuard, collect the final usage_metadata from the last chunk in the stream — the SDK populates it on the final chunk. For the spiral and circuit breaker guards, there is no stream-specific change: you check before initiating the call and record the result after the stream ends.
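A small sketch of the budget-side adjustment: drain the stream and keep the last usage metadata seen. `final_usage` is a hypothetical helper, and the chunks here are stand-in objects; with the real SDK you would pass the iterator returned by the streaming call.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Optional


def final_usage(chunks: Iterable[Any]) -> Optional[Any]:
    """Return the last non-empty usage_metadata seen while draining a stream."""
    usage = None
    for chunk in chunks:
        meta = getattr(chunk, "usage_metadata", None)
        if meta is not None:
            usage = meta
    return usage


# Stand-in chunks for illustration; a real stream yields SDK response objects.
fake_stream = [
    SimpleNamespace(usage_metadata=None),
    SimpleNamespace(usage_metadata=SimpleNamespace(prompt_token_count=1200,
                                                   candidates_token_count=85)),
]
usage = final_usage(fake_stream)
print(usage.prompt_token_count, usage.candidates_token_count)  # 1200 85
```

In a real loop you would also process each chunk's content inside the same iteration, since a stream can only be consumed once.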