Streaming LLM agent cost monitoring: why streaming breaks standard budget caps and how to fix it
Standard RunGuard and budget-cap implementations assume a request-response model: you make a call, the API returns a complete response with token counts in its usage field, and your budget tracker updates. Streaming breaks this assumption in two ways. First, streaming responses deliver tokens incrementally via server-sent events; the final token count is not available until the stream closes, which may be seconds or tens of seconds after the first token arrives. Second, you cannot stop a streaming request after the first token arrives without aborting the entire connection, and you cannot know at request initiation how many output tokens the model will generate.

The result is that a naive budget cap that checks cost after each streaming call provides only weak pre-call protection: it prevents starting a call that would exceed the budget, but it cannot prevent a single long streaming response from generating thousands of output tokens before it terminates naturally. For agents that stream user-facing responses (real-time text display as the agent reasons) while also running tool calls in the background, this creates a split-brain cost tracking problem: tool calls are synchronous and fully trackable, while streaming reasoning turns are asynchronous and only fully accountable after they close.

This guide covers how to instrument streaming LLM calls for real-time cost estimation, when to terminate a stream early, and how to integrate streaming cost tracking with RunGuard’s session budget tracker.
The three cost tracking gaps in streaming LLM responses
- Gap 1: output token count is unknown until the stream closes. With non-streaming responses, the API returns the output token count in the response object (usage.output_tokens in the Anthropic API used in this guide; usage.completion_tokens in OpenAI-style APIs), which you can immediately multiply by the output token rate to get the exact cost. With streaming, the final token count arrives only at the end of the stream; in Anthropic's API it is carried by the message_delta event just before message_stop. If you run a budget check after each call, you only update the budget tracker after the entire stream closes. A stream that generates 8,000 output tokens at $15/M tokens costs $0.12 in output alone, and you discover this after all 8,000 tokens have been billed, not before.
- Gap 2: mid-stream cancellation requires connection abort. With non-streaming APIs you can simply not make the next call when the budget cap fires; stopping a stream mid-way requires explicitly closing the HTTP connection. This is possible but requires care: the model provider bills for tokens already generated, so aborting at token 2,000 of a projected 8,000-token response still costs $0.03 in output tokens (2,000 × $15/M). Aborting reduces cost but does not eliminate it, and the partial response you receive may be unusable depending on where the stream was cut.
- Gap 3: input tokens for streaming calls are billed the same as non-streaming. This is the easy part: input tokens are determined by the prompt at request time and billed regardless of output length. You can calculate the exact input cost before the stream starts (prompt tokens × input rate). The uncertainty is entirely in the output. A practical approach: estimate a maximum output cost by assuming the model will generate max_tokens worth of output, add this to the input cost, and pre-check against the budget cap before initiating the stream. If even the worst case is within budget, start the stream; if not, reject early with a budget exceeded error.
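The arithmetic behind the three gaps can be checked with a few lines of Python. This is a minimal sketch using the $3/M input and $15/M output rates quoted in this guide; the function names and the 1,200-token prompt are illustrative assumptions, not part of RunGuard's API.

```python
# Rates quoted in this guide; update when provider pricing changes.
INPUT_USD_PER_TOKEN = 3.0 / 1_000_000
OUTPUT_USD_PER_TOKEN = 15.0 / 1_000_000


def output_cost(output_tokens: int) -> float:
    """Cost of the output side of one call."""
    return output_tokens * OUTPUT_USD_PER_TOKEN


def worst_case_cost(prompt_tokens: int, max_tokens: int) -> float:
    """Upper bound on one call: exact input cost plus a full max_tokens of output."""
    return prompt_tokens * INPUT_USD_PER_TOKEN + max_tokens * OUTPUT_USD_PER_TOKEN


def can_start_stream(spent_usd: float, cap_usd: float,
                     prompt_tokens: int, max_tokens: int) -> bool:
    """Gap 3 pre-check: start only if even the worst case fits under the cap."""
    return spent_usd + worst_case_cost(prompt_tokens, max_tokens) <= cap_usd


full = output_cost(8_000)     # Gap 1: stream runs to completion
aborted = output_cost(2_000)  # Gap 2: stream aborted at token 2,000
print(f"full: ${full:.2f}  aborted: ${aborted:.2f}  saved: ${full - aborted:.2f}")

# Hypothetical 1,200-token prompt with max_tokens=8,000
bound = worst_case_cost(1_200, 8_000)
print(f"worst case: ${bound:.4f}")

# With $2.90 already spent against a $3.00 cap, the worst case does not fit.
print(can_start_stream(spent_usd=2.90, cap_usd=3.00,
                       prompt_tokens=1_200, max_tokens=8_000))
```

The pre-check is deliberately pessimistic: it rejects calls that might have fit, in exchange for never starting one that cannot be bounded.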
Python: streaming cost adapter with pre-call estimation and mid-stream monitoring
Python: streaming wrapper with worst-case pre-check and mid-stream counter

```python
import anthropic
from dataclasses import dataclass
from typing import Generator

from runguard import BudgetExceededError

# Anthropic pricing (update when rates change)
SONNET_INPUT_PER_TOK = 3.0 / 1_000_000    # $3 per million input tokens
SONNET_OUTPUT_PER_TOK = 15.0 / 1_000_000  # $15 per million output tokens


@dataclass
class StreamSession:
    """Tracks cost across a mix of streaming and non-streaming turns."""
    cap_usd: float
    spent_usd: float = 0.0
    loop_events: int = 0
    turn_count: int = 0

    def check_budget(self, additional: float) -> None:
        if self.spent_usd + additional > self.cap_usd:
            raise BudgetExceededError(
                f"Budget cap ${self.cap_usd} would be exceeded: "
                f"current ${self.spent_usd:.5f} + new ${additional:.5f}"
            )

    def record(self, usd: float) -> None:
        self.spent_usd += usd
        self.turn_count += 1


client = anthropic.Anthropic()


def estimate_stream_cost(prompt_tokens: int, max_output_tokens: int) -> float:
    """
    Worst-case cost estimate: full max_output_tokens generated.
    Use this for pre-call budget check before starting a stream.
    """
    return (prompt_tokens * SONNET_INPUT_PER_TOK
            + max_output_tokens * SONNET_OUTPUT_PER_TOK)


def count_prompt_tokens(messages: list) -> int:
    """
    Rough token estimator: ~1 token per 4 chars.
    Replace with tiktoken or the provider's token counter for accuracy.
    """
    # Only string content is counted; list-style content blocks count as 0.
    total_chars = sum(
        len(m["content"]) if isinstance(m.get("content"), str) else 0
        for m in messages
    )
    return total_chars // 4 + 10  # +10 for message overhead


def streaming_turn(
    messages: list,
    session: StreamSession,
    max_output_tokens: int = 1024,
    max_stream_output_tokens: int = 500,  # abort if output exceeds this
) -> Generator[str, None, None]:
    """
    Stream a single LLM turn, yielding text chunks as they arrive.
    Pre-checks worst-case budget before starting; tracks output tokens
    mid-stream; aborts stream if output exceeds max_stream_output_tokens.
    """
    prompt_tokens = count_prompt_tokens(messages)

    # Pre-check: worst-case cost = full max_output_tokens generated
    worst_case_cost = estimate_stream_cost(prompt_tokens, max_output_tokens)
    session.check_budget(worst_case_cost)  # raises BudgetExceededError if over cap

    output_tokens_seen = 0

    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=max_output_tokens,
        messages=messages,
    ) as stream:
        for text in stream.text_stream:
            # Rough mid-stream token count (4 chars ≈ 1 token)
            output_tokens_seen += max(1, len(text) // 4)

            # Mid-stream abort: if output is growing much larger than expected,
            # terminate early to limit cost on runaway verbose responses
            if output_tokens_seen > max_stream_output_tokens:
                stream.close()  # aborts the HTTP connection
                # Bill for what we got: prompt + estimated output so far
                partial_cost = (prompt_tokens * SONNET_INPUT_PER_TOK
                                + output_tokens_seen * SONNET_OUTPUT_PER_TOK)
                session.record(partial_cost)
                yield "\n[STREAM TRUNCATED: output token limit reached]"
                return

            yield text

        # Stream closed normally: read actual token counts from the usage block
        # while the stream context is still open
        usage = stream.get_final_message().usage

    actual_cost = (usage.input_tokens * SONNET_INPUT_PER_TOK
                   + usage.output_tokens * SONNET_OUTPUT_PER_TOK)
    # Reconcile: we pre-checked worst-case; actual may be less.
    # The budget tracker records actual, not worst-case.
    session.record(actual_cost)


def run_streaming_agent(user_query: str, cap_usd: float = 3.0) -> None:
    """Run a streaming agent that displays tokens in real-time."""
    session = StreamSession(cap_usd=cap_usd)
    messages = [{"role": "user", "content": user_query}]

    for turn in range(10):
        print(f"\n--- Turn {turn + 1} (budget used: ${session.spent_usd:.4f}/{cap_usd}) ---")
        collected = []
        try:
            for chunk in streaming_turn(messages, session, max_output_tokens=1024):
                print(chunk, end="", flush=True)
                collected.append(chunk)
        except BudgetExceededError as e:
            print(f"\n[BUDGET] {e}")
            break

        full_response = "".join(collected)
        # If no tool calls (simplified: check for tool use blocks in stream's final message)
        # In production, parse tool calls from the streamed event blocks
        break  # simplified single-turn demo

    print(f"\n\nFinal session cost: ${session.spent_usd:.5f}")
```
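One caveat with the mid-stream counter in the Python wrapper above: it applies the 4-characters-per-token heuristic to each chunk separately, so the estimate drifts. Chunks shorter than four characters are rounded up to a full token, while the remainder of longer chunks is dropped. A possible refinement, sketched here under the same heuristic, is to accumulate characters and convert once. StreamTokenEstimator is a hypothetical helper, not part of RunGuard or the Anthropic SDK.

```python
class StreamTokenEstimator:
    """Running output-token estimate from cumulative streamed characters.

    Assumes ~4 characters per token, the same rough heuristic as the guide;
    the real count is only known from the provider's final usage data.
    """

    def __init__(self, chars_per_token: int = 4) -> None:
        self.chars_per_token = chars_per_token
        self.chars_seen = 0

    def add(self, chunk: str) -> int:
        """Record one streamed chunk and return the updated token estimate."""
        self.chars_seen += len(chunk)
        return self.tokens

    @property
    def tokens(self) -> int:
        # Ceiling division, so a partially filled token still counts.
        return -(-self.chars_seen // self.chars_per_token)


def per_chunk_estimate(chunks: list[str]) -> int:
    """Per-chunk rounding, as in the Python wrapper above."""
    return sum(max(1, len(c) // 4) for c in chunks)


def cumulative_estimate(chunks: list[str]) -> int:
    """Same heuristic applied once to the running character total."""
    estimator = StreamTokenEstimator()
    for c in chunks:
        estimator.add(c)
    return estimator.tokens


tiny = ["a"] * 8          # 8 characters in one-character chunks
longer = ["abcdefg"] * 4  # 28 characters in seven-character chunks

print(per_chunk_estimate(tiny), cumulative_estimate(tiny))      # 8 2
print(per_chunk_estimate(longer), cumulative_estimate(longer))  # 4 7
```

Either way the number is an estimate; the abort threshold should leave headroom for the heuristic being off, and the final usage data remains the figure to reconcile against.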
TypeScript: streaming cost tracking with the Anthropic SDK
TypeScript: stream with mid-stream token counter and budget abort

```typescript
import Anthropic from "@anthropic-ai/sdk";
import { BudgetExceededError } from "@runguard/sdk";

const client = new Anthropic();

const SONNET_IN = 3.0 / 1_000_000;   // $3 per million input tokens
const SONNET_OUT = 15.0 / 1_000_000; // $15 per million output tokens

interface StreamSession {
  capUsd: number;
  spentUsd: number;
  turnCount: number;
}

function checkBudget(session: StreamSession, additional: number): void {
  if (session.spentUsd + additional > session.capUsd) {
    throw new BudgetExceededError(
      `Budget cap $${session.capUsd} exceeded: current $${session.spentUsd.toFixed(5)} + $${additional.toFixed(5)}`
    );
  }
}

// Rough estimate: ~1 token per 4 chars; only string content is counted.
function estimatePromptTokens(messages: Anthropic.MessageParam[]): number {
  let chars = 0;
  for (const m of messages) {
    if (typeof m.content === "string") chars += m.content.length;
  }
  return Math.ceil(chars / 4) + 10;
}

async function* streamingTurn(
  messages: Anthropic.MessageParam[],
  session: StreamSession,
  maxOutputTokens = 1024,
  maxStreamOutputTokens = 500,
): AsyncGenerator<string> {
  const promptTokens = estimatePromptTokens(messages);

  // Pre-check worst-case cost
  checkBudget(session, promptTokens * SONNET_IN + maxOutputTokens * SONNET_OUT);

  let outputTokensSeen = 0;
  let finalInputTokens = 0;
  let finalOutputTokens = 0;

  const stream = client.messages.stream({
    model: "claude-sonnet-4-6",
    max_tokens: maxOutputTokens,
    messages,
  });

  for await (const event of stream) {
    if (event.type === "content_block_delta" && event.delta.type === "text_delta") {
      const text = event.delta.text;
      outputTokensSeen += Math.max(1, Math.ceil(text.length / 4));

      if (outputTokensSeen > maxStreamOutputTokens) {
        stream.abort();
        // Bill for what we got: prompt + estimated output so far
        const partialCost = promptTokens * SONNET_IN + outputTokensSeen * SONNET_OUT;
        session.spentUsd += partialCost;
        session.turnCount++;
        yield "\n[STREAM TRUNCATED: output token limit reached]";
        return;
      }
      yield text;
    }
    // Output token count arrives in message_delta near the end of the stream
    if (event.type === "message_delta" && event.usage) {
      finalOutputTokens = event.usage.output_tokens ?? 0;
    }
    // Input token count arrives in message_start
    if (event.type === "message_start" && event.message.usage) {
      finalInputTokens = event.message.usage.input_tokens;
    }
  }

  // Stream closed normally: record actual cost
  const actualCost = finalInputTokens * SONNET_IN + finalOutputTokens * SONNET_OUT;
  session.spentUsd += actualCost;
  session.turnCount++;
}

async function runStreamingAgent(query: string, capUsd = 3.0): Promise<void> {
  const session: StreamSession = { capUsd, spentUsd: 0, turnCount: 0 };
  const messages: Anthropic.MessageParam[] = [{ role: "user", content: query }];

  console.log(`Starting streaming agent (cap: $${capUsd})`);
  try {
    process.stdout.write("\n");
    for await (const chunk of streamingTurn(messages, session, 1024, 500)) {
      process.stdout.write(chunk);
    }
    console.log(`\n\nFinal session cost: $${session.spentUsd.toFixed(5)}`);
  } catch (e) {
    if (e instanceof BudgetExceededError) {
      console.log(`\n[BUDGET] ${e.message}`);
    } else {
      throw e;
    }
  }
}
```
The worst-case pre-check before stream initiation is the most important line in both implementations. It ensures that even if the model generates the full max_tokens worth of output, the resulting cost would not push the session over its cap. This converts the streaming budget check from “we’ll see what the bill is when the stream closes” to “we know the maximum possible bill before the first token arrives.”
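The same inequality can be turned around: instead of rejecting a stream whose worst case does not fit, solve for the largest max_tokens that does. This is a sketch of that variation, not something the implementations above do; affordable_max_tokens and the min_useful_tokens floor are hypothetical names, and the rates are the ones quoted in this guide.

```python
import math

INPUT_USD_PER_TOKEN = 3.0 / 1_000_000
OUTPUT_USD_PER_TOKEN = 15.0 / 1_000_000


def affordable_max_tokens(
    cap_usd: float,
    spent_usd: float,
    prompt_tokens: int,
    requested_max_tokens: int,
    min_useful_tokens: int = 64,
) -> int:
    """Largest max_tokens whose worst-case cost still fits under the cap.

    Solves spent + prompt * in_rate + max_tokens * out_rate <= cap for
    max_tokens. Returns 0 when fewer than min_useful_tokens are affordable,
    in which case the caller should reject the call instead of streaming.
    """
    remaining = cap_usd - spent_usd - prompt_tokens * INPUT_USD_PER_TOKEN
    if remaining <= 0:
        return 0
    affordable = math.floor(remaining / OUTPUT_USD_PER_TOKEN)
    clamped = min(requested_max_tokens, affordable)
    return clamped if clamped >= min_useful_tokens else 0


# Plenty of budget left: the requested limit is used unchanged.
print(affordable_max_tokens(3.00, 0.00, prompt_tokens=1_000, requested_max_tokens=1024))

# Near the cap: the limit shrinks to what the remaining budget can cover.
print(affordable_max_tokens(3.00, 2.99, prompt_tokens=1_000, requested_max_tokens=1024))

# Input cost alone exceeds what is left: reject.
print(affordable_max_tokens(3.00, 2.999, prompt_tokens=1_000, requested_max_tokens=1024))
```

A clamped max_tokens means the response may end at the token limit rather than at a natural stopping point, so this trade only makes sense where a shorter answer is better than none.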
Streaming vs. non-streaming cost tracking comparison
| Property | Non-streaming API call | Streaming API call (naive) | Streaming with RunGuard pre-check + abort |
|---|---|---|---|
| Token count available before call completes | No — response returns after generation | No — count in final stream event | Estimated (input exact; output worst-case bounded) |
| Budget cap can fire before call | Yes: pre-call check with estimated cost | Yes, but it only prevents starting the call; it does not limit output length | Yes: worst-case pre-check prevents starting if the worst case would overspend |
| Budget cap can fire mid-call | No — call is atomic | No — standard budget checks run after stream closes | Yes — mid-stream token counter triggers abort at configurable threshold |
| Partial response on abort | N/A | N/A | Yes — tokens yielded up to abort point are available |
| Billing on abort | N/A | N/A | Input tokens + output tokens generated before abort |
For the full session-level cost tracking approach that streaming turns fit into, see agent observability cost dashboard. For the broader cost control architecture these streaming patterns plug into, see autonomous agent cost control best practices.
Add cost-aware streaming to your LLM agent
RunGuard installs in one command: pip install runguard for Python, npm install @runguard/sdk for TypeScript. For streaming agents, wrap the session budget in a StreamSession tracker as shown above, estimate the worst-case cost before each stream (count_prompt_tokens for the input, max_tokens for the output, combined in estimate_stream_cost), and set a max_stream_output_tokens abort threshold at 50–80% of max_tokens; the examples use 500 against a 1,024-token limit. Catch BudgetExceededError from the pre-call check, and handle the truncation marker that the mid-stream abort yields: in the implementations above, the abort ends the stream without raising. The pre-call check handles the case where you are already near your session cap; the mid-stream abort handles the case where the model is generating unexpectedly verbose output.
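Both paths can be exercised offline before wiring in a real client. The sketch below replays pre-recorded chunks through a simplified version of the two checks; guarded_stream and the local BudgetExceeded class are stand-ins invented for this example, input cost is omitted for brevity, and the pre-check raises while the abort yields a truncation marker, mirroring the wrappers above.

```python
from typing import Iterable, Iterator

OUTPUT_USD_PER_TOKEN = 15.0 / 1_000_000  # rate quoted in this guide
TRUNCATED = "\n[STREAM TRUNCATED: output token limit reached]"


class BudgetExceeded(Exception):
    """Local stand-in for RunGuard's BudgetExceededError, so the sketch runs offline."""


def guarded_stream(
    chunks: Iterable[str],
    spent_usd: float,
    cap_usd: float,
    max_tokens: int,
    abort_after_tokens: int,
) -> Iterator[str]:
    """Replay chunks through a worst-case pre-check and a mid-stream abort."""
    # Pre-call check: the worst-case output cost must fit under the cap.
    # Because this is a generator, the check runs when iteration starts.
    if spent_usd + max_tokens * OUTPUT_USD_PER_TOKEN > cap_usd:
        raise BudgetExceeded("worst-case cost would exceed the cap")

    chars_seen = 0
    for chunk in chunks:
        chars_seen += len(chunk)
        if chars_seen // 4 > abort_after_tokens:  # ~4 chars per token heuristic
            yield TRUNCATED
            return
        yield chunk


verbose = ["word "] * 100  # 500 characters, roughly 125 tokens

# Path 1: session already near its cap, so the pre-check rejects the call.
try:
    list(guarded_stream(verbose, spent_usd=2.99, cap_usd=3.00,
                        max_tokens=1024, abort_after_tokens=50))
    rejected = False
except BudgetExceeded:
    rejected = True

# Path 2: budget is fine, but the output runs past the abort threshold.
out = list(guarded_stream(verbose, spent_usd=0.0, cap_usd=3.00,
                          max_tokens=1024, abort_after_tokens=50))

print(rejected, out[-1] == TRUNCATED, len(out))  # True True 41
```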
RunGuard pricing: Solo plan at $19/month for individual developers. Team plan at $79/month adds Slack and PagerDuty webhook alerts, shared dashboards, and audit log. Both plans include a 14-day free trial — no credit card required.