Chainlit Cost Control and Loop Detection: Managing Agent Spend in Conversational AI Apps
Chainlit has become the go-to Python framework for building production conversational AI interfaces, with 22k+ GitHub stars and a design philosophy that lets you drop a chat UI in front of LangChain, LlamaIndex, LangGraph, or a raw LLM API in under 50 lines of Python. The framework handles WebSocket streaming, step tracking, session management, and file uploads — the full scaffolding of a conversational AI product — while keeping your agent logic free of UI concerns.
The cost control challenges in Chainlit are distinct from those in raw agent frameworks, because Chainlit introduces a layer of session and step management that shapes how token costs accumulate in ways that aren't always obvious. Unlike calling the LLM directly, where you count tokens per call, Chainlit sessions maintain persistent context across multiple turns, and its step-tracking model can cause a single user message to trigger dozens of nested LLM calls through sub-agents and tools — each billed separately, none of them visible in the Chainlit UI as a cost metric. A looping sub-agent triggered by a single user message can silently consume hundreds of dollars before the handler returns.
Four structural patterns amplify LLM costs in Chainlit apps:
- Conversation history accumulation: Chainlit's cl.chat_context.get() returns the full message history, and agent framework callbacks (LangChain's AsyncLangchainCallbackHandler, LlamaIndex's LlamaIndexCallbackHandler) pass this history to the LLM on every turn; cumulative input cost grows roughly quadratically with conversation length.
- Nested step explosion: cl.Step context managers nest arbitrarily deep; a looping sub-agent within a step creates new child steps recursively, each making LLM calls, before the top-level handler returns a single message to the user.
- Async tool call budget overflow: Chainlit's async execution model lets tool-calling agents run in the background without interruption; there is no default ceiling on the number of tool calls per user message, so a looping agent will call tools indefinitely while streaming innocuous progress updates to the UI.
- Session context drift and silent retry: Chainlit persists session state in cl.user_session across WebSocket reconnects; if state is not carefully scoped, a reconnected client can replay the triggering message and re-execute the agent with stale accumulated context.
Chainlit's token cost model
Chainlit does not charge for LLM calls — it's open source, and your LLM provider bills you directly. But the framework shapes cost in two critical ways: through its session model and through its integration callbacks.
Every Chainlit app session is stateful. The session starts when a user connects via WebSocket and ends when they disconnect or the session times out. Within a session, every message the user sends and every response the assistant generates is stored in Chainlit's in-memory message store and accessible via cl.chat_context.get(). This is convenient for maintaining conversation flow but dangerous for cost: by turn 20 in a rich conversation with tool outputs, the history may contain 40,000–80,000 tokens. If the agent framework callback sends the full history to the LLM on every turn — which is the default behavior for both LangChain and LlamaIndex callbacks — turn 20 costs 20–40× more in input tokens than turn 1, simply because the context accumulated silently in the background.
The callbacks are the second lever. Chainlit provides native callback handlers that intercept LangChain chains and LlamaIndex queries and display their intermediate steps in the Chainlit UI. These callbacks are extremely useful for observability: users can see the agent's reasoning and tool calls in real time. But they observe LLM calls; they do not limit them. In a typical integration, the session history from cl.chat_context.get() is converted into LangChain's HumanMessage/AIMessage format, by your handler or by the chain's memory, and sent as the conversation history on every call. There is no token budget check anywhere in this path. If you use AsyncLangchainCallbackHandler and your LangChain agent runs a 20-step ReAct loop with full history included on each step, you pay for 20 LLM calls, each carrying the full conversation history in input tokens.
The practical implication: in Chainlit apps, the cost of a single user message is not determined by the user's message length — it's determined by the session's accumulated history length multiplied by the number of LLM calls the agent makes per turn. A conversation that looks like a simple Q&A session in the UI might be running a 10-step agent loop on every message, paying for the entire 50,000-token history at each step.
Failure mode 1: conversation history accumulation
Chainlit's default session model makes conversation history accumulation invisible. The UI shows the user a clean chat interface. The developer sees well-formatted step outputs. The LLM bill shows a gradual increase that is easy to attribute to "more usage" rather than to the specific pattern: each additional conversation turn raises the input token cost of every subsequent turn because the full history is included.
The accumulation pattern is straightforward to reproduce. A user session starts with a 2,000-token system prompt. The first user message adds 50 tokens; the assistant's first response adds 200 tokens. By turn 10, the history is roughly 2,000 + (10 × 250) = 4,500 tokens per call. But if the agent makes five LLM calls per user turn (ReAct steps), the actual input token cost per user message is 5 × 4,500 = 22,500 tokens. By turn 30, the history is 2,000 + (30 × 250) = 9,500 tokens, and the per-message input cost is 5 × 9,500 = 47,500 tokens: slightly more than double the turn-10 cost even though the user is asking similarly-sized questions.
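The arithmetic above can be reproduced with a few lines of Python. This is a minimal sketch under the same assumptions as the text (2,000-token system prompt, 250 tokens added per turn, five LLM calls per user message); the function names are illustrative and not part of Chainlit.

```python
def history_tokens(turn: int, system_prompt_tokens: int = 2_000, tokens_per_turn: int = 250) -> int:
    """Approximate tokens sent per LLM call at a given turn (system prompt + history)."""
    return system_prompt_tokens + turn * tokens_per_turn


def input_tokens_per_message(turn: int, llm_calls_per_turn: int = 5) -> int:
    """Input tokens billed for one user message when every LLM call resends the history."""
    return llm_calls_per_turn * history_tokens(turn)


def cumulative_input_tokens(turns: int, llm_calls_per_turn: int = 5) -> int:
    """Total input tokens billed over a whole session of `turns` user messages."""
    return sum(input_tokens_per_message(t, llm_calls_per_turn) for t in range(1, turns + 1))


for turn in (10, 30):
    print(
        f"turn {turn}: {history_tokens(turn):,} tokens per call, "
        f"{input_tokens_per_message(turn):,} input tokens per message, "
        f"{cumulative_input_tokens(turn):,} cumulative"
    )
```

The per-message figure grows linearly with the turn number, while the cumulative figure is a sum of those linear terms, which is where the quadratic growth of the session bill comes from.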
The guard intercepts the message history at the Chainlit layer, before it reaches the agent framework, and enforces a rolling token budget for the context window passed to the LLM.
import chainlit as cl
from dataclasses import dataclass

import tiktoken
from openai import AsyncOpenAI

# cl100k_base is the GPT-4 / GPT-3.5 encoding; GPT-4o uses o200k_base.
# Counts are approximate for other models, so leave headroom in the budget.
_encoder = tiktoken.get_encoding("cl100k_base")


def count_tokens(text: str) -> int:
    return len(_encoder.encode(text))


@dataclass
class SessionTokenBudget:
    max_history_tokens: int = 16_000   # hard cap on tokens sent to LLM from history
    warn_at_tokens: int = 12_000       # log a warning but continue
    system_prompt_tokens: int = 2_000  # reserved for system prompt (not in history)
    total_turns: int = 0
    total_input_tokens_billed: int = 0
    total_llm_calls: int = 0


def get_session_budget() -> SessionTokenBudget:
    budget = cl.user_session.get("token_budget")
    if budget is None:
        budget = SessionTokenBudget()
        cl.user_session.set("token_budget", budget)
    return budget


def truncate_history_to_budget(
    messages: list[dict],
    max_tokens: int,
    system_prompt: str = "",
) -> list[dict]:
    """
    Truncates the message history to fit within max_tokens.
    Keeps the most recent messages that fit; older messages are dropped.
    Returns a possibly-shortened list of messages with a synthetic note when truncated.
    """
    system_tokens = count_tokens(system_prompt)
    available = max_tokens - system_tokens
    if available <= 0:
        return []

    # Count tokens from the end, building the window we can afford
    window: list[dict] = []
    token_total = 0
    truncated = False
    for msg in reversed(messages):
        content = msg.get("content", "") or ""
        msg_tokens = count_tokens(content) + 4  # role overhead
        if token_total + msg_tokens > available:
            truncated = True
            break
        window.append(msg)
        token_total += msg_tokens
    window.reverse()

    if truncated:
        # Prepend a synthetic system message explaining the truncation
        window.insert(0, {
            "role": "system",
            "content": (
                f"[RunGuard] Conversation history truncated to the most recent "
                f"{len(window)} messages to stay within the {max_tokens}-token context budget. "
                "Earlier messages have been omitted. Respond based on the available context."
            ),
        })
    return window


def build_guarded_messages(
    system_prompt: str,
    user_message: str,
) -> list[dict]:
    """
    Builds a token-budgeted message list for the current Chainlit session.
    Call this in your @cl.on_message handler instead of assembling history manually.
    Returns the truncated history + new user message, ready to pass to your LLM.
    """
    budget = get_session_budget()
    budget.total_turns += 1

    # Pull the full Chainlit message history as {"role", "content"} dicts
    history_dicts = [
        msg
        for msg in cl.chat_context.to_openai()
        if msg.get("content")  # skip empty messages and file uploads without text
    ]

    # chat_context may already contain the incoming message. Drop it here,
    # because it is appended explicitly below and must not be counted twice.
    if (
        history_dicts
        and history_dicts[-1]["role"] == "user"
        and history_dicts[-1]["content"] == user_message
    ):
        history_dicts.pop()

    truncated = truncate_history_to_budget(
        history_dicts,
        max_tokens=budget.max_history_tokens,
        system_prompt=system_prompt,
    )

    # Log a warning when approaching the cap
    total_tokens_estimate = sum(count_tokens(m.get("content", "")) for m in truncated)
    if total_tokens_estimate >= budget.warn_at_tokens:
        print(
            f"[RunGuard] Session '{cl.context.session.id}' history at "
            f"{total_tokens_estimate} tokens (warn threshold: {budget.warn_at_tokens}). "
            "Consider summarizing old context or starting a new session."
        )

    # Append the current user message
    truncated.append({"role": "user", "content": user_message})

    # Track billed token estimate
    budget.total_input_tokens_billed += total_tokens_estimate + count_tokens(user_message)
    cl.user_session.set("token_budget", budget)
    return truncated


@cl.on_message
async def on_message(message: cl.Message):
    system_prompt = "You are a helpful assistant."
    messages = build_guarded_messages(system_prompt, message.content)

    # Pass messages to your LLM (OpenAI example)
    client = AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "system", "content": system_prompt}] + messages,
        stream=True,
    )

    answer = cl.Message(content="")
    async for chunk in response:
        if not chunk.choices:  # some stream chunks carry no choices
            continue
        token = chunk.choices[0].delta.content or ""
        await answer.stream_token(token)
    await answer.send()

    budget = get_session_budget()
    budget.total_llm_calls += 1
    cl.user_session.set("token_budget", budget)
The truncate_history_to_budget() function builds the context window from the tail of the conversation history — preserving the most recent messages, which are most relevant to the current turn. When it truncates, it inserts a synthetic system message explaining the truncation so the LLM does not hallucinate context that is no longer present. The guard operates at the Chainlit layer, before the history reaches the LangChain or LlamaIndex callback, which means it intercepts accumulation regardless of which agent framework you're using downstream.
The budget.total_input_tokens_billed tracker in cl.user_session persists across turns and lets you build a per-session cost dashboard: total_input_tokens_billed × price_per_token gives the lower-bound cost for the session (excluding output tokens and agent sub-calls, which the per-turn guards below handle). For production apps with per-user billing, store this in your database alongside the session ID.
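As a rough sketch of that dashboard calculation, the counters can be turned into a dollar estimate as follows. The SessionUsage class and the function name are hypothetical stand-ins for the counters on SessionTokenBudget, and the $2.50 per million input tokens is an assumed rate (the GPT-4o figure used later in this article); substitute your provider's current price.

```python
from dataclasses import dataclass


@dataclass
class SessionUsage:
    """Hypothetical snapshot of the counters kept on SessionTokenBudget."""
    total_input_tokens_billed: int
    total_turns: int


def session_cost_lower_bound(usage: SessionUsage, input_price_per_1m_usd: float = 2.50) -> dict:
    """Lower-bound session cost: input tokens only, no output tokens or agent sub-calls."""
    cost = usage.total_input_tokens_billed / 1_000_000 * input_price_per_1m_usd
    per_turn = cost / usage.total_turns if usage.total_turns else 0.0
    return {
        "session_cost_usd": round(cost, 4),
        "avg_cost_per_turn_usd": round(per_turn, 4),
    }


# Example: a 20-turn session that has sent 400,000 input tokens so far.
report = session_cost_lower_bound(SessionUsage(total_input_tokens_billed=400_000, total_turns=20))
print(report)
```

Because the tracker only sees the history the guard assembled, the real bill will be higher; treat the number as a floor for alerting, not as an invoice.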
Failure mode 2: nested step explosion
Chainlit's cl.Step context manager is designed to display agent reasoning in the UI as a collapsible step tree. A top-level step might be "Answering your question," containing a child step "Searching the web," containing a grandchild step "Parsing search results." This visual model maps naturally to agent frameworks: LangChain's AgentExecutor becomes a step, each tool call becomes a child step, and the tool's sub-calls (if the tool itself invokes an LLM) become grandchild steps.
The explosion happens when the agent at any level of the step tree enters a loop. Consider a LangGraph sub-graph that is triggered by a tool call within a top-level Chainlit step. If that sub-graph enters a retry loop — re-calling the same tool because each tool call returns an error or an insufficient result — it generates one new cl.Step per loop iteration. At 50 iterations, the step tree has 50 grandchild nodes, each associated with one LLM call and one tool call. The top-level Chainlit message streams intermediate updates continuously; the user sees progress. The LLM bill accumulates silently.
The nested step guard wraps cl.Step to enforce depth and breadth limits per user turn.
import chainlit as cl
from contextlib import asynccontextmanager
from dataclasses import dataclass


@dataclass
class TurnStepTracker:
    """Tracks step count and nesting depth for a single user message turn."""
    step_count: int = 0
    max_depth_seen: int = 0
    current_depth: int = 0
    llm_call_count: int = 0
    tool_call_count: int = 0
    tripped: bool = False
    trip_reason: str = ""
    # Budget limits
    max_steps_per_turn: int = 40
    max_depth: int = 6
    max_llm_calls_per_turn: int = 20
    max_tool_calls_per_turn: int = 30


def get_turn_tracker(
    max_steps: int = 40,
    max_depth: int = 6,
    max_llm_calls: int = 20,
    max_tool_calls: int = 30,
) -> TurnStepTracker:
    """Gets the current turn's TurnStepTracker from cl.user_session, creating one if absent."""
    tracker = cl.user_session.get("turn_tracker")
    # A tripped tracker is deliberately NOT replaced here: guarded_step must keep
    # seeing the open circuit, and the error handler needs the final counts.
    # reset_turn_tracker() is what starts each turn clean.
    if tracker is None:
        tracker = TurnStepTracker(
            max_steps_per_turn=max_steps,
            max_depth=max_depth,
            max_llm_calls_per_turn=max_llm_calls,
            max_tool_calls_per_turn=max_tool_calls,
        )
        cl.user_session.set("turn_tracker", tracker)
    return tracker


def reset_turn_tracker():
    """Reset at the start of each on_message handler."""
    cl.user_session.set("turn_tracker", None)


@asynccontextmanager
async def guarded_step(
    name: str,
    step_type: str = "tool",
    is_llm_call: bool = False,
    is_tool_call: bool = False,
):
    """
    Async context manager that wraps cl.Step with a budget guard.
    Raises RuntimeError before creating the step if any budget is exceeded.

    Usage:
        async with guarded_step("Web search", step_type="tool", is_tool_call=True):
            result = await search_web(query)
    """
    tracker = get_turn_tracker()
    if tracker.tripped:
        raise RuntimeError(
            f"[RunGuard] Turn circuit open: {tracker.trip_reason}. "
            "Refusing to create new step. Agent must stop."
        )

    # Depth check
    if tracker.current_depth >= tracker.max_depth:
        tracker.tripped = True
        tracker.trip_reason = (
            f"step nesting depth {tracker.current_depth} exceeded max {tracker.max_depth}"
        )
        raise RuntimeError(
            f"[RunGuard] Step depth circuit tripped: nesting level {tracker.current_depth} "
            f"exceeds maximum {tracker.max_depth}. "
            "Sub-agent is recursing deeper than expected; check for recursive tool calls."
        )

    # Total step count check
    tracker.step_count += 1
    if tracker.step_count > tracker.max_steps_per_turn:
        tracker.tripped = True
        tracker.trip_reason = (
            f"total steps per turn exceeded {tracker.max_steps_per_turn}"
        )
        raise RuntimeError(
            f"[RunGuard] Turn step count circuit tripped: {tracker.step_count} steps "
            f"in this turn (max: {tracker.max_steps_per_turn}). "
            "Agent is looping: creating too many tool/LLM steps without completing the task."
        )

    # LLM call count check
    if is_llm_call:
        tracker.llm_call_count += 1
        if tracker.llm_call_count > tracker.max_llm_calls_per_turn:
            tracker.tripped = True
            tracker.trip_reason = (
                f"LLM calls per turn exceeded {tracker.max_llm_calls_per_turn}"
            )
            raise RuntimeError(
                f"[RunGuard] LLM call circuit tripped: {tracker.llm_call_count} LLM calls "
                f"in this turn (max: {tracker.max_llm_calls_per_turn}). "
                "Agent is making too many reasoning steps; likely stuck in a retry or planning loop."
            )

    # Tool call count check
    if is_tool_call:
        tracker.tool_call_count += 1
        if tracker.tool_call_count > tracker.max_tool_calls_per_turn:
            tracker.tripped = True
            tracker.trip_reason = (
                f"tool calls per turn exceeded {tracker.max_tool_calls_per_turn}"
            )
            raise RuntimeError(
                f"[RunGuard] Tool call circuit tripped: {tracker.tool_call_count} tool calls "
                f"in this turn (max: {tracker.max_tool_calls_per_turn}). "
                "Agent is calling tools without making progress; check for tool output parsing failures."
            )

    tracker.current_depth += 1
    tracker.max_depth_seen = max(tracker.max_depth_seen, tracker.current_depth)
    cl.user_session.set("turn_tracker", tracker)

    async with cl.Step(name=name, type=step_type) as step:
        try:
            yield step
        finally:
            tracker.current_depth -= 1
            cl.user_session.set("turn_tracker", tracker)


# Example: using guarded_step in a Chainlit message handler with a LangChain-style agent.
# call_planner_llm, run_web_search, synthesize and finalize_answer are placeholders
# for your own agent functions.
@cl.on_message
async def on_message_with_step_guard(message: cl.Message):
    reset_turn_tracker()  # always reset at the start of each turn
    try:
        async with guarded_step("Planning", step_type="run", is_llm_call=True):
            plan = await call_planner_llm(message.content)

        for task in plan.tasks:
            async with guarded_step(f"Executing: {task.name}", step_type="tool", is_tool_call=True):
                search_result = None  # stays None when the task needs no search
                if task.requires_search:
                    async with guarded_step("Web search", step_type="tool", is_tool_call=True):
                        search_result = await run_web_search(task.query)
                async with guarded_step("Synthesizing result", step_type="llm", is_llm_call=True):
                    task_result = await synthesize(task, search_result)

        answer = await finalize_answer(plan)
        await cl.Message(content=answer).send()
    except RuntimeError as e:
        if "[RunGuard]" in str(e):
            tracker = get_turn_tracker()
            await cl.Message(
                content=(
                    "I hit a complexity limit while working on your request. "
                    f"The agent made {tracker.step_count} steps and {tracker.tool_call_count} "
                    "tool calls without completing the task. This usually means the request "
                    "needs to be broken into smaller parts."
                )
            ).send()
        else:
            raise
The guard's depth tracking (current_depth) increments on entry and decrements in a finally block on exit, so the nesting level stays accurate across awaits as long as the steps within a turn run sequentially; steps launched concurrently (for example with asyncio.gather) share the same counter, so siblings would be counted as nesting. The breadth tracking (step_count) is global to the turn: it counts every step regardless of depth, so a flat loop (many steps at depth 1) cannot circumvent a depth-only check. Both limits are configurable per application: a simple Q&A agent might set max_steps_per_turn=10 and max_llm_calls_per_turn=5, while a complex research agent might allow 40 steps and 20 LLM calls before tripping.
The guard converts circuit trips into user-facing messages rather than unhandled exceptions. Users see a descriptive explanation ("I hit a complexity limit") rather than a 500 error, and the per-turn counters in the message give actionable context for debugging: "the agent made 38 tool calls" is immediately useful for identifying which tool is looping.
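Count limits catch a loop only after the budget is spent. To name the looping tool earlier, one option is to track repeated identical calls. The following is a minimal sketch, not part of Chainlit or RunGuard: RepeatedCallDetector and its threshold are hypothetical, and it assumes tool arguments can be serialized to JSON.

```python
import hashlib
import json
from collections import Counter


class RepeatedCallDetector:
    """Hypothetical helper: trips when the same tool is called with the same arguments too often."""

    def __init__(self, max_identical_calls: int = 3):
        self.max_identical_calls = max_identical_calls
        self._counts = Counter()

    @staticmethod
    def _signature(tool_name: str, arguments: dict) -> str:
        # sort_keys makes the signature independent of argument order
        payload = json.dumps({"tool": tool_name, "args": arguments}, sort_keys=True, default=str)
        return hashlib.sha256(payload.encode()).hexdigest()

    def record(self, tool_name: str, arguments: dict) -> int:
        """Records one call and returns how many times this exact call has been seen."""
        signature = self._signature(tool_name, arguments)
        self._counts[signature] += 1
        count = self._counts[signature]
        if count > self.max_identical_calls:
            raise RuntimeError(
                f"[RunGuard] Loop detected: '{tool_name}' called {count} times "
                f"with identical arguments (max: {self.max_identical_calls})."
            )
        return count


detector = RepeatedCallDetector(max_identical_calls=2)
detector.record("web_search", {"query": "chainlit pricing"})
detector.record("web_search", {"query": "chainlit pricing"})
detector.record("web_search", {"query": "chainlit sessions"})  # different args: separate count
try:
    detector.record("web_search", {"query": "chainlit pricing"})
    loop_message = ""
except RuntimeError as exc:
    loop_message = str(exc)
print(loop_message)
```

Create one detector per turn (for example alongside the turn tracker) and call record() at the start of each tool step. An agent that rephrases its query on every retry will slip past this check, so it complements the count limits rather than replacing them.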
Failure mode 3: async tool call budget overflow
Chainlit's async execution model is one of its best features for UX: long-running agent steps execute in the background, streaming progress updates to the user in real time via WebSocket, while the main event loop stays responsive. The same property creates a cost blind spot: nothing in Chainlit's async model interrupts an in-progress agent coroutine on its own. Once @cl.on_message starts an agent executor, that executor runs until it completes, raises, or is cancelled explicitly (for example, by the user stopping the task); there is no default timeout at the Chainlit layer, and no built-in tool call count limit.
The overflow pattern is most common with LangChain's AgentExecutor and LangGraph agents integrated via Chainlit's callback handler. The executor's max_iterations parameter is the only default safety valve — and many integrations leave it at the default of 15, which becomes a ceiling only if every iteration makes exactly one tool call. Agents that make multiple tool calls per reasoning step, or that call a sub-agent that itself calls tools, can exceed 15 effective tool calls per turn without hitting max_iterations.
The guard wraps the agent executor coroutine with an asyncio.wait_for timeout and enforces a per-turn cost and tool call budget through a TurnBudget that raises RunGuard's BudgetExceededError.
import asyncio
import time
import chainlit as cl
from dataclasses import dataclass
from runguard import BudgetExceededError


@dataclass
class TurnBudget:
    """Per-user-message budget enforced in Chainlit's async handler."""
    max_wall_seconds: float = 120.0  # trip if a single turn takes > 2 minutes
    max_cost_usd: float = 0.50       # trip if a single turn exceeds $0.50
    max_tool_calls: int = 25         # hard tool call ceiling per turn
    tool_calls_made: int = 0
    cost_usd_spent: float = 0.0
    started_at: float = 0.0
    tripped: bool = False

    def start(self):
        self.started_at = time.monotonic()

    def elapsed(self) -> float:
        return time.monotonic() - self.started_at

    def check(self) -> None:
        """Raises BudgetExceededError if any limit is exceeded. Call this after each tool call."""
        if self.tripped:
            raise BudgetExceededError("turn budget already tripped")
        if self.elapsed() > self.max_wall_seconds:
            self.tripped = True
            raise BudgetExceededError(
                f"[RunGuard] Turn time limit exceeded: {self.elapsed():.0f}s "
                f"(max: {self.max_wall_seconds:.0f}s). "
                "Agent task is taking too long; check for tool timeouts or planning loops."
            )
        if self.cost_usd_spent > self.max_cost_usd:
            self.tripped = True
            raise BudgetExceededError(
                f"[RunGuard] Turn cost limit exceeded: ${self.cost_usd_spent:.3f} "
                f"(max: ${self.max_cost_usd:.2f}). "
                "This agent turn is too expensive; break the task into smaller steps."
            )
        if self.tool_calls_made > self.max_tool_calls:
            self.tripped = True
            raise BudgetExceededError(
                f"[RunGuard] Turn tool call limit exceeded: {self.tool_calls_made} calls "
                f"(max: {self.max_tool_calls}). "
                "Agent is calling tools without converging; likely a loop."
            )


class ChainlitBudgetedAgent:
    """
    Wraps an async agent callable with a per-turn budget.
    Integrates with Chainlit's user_session for per-user turn tracking
    and sends budget-exceeded messages back to the Chainlit UI.
    """
    # Approximate costs per 1M tokens (adjust for your model)
    INPUT_COST_PER_1M = 2.50    # GPT-4o input
    OUTPUT_COST_PER_1M = 10.00  # GPT-4o output

    def __init__(
        self,
        agent_callable,
        max_seconds: float = 120.0,
        max_cost_usd: float = 0.50,
        max_tool_calls: int = 25,
    ):
        self.agent = agent_callable
        self.max_seconds = max_seconds
        self.max_cost_usd = max_cost_usd
        self.max_tool_calls = max_tool_calls

    def _estimate_cost(self, input_tokens: int, output_tokens: int) -> float:
        return (
            input_tokens / 1_000_000 * self.INPUT_COST_PER_1M
            + output_tokens / 1_000_000 * self.OUTPUT_COST_PER_1M
        )

    async def run(self, user_input: str, **kwargs) -> str:
        """
        Runs the agent with budget enforcement.
        Returns the agent's final answer, or a budget-exceeded explanation.
        """
        budget = TurnBudget(
            max_wall_seconds=self.max_seconds,
            max_cost_usd=self.max_cost_usd,
            max_tool_calls=self.max_tool_calls,
        )
        budget.start()
        cl.user_session.set("current_turn_budget", budget)
        try:
            result = await asyncio.wait_for(
                self.agent(user_input, budget=budget, **kwargs),
                timeout=self.max_seconds,
            )
            return result
        except asyncio.TimeoutError:
            budget.tripped = True
            elapsed = budget.elapsed()
            return (
                f"I ran out of time working on your request ({elapsed:.0f}s elapsed, "
                f"limit: {self.max_seconds:.0f}s). The task may be too complex. "
                "Try breaking it into smaller steps or providing more specific instructions."
            )
        except BudgetExceededError as e:
            msg = str(e).replace("[RunGuard] ", "")
            return (
                f"I stopped early to protect your budget: {msg} "
                "Try a more focused question or increase the per-turn limit in settings."
            )
        finally:
            cl.user_session.set("current_turn_budget", None)


def record_tool_call(input_tokens: int = 0, output_tokens: int = 0):
    """
    Call this inside each tool function to record usage and check the budget.
    Raises BudgetExceededError if any limit is exceeded.
    """
    budget: TurnBudget | None = cl.user_session.get("current_turn_budget")
    if budget is None:
        return  # no active budget guard: allow the call
    budget.tool_calls_made += 1
    call_cost = (
        input_tokens / 1_000_000 * ChainlitBudgetedAgent.INPUT_COST_PER_1M
        + output_tokens / 1_000_000 * ChainlitBudgetedAgent.OUTPUT_COST_PER_1M
    )
    budget.cost_usd_spent += call_cost
    budget.check()
    cl.user_session.set("current_turn_budget", budget)


# Integration example with a LangChain-style tool
async def search_tool(query: str, budget=None) -> str:
    """Example tool that records usage and checks the budget."""
    # Estimate token usage before the call (or measure after)
    estimated_input_tokens = 500
    estimated_output_tokens = 200
    record_tool_call(estimated_input_tokens, estimated_output_tokens)
    # ... actual tool implementation ...
    return f"Search results for: {query}"


# Chainlit entry point (run_agent is a placeholder for your own agent coroutine)
_agent = ChainlitBudgetedAgent(
    agent_callable=lambda user_input, budget, **kw: run_agent(user_input, budget=budget),
    max_seconds=90.0,
    max_cost_usd=0.25,
    max_tool_calls=20,
)


@cl.on_message
async def on_message_budgeted(message: cl.Message):
    async with cl.Step(name="Agent", type="run") as step:
        answer = await _agent.run(message.content)
        step.output = answer
    await cl.Message(content=answer).send()
The asyncio.wait_for timeout is the most important line in this guard. Without it, a Chainlit handler can run indefinitely — the WebSocket keeps the connection open, the user sees a spinner, and the agent continues making API calls. The wall-clock timeout converts a potentially infinite loop into a bounded failure: at most max_seconds of LLM API calls regardless of what the agent does. The cost budget in TurnBudget provides a second gate that trips based on actual spend rather than time, which catches expensive short loops that complete their turns within the time limit but burn excessive tokens per call.
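The bounding effect of asyncio.wait_for can be seen in isolation with a stand-in agent. This is a minimal sketch using only the standard library; looping_agent is a hypothetical placeholder for a stuck agent, and each loop iteration stands for one billed LLM call.

```python
import asyncio


async def looping_agent(calls: list) -> str:
    """Stand-in for an agent stuck in a loop; never returns on its own."""
    while True:
        calls.append("llm_call")
        await asyncio.sleep(0.01)  # simulated LLM latency


async def run_with_deadline(timeout_s: float) -> tuple:
    calls = []
    try:
        await asyncio.wait_for(looping_agent(calls), timeout=timeout_s)
        outcome = "completed"
    except asyncio.TimeoutError:
        outcome = "timed out"
    calls_at_timeout = len(calls)
    # wait_for cancels the wrapped coroutine, so no further calls should appear.
    await asyncio.sleep(0.05)
    calls_after_grace = len(calls)
    return outcome, calls_at_timeout, calls_after_grace


outcome, calls_at_timeout, calls_after_grace = asyncio.run(run_with_deadline(0.1))
print(outcome, calls_at_timeout, calls_after_grace)
```

Cancellation is delivered at the next await point. An agent that blocks the event loop with synchronous code, or that catches asyncio.CancelledError and keeps going, will not be stopped by the timeout, which is one reason to keep the cost and tool call checks as a second gate.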
The record_tool_call() function is the integration point for existing tool code: add one call at the top of each tool function, passing the estimated or measured token usage. The budget check inside record_tool_call() raises BudgetExceededError synchronously within the tool, which propagates up through the agent executor's tool dispatch loop and reaches the except BudgetExceededError handler in ChainlitBudgetedAgent.run(). The user receives a clear budget message instead of a timeout error or silent failure.
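If editing every tool body is impractical, the same check can be applied with a decorator. The sketch below is self-contained and deliberately avoids Chainlit and RunGuard: ToolBudgetExceeded, SimpleTurnBudget, and budgeted_tool are hypothetical stand-ins, and in a real app the get_budget callable would read the active budget from cl.user_session.

```python
import asyncio
import functools
from typing import Awaitable, Callable, Optional


class ToolBudgetExceeded(Exception):
    """Hypothetical stand-in for BudgetExceededError."""


class SimpleTurnBudget:
    """Hypothetical, minimal per-turn budget that counts tool calls only."""

    def __init__(self, max_tool_calls: int):
        self.max_tool_calls = max_tool_calls
        self.tool_calls_made = 0

    def record_call(self) -> None:
        self.tool_calls_made += 1
        if self.tool_calls_made > self.max_tool_calls:
            raise ToolBudgetExceeded(
                f"{self.tool_calls_made} tool calls (max: {self.max_tool_calls})"
            )


def budgeted_tool(get_budget: Callable[[], Optional[SimpleTurnBudget]]):
    """Decorator: record one call against the active budget before running the tool."""

    def decorator(tool: Callable[..., Awaitable[str]]):
        @functools.wraps(tool)
        async def wrapper(*args, **kwargs):
            budget = get_budget()
            if budget is not None:  # no active guard: run the tool unchecked
                budget.record_call()
            return await tool(*args, **kwargs)

        return wrapper

    return decorator


_active_budget = SimpleTurnBudget(max_tool_calls=2)


@budgeted_tool(lambda: _active_budget)
async def search_tool(query: str) -> str:
    return f"Search results for: {query}"


async def demo() -> list:
    outcomes = []
    for query in ("a", "b", "c"):
        try:
            outcomes.append(await search_tool(query))
        except ToolBudgetExceeded as exc:
            outcomes.append(f"stopped: {exc}")
    return outcomes


outcomes = asyncio.run(demo())
print(outcomes)
```

Checking before the tool runs means the call that would exceed the budget never executes, at the price of counting a call that may later fail for unrelated reasons.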
Failure mode 4: session context drift and silent message replay
Chainlit sessions persist across WebSocket reconnects. When a user's browser tab loses its connection and reconnects — due to a network blip, a tab sleep, or a page refresh — Chainlit re-establishes the session and restores the message history from its in-memory store. This is the correct behavior for UX: the user's conversation survives a brief disconnect. The cost pathology appears when state in cl.user_session is not scoped correctly to individual message turns.
The drift pattern has two variants. In the first variant, a reconnect triggers a duplicate execution: the framework's reconnect handler re-calls @cl.on_message with the last user message, re-running the agent for that message because the session state does not record that it was already processed. The agent runs a second time for the same user input, billed a second time, and appends a second response to the conversation. The user sees duplicate answers; the developer sees doubled LLM costs with no corresponding increase in user messages.
In the second variant, stale session state from an interrupted turn contaminates the next turn. An agent that was mid-loop when the connection dropped left partial state in cl.user_session: accumulated tool outputs, an intermediate plan, a half-built response. When the user reconnects and sends a new message, the next turn's agent picks up this stale state and behaves erratically — making tool calls that reference a prior task, sending responses that don't match the new question, or entering a loop because it's trying to complete an abandoned task.
import hashlib
import time
import chainlit as cl
from dataclasses import dataclass, field


@dataclass
class ProcessedMessageRecord:
    """Tracks which message fingerprints have been processed in this session."""
    processed_ids: set[str] = field(default_factory=set)
    last_turn_completed_at: float = 0.0
    in_flight_message_id: str | None = None
    in_flight_started_at: float = 0.0

    def is_duplicate(self, message_id: str) -> bool:
        return message_id in self.processed_ids

    def mark_in_flight(self, message_id: str) -> None:
        self.in_flight_message_id = message_id
        self.in_flight_started_at = time.monotonic()

    def mark_completed(self, message_id: str) -> None:
        self.processed_ids.add(message_id)
        self.in_flight_message_id = None
        self.last_turn_completed_at = time.monotonic()

    def has_stale_in_flight(self, stale_threshold_seconds: float = 30.0) -> bool:
        """Returns True if there is an in-flight message that has been running too long."""
        if self.in_flight_message_id is None:
            return False
        elapsed = time.monotonic() - self.in_flight_started_at
        return elapsed > stale_threshold_seconds


def message_fingerprint(content: str, session_id: str) -> str:
    """
    Creates a stable fingerprint for a message.
    Uses content + session_id to distinguish identical messages in different sessions.
    Does NOT include timestamps so that reconnects for the same message get the same fingerprint.
    """
    raw = f"{session_id}:{content.strip()}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]


def get_message_record() -> ProcessedMessageRecord:
    record = cl.user_session.get("message_record")
    if record is None:
        record = ProcessedMessageRecord()
        cl.user_session.set("message_record", record)
    return record


def clear_stale_in_flight_state():
    """
    Clears any turn-level state that was left over from an interrupted handler.
    Call this at the start of each on_message handler after detecting stale in-flight state.
    """
    keys_to_clear = ["turn_tracker", "current_turn_budget", "agent_intermediate_steps"]
    for key in keys_to_clear:
        cl.user_session.set(key, None)
    record = get_message_record()
    record.in_flight_message_id = None
    cl.user_session.set("message_record", record)


@cl.on_message
async def on_message_deduped(message: cl.Message):
    session_id = cl.context.session.id
    fingerprint = message_fingerprint(message.content, session_id)
    record = get_message_record()

    # Reject duplicate messages (reconnect replay)
    if record.is_duplicate(fingerprint):
        # Don't re-process; return quietly because the history already has the response
        print(
            f"[RunGuard] Duplicate message detected in session {session_id[:8]}: "
            f"fingerprint {fingerprint}. Skipping re-execution."
        )
        return

    # Detect stale in-flight state from a prior interrupted turn
    if record.has_stale_in_flight(stale_threshold_seconds=30.0):
        stale_id = record.in_flight_message_id
        print(
            f"[RunGuard] Stale in-flight message {stale_id} detected in session {session_id[:8]}. "
            "Clearing turn-level state before starting new turn."
        )
        clear_stale_in_flight_state()
        # Optionally, notify the user that the previous request was interrupted
        await cl.Message(
            content=(
                "My previous response was interrupted (likely a connection drop). "
                "Starting fresh on your new message."
            )
        ).send()

    # Mark this message as in-flight
    record.mark_in_flight(fingerprint)
    cl.user_session.set("message_record", record)
    try:
        # ... run your agent handler here (run_agent_handler is a placeholder) ...
        response = await run_agent_handler(message.content)
        await cl.Message(content=response).send()
        # Mark completed only on success
        record.mark_completed(fingerprint)
        cl.user_session.set("message_record", record)
    except Exception:
        # On failure, do NOT mark as completed, so the message can be retried
        record.in_flight_message_id = None
        cl.user_session.set("message_record", record)
        raise
The message fingerprint combines the message content and session ID into a stable hash that is identical across reconnects for the same message, unlike Chainlit's message object ID, which is assigned at send time and may differ between the original send and a replayed send on reconnect. The fingerprint is stored in the processed_ids set in cl.user_session, which persists across WebSocket reconnects for the same session (Chainlit's session lifetime is longer than any individual WebSocket connection). The first handler for a message fingerprint proceeds; subsequent handlers for the same fingerprint (reconnects) return early without re-executing. One trade-off to be aware of: a purely content-based fingerprint also matches a user who deliberately sends the same text twice in one session ("yes", "continue"), so the second send is skipped as well. If that matters for your app, limit deduplication to a short time window after the first send.
The stale in-flight detection handles the second variant: an agent that was interrupted mid-turn left the session in an inconsistent state. The guard detects this by checking whether the in_flight_message_id is set and older than 30 seconds, at which point the prior turn is assumed to have been interrupted rather than still running. Set the threshold above your longest legitimate turn (for example, above the per-turn wall-clock limit of the budget guard) so that a slow but healthy turn is not misclassified. The guard then clears all turn-level state keys before starting the new turn, preventing the new agent from inheriting the interrupted state.
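One way to keep replay protection without permanently blocking repeated messages is to expire fingerprints after a short window. The following is a minimal sketch under that assumption; RecentFingerprints and its 10-second default are hypothetical and not part of Chainlit, and the injectable clock exists only to make the behavior easy to demonstrate.

```python
import time


class RecentFingerprints:
    """Hypothetical replacement for a plain processed_ids set: entries expire after a window."""

    def __init__(self, window_seconds: float = 10.0, clock=time.monotonic):
        self.window_seconds = window_seconds
        self._clock = clock
        self._completed_at = {}  # fingerprint -> completion time

    def mark_completed(self, fingerprint: str) -> None:
        self._completed_at[fingerprint] = self._clock()

    def is_duplicate(self, fingerprint: str) -> bool:
        completed = self._completed_at.get(fingerprint)
        if completed is None:
            return False
        if self._clock() - completed > self.window_seconds:
            # Outside the window: treat as a deliberate repeat and forget the entry
            del self._completed_at[fingerprint]
            return False
        return True


# Demonstration with a fake clock so no real waiting is needed.
now = [0.0]
recent = RecentFingerprints(window_seconds=10.0, clock=lambda: now[0])
recent.mark_completed("a1b2c3")

now[0] = 2.0
replayed_on_reconnect = recent.is_duplicate("a1b2c3")

now[0] = 60.0
repeated_a_minute_later = recent.is_duplicate("a1b2c3")

print(replayed_on_reconnect, repeated_a_minute_later)
```

The window should be long enough to cover a reconnect but shorter than the time a user would plausibly take to resend the same text on purpose; the right value depends on your app.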
Combining guards in production Chainlit apps
These four guards operate at different layers of the Chainlit request lifecycle and should be composed in order:
- The message deduplication guard runs first, at the very top of @cl.on_message, before any agent logic. It is a fast check (hash lookup in a set) and prevents the remaining guards from running twice for replayed messages.
- The history token guard runs second, replacing any manual history assembly before the LLM call. It is stateless across turns: it reads the current Chainlit message history and enforces the budget for this turn's context window. It does not need to know about the previous turn's execution.
- The step nesting guard is embedded in the agent loop, wrapping each cl.Step context. It resets at the start of each turn (via reset_turn_tracker()) and accumulates across the turn's nested steps. Use it when your agent framework generates step trees (LangChain, LangGraph, LlamaIndex).
- The async budget guard wraps the entire agent coroutine with a wall-clock timeout and a cost ceiling. It is the outer safety net that catches failures the step guard cannot: a tight asyncio loop that never creates a cl.Step, or a tool that runs an external subprocess for a long time without making LLM calls.
None of these guards require changes to your LLM backend or agent framework — they operate entirely within the Chainlit layer on the message, session, and step abstractions that Chainlit already provides. The result is defense in depth: the history guard prevents token accumulation from compounding across turns, the step guard catches looping sub-agents, the budget guard catches anything that takes too long or costs too much, and the deduplication guard prevents the whole stack from re-running on reconnect.
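The ordering above can be sketched without any framework dependencies. This is a minimal illustration, not the implementation used earlier in this post: the session is a plain dict standing in for cl.user_session, word count stands in for a real tokenizer, the cost ceiling is omitted, and every name here (guarded_turn, enter_step, truncate_history, TurnAborted) is hypothetical.

```python
import asyncio
import hashlib
from typing import List


class TurnAborted(Exception):
    """Raised when a guard stops the current turn."""


def fingerprint(session_id: str, content: str) -> str:
    # Depends only on session ID and content, so a replayed message hashes the same.
    return hashlib.sha256(f"{session_id}:{content}".encode("utf-8")).hexdigest()


def truncate_history(history: List[str], max_tokens: int) -> List[str]:
    # Keep the tail of the conversation within the budget.
    # Word count is a crude stand-in for a real tokenizer.
    kept: List[str] = []
    used = 0
    for msg in reversed(history):
        cost = len(msg.split())
        if used + cost > max_tokens:
            break
        kept.append(msg)
        used += cost
    kept.reverse()
    return kept


def enter_step(session: dict, max_steps: int = 40) -> None:
    # Step guard: the agent loop calls this before each step it takes.
    session["steps_this_turn"] += 1
    if session["steps_this_turn"] > max_steps:
        raise TurnAborted("step limit exceeded")


async def guarded_turn(session, content, agent, timeout_s=90.0, max_history_tokens=16_000):
    # 1. Deduplication guard: cheap set lookup before anything else runs.
    fp = fingerprint(session["id"], content)
    if fp in session["processed_ids"]:
        return None

    # 2. History token guard: stateless, recomputed on every turn.
    context = truncate_history(session["history"], max_history_tokens)

    # 3. Step guard: reset the per-turn counter; the agent calls enter_step().
    session["steps_this_turn"] = 0

    # 4. Async budget guard: wall-clock ceiling around the whole agent coroutine.
    try:
        reply = await asyncio.wait_for(agent(context, content, session), timeout=timeout_s)
    except asyncio.TimeoutError:
        raise TurnAborted("turn timed out") from None

    # Record the fingerprint only on success so a failed turn can be retried.
    session["processed_ids"].add(fp)
    session["history"].extend([content, reply])
    return reply
```

In this sketch the fingerprint depends only on session ID and content, so a user who deliberately sends the same text twice would also be dropped; a real handler may want to scope the check to replays only.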
LangChain callback integration note: When using LangChain's AsyncLangchainCallbackHandler, history is supplied by the chain's memory rather than assembled in your @cl.on_message handler. To apply the history token guard in this path, set a max_token_limit on the ConversationTokenBufferMemory or ConversationSummaryBufferMemory that backs the chain; this enforces the budget at the LangChain layer. Combine it with the step nesting guard to catch loops within the chain. For LlamaIndex's LlamaIndexCallbackHandler, set similarity_top_k on retrievers to bound the retrieved context per query, and max_iterations on agents to bound per-turn LLM calls at the LlamaIndex layer.
Summary: Chainlit cost amplification patterns
| Pattern | Cost impact | Guard |
|---|---|---|
| Conversation history accumulation: cl.chat_context.get() injected into every LLM call | Input token cost per turn grows linearly with conversation length, so cumulative cost grows quadratically; turn 30 may cost about 30× as much as turn 1 | Token-budget history truncation; keep tail of conversation within 16k-token window |
| Nested step explosion: looping sub-agent creates recursive cl.Step children | Dozens to hundreds of LLM calls from a single user message; each nested step billed separately | Step depth limit (max 6) + breadth limit (max 40 steps per turn) + LLM call count cap |
| Async tool call budget overflow: no default ceiling on agent tool calls per turn | Agent runs indefinitely, streaming progress while burning API budget; no Chainlit-layer interrupt | asyncio.wait_for timeout (90s) + per-turn cost budget ($0.25) + tool call count cap (20) |
| Session context drift and replay: reconnect re-executes on_message for same user input | Duplicate billing for the same user message; stale intermediate state contaminates next turn | Message fingerprint deduplication + stale in-flight detection + turn state cleanup on reconnect |
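To make the first row concrete, here is a small worked calculation. It assumes, purely for illustration, that every turn adds a fixed 1,000 tokens to the history and that the full history is resent on each turn; the function names are hypothetical and the figures are not measurements from a real app.

```python
from typing import Optional


def input_tokens_for_turn(turn: int, tokens_per_turn: int, window: Optional[int] = None) -> int:
    # Turn n resends all n turns of history unless a token window caps it.
    full = turn * tokens_per_turn
    return full if window is None else min(full, window)


def cumulative_input_tokens(turns: int, tokens_per_turn: int, window: Optional[int] = None) -> int:
    # Total input tokens billed across the whole conversation.
    return sum(input_tokens_for_turn(n, tokens_per_turn, window) for n in range(1, turns + 1))


unbounded = cumulative_input_tokens(30, 1_000)
windowed = cumulative_input_tokens(30, 1_000, window=16_000)
print(unbounded)  # 465000
print(windowed)   # 360000
```

Under these assumptions turn 30 alone sends 30,000 input tokens against 1,000 for turn 1, and the unbounded total follows the n(n+1)/2 pattern. With the 16k window, per-turn cost stops growing after turn 16, so the total grows linearly from that point on.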
Frequently asked questions
Does Chainlit have built-in tool call limits or per-turn cost budgets?
Chainlit does not include per-turn cost budgets, token limits, or tool call ceilings. It provides the UI scaffolding and callback handlers that connect to your agent framework, but budget enforcement is the responsibility of the agent framework layer (e.g., LangChain's max_iterations on AgentExecutor) or your own application code. Chainlit's cl.Step tracks steps for display purposes but does not count or limit them. This means the budget guards described in this post must be implemented at the application level — Chainlit provides all the hooks needed (cl.user_session for state, @cl.on_message for interception, cl.Step for step wrapping), but the enforcement logic is yours to write.
How does Chainlit's message history work with LangChain's ConversationChain?
When you use AsyncLangchainCallbackHandler, Chainlit intercepts LangChain's chain execution and displays steps in the UI. However, the conversation memory is managed by LangChain's memory class (typically ConversationBufferMemory or ConversationTokenBufferMemory), not by Chainlit's cl.chat_context. The LangChain memory and Chainlit's message history are separate: Chainlit's history is what the UI displays; LangChain's memory is what the LLM receives. To apply token budget enforcement, set a max_token_limit on ConversationTokenBufferMemory (which LangChain enforces before each LLM call) and separately truncate cl.chat_context.get() if you're passing it to other parts of your application. If you're using ConversationBufferMemory without a token limit, it will grow unboundedly — this is the most common source of silent cost accumulation in Chainlit + LangChain apps.
Can RunGuard's LoopDetector integrate with Chainlit's callback system?
Yes. RunGuard's LoopDetector and BudgetTracker are pure Python classes with no framework dependencies, so they integrate at any layer of a Chainlit app. The most ergonomic integration is to instantiate a LoopDetector in cl.user_session at session start, then call detector.step(tool_call_signature) inside each tool function and inside the AsyncLangchainCallbackHandler.on_tool_start() callback. The BudgetTracker integrates similarly: call tracker.record(input_tokens, output_tokens) in on_llm_end() and check tracker.is_exceeded() in on_chain_start() to abort the chain before the next LLM call. For Chainlit apps that use LangGraph, integrate the guards as graph-level edges that check conditions between nodes rather than inside individual node functions — this gives cleaner state management and prevents partial execution when a budget is exceeded mid-graph.
How do I track per-user LLM costs across sessions in Chainlit?
Chainlit's cl.user_session is scoped to a single chat session and does not persist across session restarts. For per-user cost tracking that survives across multiple sessions, you need an external store. The minimal approach: use Chainlit's @cl.on_chat_start hook to load a per-user cost record from a database (SQLite, Postgres, or Redis work well) keyed by the user identifier from cl.user_session.get("user"). At the end of each turn, persist the updated cost to the same store. Chainlit's @cl.oauth_callback or custom authentication layer provides the user identity. For teams using Chainlit Cloud (Chainlit's hosted platform), per-user cost tracking integrates with their built-in session storage; check the Chainlit documentation for the current cl.data_layer API.
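A minimal sketch of such an external store, using the standard library's sqlite3 module. The table name, column names, and helper functions are assumptions made for illustration; in a Chainlit app you would call load_cost from @cl.on_chat_start and add_cost at the end of each turn, keyed by whatever identifier your authentication layer provides.

```python
import sqlite3


def open_cost_store(path: str = ":memory:") -> sqlite3.Connection:
    # Use a file path in production; ":memory:" keeps this example self-contained.
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS user_costs ("
        "user_id TEXT PRIMARY KEY, total_usd REAL NOT NULL DEFAULT 0)"
    )
    conn.commit()
    return conn


def load_cost(conn: sqlite3.Connection, user_id: str) -> float:
    # Returns 0.0 for users with no recorded spend yet.
    row = conn.execute(
        "SELECT total_usd FROM user_costs WHERE user_id = ?", (user_id,)
    ).fetchone()
    return row[0] if row else 0.0


def add_cost(conn: sqlite3.Connection, user_id: str, turn_cost_usd: float) -> float:
    # Create the row on first use, then increment it; returns the new total.
    conn.execute(
        "INSERT OR IGNORE INTO user_costs (user_id, total_usd) VALUES (?, 0)", (user_id,)
    )
    conn.execute(
        "UPDATE user_costs SET total_usd = total_usd + ? WHERE user_id = ?",
        (turn_cost_usd, user_id),
    )
    conn.commit()
    return load_cost(conn, user_id)
```

Incrementing in SQL rather than writing back a value held in memory means two concurrent sessions for the same user do not overwrite each other's totals.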
How does Chainlit handle long-running agent tasks that take more than 60 seconds?
Chainlit's WebSocket connection stays open as long as the user's browser tab is connected and the server-side session is active. There is no built-in timeout on @cl.on_message handlers — a handler can run for minutes or hours without being forcibly terminated by Chainlit. This means the framework itself is not the right place to implement a task timeout; you must implement it in application code using asyncio.wait_for (as shown in the async budget guard above) or a task queue pattern where long-running work is submitted to a background worker and the Chainlit handler streams progress updates without blocking on completion. For tasks that exceed a few minutes, the task queue pattern is strongly preferred over blocking the on_message coroutine: it decouples the UI session from the task execution, survives WebSocket reconnects without re-executing the task, and lets you stream real-time progress updates by polling the queue for status changes.
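The task queue pattern can be sketched with asyncio alone. Here an in-process registry stands in for a real queue and worker, and send stands in for sending a Chainlit message; TaskRegistry, handle_message, and the report callback are hypothetical names, and a production setup would use an external worker so the task outlives the server process.

```python
import asyncio
import uuid
from typing import Dict


class TaskRegistry:
    """In-process stand-in for an external task queue and worker."""

    def __init__(self) -> None:
        self._tasks: Dict[str, asyncio.Task] = {}
        self._progress: Dict[str, str] = {}

    def submit(self, work) -> str:
        # `work` is an async callable that accepts a report(status) function.
        task_id = uuid.uuid4().hex
        self._progress[task_id] = "queued"

        def report(status: str) -> None:
            self._progress[task_id] = status

        self._tasks[task_id] = asyncio.create_task(work(report))
        return task_id

    def status(self, task_id: str) -> str:
        return self._progress[task_id]

    def done(self, task_id: str) -> bool:
        return self._tasks[task_id].done()

    def result(self, task_id: str):
        # Re-raises the task's exception if the work failed.
        return self._tasks[task_id].result()


async def handle_message(registry, work, send, poll_s=0.5, max_wait_s=300.0):
    # Submit the work, then poll for status changes instead of awaiting the task.
    task_id = registry.submit(work)
    waited = 0.0
    last = None
    while not registry.done(task_id):
        if waited >= max_wait_s:
            await send("Still working; the task continues in the background.")
            return task_id
        status = registry.status(task_id)
        if status != last:
            await send(status)
            last = status
        await asyncio.sleep(poll_s)
        waited += poll_s
    await send(registry.result(task_id))
    return task_id
```

Storing the returned task ID in the session lets a reconnecting client look up the existing task by ID rather than submitting the work a second time.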