LLM Agent Data Pipeline Cost Optimization: Filtering, Caching & Batching
Data pipelines that route records through LLM agents are among the fastest-growing LLM cost centers in 2026. Unlike interactive agents that wait for user input, pipelines run unattended, processing thousands or millions of records with no human watching the bill. This guide covers five techniques (upstream filtering, result caching, batching, token-efficient transformations, and tiered model routing) that, combined, can cut pipeline LLM costs by 60–80% without sacrificing output quality.
Anatomy of a Costly Pipeline
A typical LLM-augmented data pipeline looks like this:
raw_records → [fetch] → [transform] → [LLM classify/enrich] → [store]
The cost problem is almost always in what gets sent to the LLM layer. Teams build the happy path first — every record goes through LLM enrichment — then discover the bill at the end of the month. Common culprits:
- No filtering: 40–70% of records may be classifiable by simple rules (regex, keyword match, null check) without an LLM at all, yet every record gets an LLM call.
- No caching: Pipelines reprocess unchanged records on every run. A daily pipeline re-classifying last week's data re-spends last week's LLM budget.
- Inefficient batching: Sending 10,000 records as 10,000 individual calls pays the system prompt overhead 10,000 times; sending them as 100 batches of 100 records pays it only 100 times, a 100× reduction in that overhead.
- Over-specified prompts: System prompts containing 2,000 tokens of instructions for a task that needs 100 tokens.
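- Wrong model: Using GPT-4o or Claude Sonnet for tasks that claude-haiku-4-5 or GPT-4o-mini handle equally well at a fraction of the per-token price (roughly 3× cheaper for Haiku 4.5 vs. Sonnet and over 15× cheaper for GPT-4o-mini vs. GPT-4o, at list input prices).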
Upstream Filtering: Only Send Records That Need LLM
Before any record touches the LLM layer, apply a cascade of cheap filters. Structure it as a decision tree — the cheapest check first, LLM last:
```python
from dataclasses import dataclass
from typing import Optional
import re


@dataclass
class Record:
    id: str
    text: str
    metadata: dict


class PipelineFilter:
    """Pre-LLM filter cascade. Returns a classification if deterministic,
    None if the record should be forwarded to the LLM."""

    # Text below this length is almost always empty or noise
    MIN_USEFUL_LENGTH = 20

    # Patterns that signal no LLM is needed
    # (blank text is caught by the length rule, so no blank pattern here)
    SPAM_PATTERNS = [
        re.compile(r"^(unsubscribe|opt.?out|remove me)", re.I),
        re.compile(r"^(\d{1,3}\.){3}\d{1,3}$"),  # IP address only
    ]

    # High-confidence positive signals (no LLM needed)
    HIGH_CONF_POSITIVE = [
        re.compile(r"\berror\b.*\bexception\b", re.I),
        re.compile(r"\bcrash(ed)?\b", re.I),
    ]

    def should_skip(self, record: Record) -> Optional[str]:
        """Return a classification label if we can skip the LLM, else None."""
        text = record.text.strip()

        # Rule 1: explicit spam patterns. Checked before the length rule,
        # otherwise short spam ("unsubscribe", a bare IP) would be labeled noise.
        for pattern in self.SPAM_PATTERNS:
            if pattern.search(text):
                return "spam"

        # Rule 2: too short to be meaningful
        if len(text) < self.MIN_USEFUL_LENGTH:
            return "noise"

        # Rule 3: already classified by upstream metadata
        if record.metadata.get("category"):
            return record.metadata["category"]  # trust upstream label

        # Rule 4: high-confidence heuristics
        for pattern in self.HIGH_CONF_POSITIVE:
            if pattern.search(text):
                return "bug_report"

        # No deterministic answer: forward to the LLM
        return None


async def process_pipeline(records: list[Record]) -> dict[str, str]:
    fltr = PipelineFilter()
    llm_queue: list[Record] = []
    results: dict[str, str] = {}

    for record in records:
        fast_label = fltr.should_skip(record)
        if fast_label:
            results[record.id] = fast_label  # free classification
        else:
            llm_queue.append(record)  # needs the LLM

    # Only the residual goes to the LLM; max() guards against an empty input
    llm_skip_rate = 1 - len(llm_queue) / max(len(records), 1)
    print(f"Filter pass: {llm_skip_rate:.0%} of records skip LLM")

    if llm_queue:
        llm_results = await batch_classify_with_llm(llm_queue)
        results.update(llm_results)
    return results
```
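Typical filter skip rates by pipeline type (the LLM cost saving tracks the skip rate when every record would otherwise cost roughly the same to classify):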
| Pipeline type | Typical skip rate | Cost saving |
|---|---|---|
| Support ticket classification | 30–50% | 30–50% |
| Log anomaly detection | 60–80% | 60–80% |
| Review sentiment analysis | 20–35% | 20–35% |
| News/content categorization | 40–60% | 40–60% |
| Code diff classification | 25–40% | 25–40% |
Result Caching: Don't Re-process Identical Inputs
Many pipelines run on rolling windows of data where most records haven't changed since the last run. Cache LLM results by a stable content hash and skip re-processing on subsequent runs.
```python
import hashlib
import json
from typing import Optional

import redis

# Synchronous client for simplicity; redis.asyncio.Redis is the async variant
r = redis.Redis(host="localhost", decode_responses=True)
CACHE_TTL_SECONDS = 86400 * 7  # 7 days


def content_hash(record: Record) -> str:
    """Stable hash over the fields used as LLM input."""
    # If you change the prompt or model, fold a version string into this dict
    # so labels produced by the old setup stop matching.
    content = {
        "text": record.text,
        "type": record.metadata.get("type", ""),
    }
    return hashlib.sha256(
        json.dumps(content, sort_keys=True).encode()
    ).hexdigest()[:16]


def cache_get(record: Record) -> Optional[str]:
    return r.get(f"llm:cache:{content_hash(record)}")


def cache_set(record: Record, label: str) -> None:
    r.setex(
        f"llm:cache:{content_hash(record)}",
        CACHE_TTL_SECONDS,
        label,
    )


async def classify_with_cache(records: list[Record]) -> dict[str, str]:
    results = {}
    cache_misses = []
    for record in records:
        cached = cache_get(record)
        if cached:
            results[record.id] = cached
        else:
            cache_misses.append(record)

    cache_hit_rate = 1 - len(cache_misses) / max(len(records), 1)
    print(f"Cache hit rate: {cache_hit_rate:.0%}")

    if cache_misses:
        fresh = await batch_classify_with_llm(cache_misses)
        for record in cache_misses:
            label = fresh[record.id]
            cache_set(record, label)
            results[record.id] = label
    return results
```
Cache design decisions:
- Hash over input fields only: Don't include timestamps, IDs, or mutable metadata in the hash. Only hash the fields that determine the LLM output.
- TTL based on data volatility: For immutable records (historical events, archived tickets), set long TTLs (30 days+). For live data that can be edited, use shorter TTLs (24–48 hours).
- Invalidation strategy: For manual corrections, store a correction key that overrides the cache. Don't delete the cache entry — the old LLM result is still useful as a baseline.
- Cache warming: On your first production run, seed the cache from your existing labeled dataset. Depending on how much incoming data overlaps that dataset, this can yield hit rates of 30–50% on the first real run.
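The correction-override and warming strategies above can be sketched without a Redis server. This is a minimal illustration, assuming a plain dict stands in for the cache store; LabelCache, the correction: key prefix, and warm() are hypothetical names, not part of redis-py or any other library. A correction lives under its own key and wins at read time, while the original LLM label stays in place as a baseline:

```python
import hashlib
import json
from typing import Optional


class LabelCache:
    """Hypothetical in-memory stand-in for the Redis cache above.

    Read order: manual correction first, then the cached LLM label.
    """

    def __init__(self) -> None:
        self._store: dict[str, str] = {}

    @staticmethod
    def content_hash(text: str, record_type: str = "") -> str:
        # Hash only the fields that determine the LLM output
        payload = json.dumps({"text": text, "type": record_type}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()[:16]

    def get(self, text: str, record_type: str = "") -> Optional[str]:
        h = self.content_hash(text, record_type)
        # A correction overrides the LLM result without deleting it
        correction = self._store.get(f"correction:{h}")
        if correction is not None:
            return correction
        return self._store.get(f"llm:cache:{h}")

    def baseline(self, text: str, record_type: str = "") -> Optional[str]:
        # The original LLM label, still available after a correction
        return self._store.get(f"llm:cache:{self.content_hash(text, record_type)}")

    def set_llm_label(self, text: str, label: str, record_type: str = "") -> None:
        self._store[f"llm:cache:{self.content_hash(text, record_type)}"] = label

    def set_correction(self, text: str, label: str, record_type: str = "") -> None:
        self._store[f"correction:{self.content_hash(text, record_type)}"] = label

    def warm(self, labeled: list[tuple[str, str]]) -> int:
        """Seed the cache from an existing (text, label) dataset."""
        for text, label in labeled:
            self.set_llm_label(text, label)
        return len(labeled)


cache = LabelCache()
cache.warm([("App crashes on login", "bug_report")])
cache.set_correction("App crashes on login", "question")
print(cache.get("App crashes on login"))       # question (correction wins)
print(cache.baseline("App crashes on login"))  # bug_report (baseline kept)
```

In production the same two key prefixes map directly onto Redis keys; only the storage calls change.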
Smart Batching: Amortize Prompt Overhead
Every LLM API call carries fixed token overhead: the system prompt, role preamble, and response structure. With individual calls, this overhead is paid once per record. With batching, it's paid once per batch of N records.
For a 500-token system prompt, 50-token records, and 30-token outputs, batching 20 records per call reduces token consumption by ~82%:
- Individual: 20 calls × (500 system + 50 input + 30 output) = 11,600 tokens
- Batched: 1 call × (500 system + 20×50 input + 20×30 output) = 2,100 tokens
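The arithmetic above can be checked with two small helpers. This sketch uses the same assumptions as the example (500-token system prompt, 50 input and 30 output tokens per record) and ignores the few extra tokens each numbered item adds to a batched prompt; the function names are illustrative:

```python
def individual_tokens(n_records: int, system: int, per_input: int, per_output: int) -> int:
    """Total tokens when every record gets its own call."""
    return n_records * (system + per_input + per_output)


def batched_tokens(
    n_records: int, batch_size: int, system: int, per_input: int, per_output: int
) -> int:
    """Total tokens when records are grouped into batches of batch_size.

    The system prompt is paid once per batch; per-record tokens are unchanged.
    """
    n_batches = -(-n_records // batch_size)  # ceiling division
    return n_batches * system + n_records * (per_input + per_output)


ind = individual_tokens(20, system=500, per_input=50, per_output=30)
bat = batched_tokens(20, batch_size=20, system=500, per_input=50, per_output=30)
saving = 1 - bat / ind
print(ind, bat, f"{saving:.0%}")  # 11600 2100 82%
```

The saving grows with the ratio of system-prompt tokens to per-record tokens; with a tiny system prompt, batching buys much less.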
```python
import json

from anthropic import AsyncAnthropic

# Async client: awaiting calls keeps the event loop free while requests are in flight
client = AsyncAnthropic()

VALID_LABELS = {"bug_report", "feature_request", "question", "praise", "other"}


async def batch_classify_with_llm(
    records: list[Record],
    batch_size: int = 20,
) -> dict[str, str]:
    results = {}
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        # Format batch as a numbered list for easy parsing
        batch_text = "\n".join(
            f"{j+1}. [{r.id}] {r.text[:500]}"
            for j, r in enumerate(batch)
        )
        response = await client.messages.create(
            model="claude-haiku-4-5-20251001",  # cheapest capable model
            max_tokens=512,
            system=(
                "Classify each item as: bug_report, feature_request, "
                "question, praise, or other. "
                "Respond with a JSON object: {\"1\": \"label\", \"2\": \"label\", ...}. "
                "Use the item numbers as keys. No explanation."
            ),
            messages=[{"role": "user", "content": batch_text}],
        )
        try:
            labels = json.loads(response.content[0].text)
            for j, record in enumerate(batch):
                label = labels.get(str(j + 1), "other")
                # Anything outside the allowed label set falls back to "other"
                ok = isinstance(label, str) and label in VALID_LABELS
                results[record.id] = label if ok else "other"
        except (json.JSONDecodeError, AttributeError, IndexError):
            # Fallback: classify individually if the batch reply can't be parsed
            # (classify_single is a per-record helper, not shown here)
            for record in batch:
                results[record.id] = await classify_single(record)
    return results
```
Batch size tuning:
- 10–25 records per batch is the practical sweet spot for classification tasks. Larger batches raise the odds of dropped or misaligned labels (and of hitting the max_tokens output limit) and tend to degrade accuracy.
- Group by type before batching: Don't batch mixed record types. A batch of support tickets classifies better than a mixed batch of tickets + logs + reviews.
- Parallel batches: Run around 5 batches concurrently to maintain throughput. Beyond 5–10 concurrent requests, provider rate limits, not local CPU, usually become the bottleneck.
Token-Efficient Transformations
What you send to the model matters as much as how many calls you make. Three transformations typically reduce token count without hurting output quality:
1. Field selection
```python
import json


def select_fields_for_llm(record: dict) -> str:
    """Only send the fields the model needs for this task."""
    # For classification: title (capped at 200 chars) + first 300 chars of body.
    # "or ''" also covers fields that are present but None.
    return json.dumps({
        "title": (record.get("title") or "")[:200],
        "body": (record.get("body") or "")[:300],
    }, ensure_ascii=False)

# Don't send: timestamps, IDs, metadata, attachments, author info
```
2. Structured output forcing
```python
# EXPENSIVE: "Classify this and explain your reasoning."
#   Response: 150-300 tokens of explanation + label
# CHEAP: "Respond with exactly one word: bug, feature, question, praise, or other."
#   Response: 1-3 tokens
system_prompt_cheap = (
    "Respond with exactly one word from this list: "
    "bug, feature, question, praise, other. "
    "No punctuation, no explanation."
)
```
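Even with a one-word instruction, replies occasionally arrive capitalized or with a trailing period, so a light normalizer keeps parsing robust. A minimal sketch, assuming the five labels from the prompt above; parse_one_word_label and its fall-back-to-"other" behavior are illustrative choices, not a library API. Pairing this prompt with a small max_tokens also caps output spend if the model ignores the instruction:

```python
import string

ALLOWED = ("bug", "feature", "question", "praise", "other")


def parse_one_word_label(raw: str, allowed=ALLOWED, default: str = "other") -> str:
    """Normalize a one-word model reply to an allowed label."""
    # Lowercase, then trim surrounding whitespace and punctuation ("Bug." -> "bug")
    word = raw.lower().strip(string.punctuation + string.whitespace)
    # Anything unexpected (a sentence, a typo, an empty reply) maps to the default
    return word if word in allowed else default
```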
3. Prompt caching for stable system prompts
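```python
import anthropic

client = anthropic.Anthropic()

# With Anthropic's prompt caching, mark the stable system prompt with
# cache_control so later calls read it from cache at a discount.
# YOUR_LONG_STABLE_SYSTEM_PROMPT and batch_text are placeholders.
response = client.messages.create(
    model="claude-haiku-4-5-20251001",
    max_tokens=512,  # room for one JSON label per item in a batch
    system=[{
        "type": "text",
        "text": YOUR_LONG_STABLE_SYSTEM_PROMPT,
        "cache_control": {"type": "ephemeral"},
    }],
    messages=[{"role": "user", "content": batch_text}],
)
# Cache hit: ~10% of normal input token cost for the cached prefix;
# the first call (cache write) costs ~25% more than normal input tokens.
# Prefixes shorter than a model-specific minimum are not cached at all.
# response.usage.cache_read_input_tokens reports how much was served from cache.
```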
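Whether prompt caching pays off depends on how many calls reuse the prefix within the cache lifetime. A rough model, assuming Anthropic's multipliers for the default 5-minute cache (writes at 1.25× the base input price, reads at 0.1×) and that every call after the first lands inside the TTL; prompt_cost_units is a hypothetical helper that reports cost in "normal input token" units so it doesn't depend on any one model's price:

```python
def prompt_cost_units(
    prefix_tokens: int,
    calls: int,
    cached: bool,
    write_mult: float = 1.25,
    read_mult: float = 0.10,
) -> float:
    """Cost of a shared system-prompt prefix across `calls` calls, in units of
    normal input tokens. Assumes only the first call writes the cache and all
    later calls hit it (i.e. they all arrive within the TTL)."""
    if not cached or calls == 0:
        return float(prefix_tokens * calls)
    return prefix_tokens * write_mult + prefix_tokens * read_mult * (calls - 1)


uncached = prompt_cost_units(2000, 100, cached=False)
with_cache = prompt_cost_units(2000, 100, cached=True)
print(f"{1 - with_cache / uncached:.0%} lower prefix cost")  # 89% lower prefix cost
```

Under these multipliers caching is already cheaper by the second call within the TTL (1.25 + 0.1 = 1.35 units versus 2), but a one-off call costs 25% more than sending the prompt uncached, so it only helps prompts that are reused in quick succession.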
Tiered Model Routing
Not every record needs your most capable model. Route based on record characteristics:
```python
def select_model(record: Record) -> str:
    text_len = len(record.text)
    complexity_signals = record.metadata.get("complexity_signals", [])

    # Simple short records → cheapest model
    if text_len < 200 and not complexity_signals:
        return "claude-haiku-4-5-20251001"  # ~$0.001/1K input tokens

    # Medium complexity → mid-tier
    if text_len < 1000 and len(complexity_signals) < 3:
        return "claude-sonnet-4-6"  # ~$0.003/1K input tokens

    # Complex, long, or high-stakes → best model
    return "claude-opus-4-7"  # check current list price; Opus-tier input has ranged ~$0.005-0.015/1K
```
In many pipelines, 60–75% of records qualify for the cheapest tier, 20–30% need the mid tier, and fewer than 10% need the top tier. Combined with filtering and caching, this three-tier routing can yield a 70–80% cost reduction versus sending everything to the top model, though the exact figure depends on current per-tier prices.
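The routing effect is easiest to sanity-check as a weighted average of per-tier prices. A small sketch, assuming a three-tier traffic mix and per-1K input prices that you supply: the PRICES values below are illustrative placeholders, not current quotes, and blended_price is a hypothetical helper. Output-token prices and the filter/cache savings are left out, so this isolates routing alone:

```python
def blended_price(mix: dict[str, float], prices: dict[str, float]) -> float:
    """Average per-1K-token input price for a traffic mix across model tiers."""
    total_share = sum(mix.values())
    if abs(total_share - 1.0) > 1e-9:
        raise ValueError(f"tier shares must sum to 1, got {total_share}")
    return sum(share * prices[tier] for tier, share in mix.items())


# Illustrative per-1K input prices; replace with current list prices
PRICES = {"haiku": 0.001, "sonnet": 0.003, "opus": 0.015}
# Share of records routed to each tier
MIX = {"haiku": 0.70, "sonnet": 0.22, "opus": 0.08}

blended = blended_price(MIX, PRICES)
reduction = 1 - blended / PRICES["opus"]  # vs. sending everything to the top tier
print(f"blended ${blended:.5f}/1K input tokens, {reduction:.0%} below all-top-tier")
```

Because the result depends almost entirely on the price gap between the cheapest and top tiers, rerun it with current list prices before quoting a number.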
Cost Monitoring for Pipelines
Unlike interactive agents, data pipelines run without a human watching. Wire cost monitoring directly into your pipeline run loop so you get alerted before a costly run completes:
```python
import os
import time

from more_itertools import chunked  # yields lists of up to n items
from runguard import RunGuard

rg = RunGuard(api_key=os.environ["RUNGUARD_API_KEY"])


async def run_pipeline(records: list[Record]):
    async with rg.wrap(
        app_id="classification-pipeline",
        env={
            "RUNGUARD_BUDGET_USD": "50.0",    # hard stop at $50/run
            "RUNGUARD_ALERT_AT_USD": "10.0",  # Slack alert at $10
            "RUNGUARD_CORRELATION_ID": f"run-{int(time.time())}",
        },
    ) as guard:
        results = {}
        for batch in chunked(records, 20):
            guard.check()  # raises BudgetExceeded if the limit is hit
            batch_results = await classify_with_cache(batch)
            results.update(batch_results)
        return results
```
The RunGuard dashboard shows cost-per-run history, cache hit rates, and filter skip rates in one view — so you can see at a glance whether optimizations are working as the pipeline runs in production.
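If you are not using a hosted guard, the same pattern (accumulate spend per call, alert once at a soft threshold, refuse new batches past a hard limit) fits in a few lines. This is a vendor-neutral sketch, not RunGuard's API: RunBudget and BudgetExceeded are hypothetical names, per-million-token prices are supplied by you, and cache-discounted tokens are not modeled. With the Anthropic SDK, response.usage.input_tokens and response.usage.output_tokens provide the counts to pass to record_usage():

```python
from dataclasses import dataclass, field


class BudgetExceeded(RuntimeError):
    """Raised when a run's estimated spend reaches its hard limit."""


@dataclass
class RunBudget:
    """Hypothetical per-run budget tracker; prices are USD per million tokens."""

    limit_usd: float
    alert_usd: float
    input_price_per_m: float
    output_price_per_m: float
    spent_usd: float = 0.0
    alerted: bool = False
    alerts: list[str] = field(default_factory=list)

    def record_usage(self, input_tokens: int, output_tokens: int) -> None:
        # Convert token counts from an API response into dollars
        self.spent_usd += (
            input_tokens * self.input_price_per_m
            + output_tokens * self.output_price_per_m
        ) / 1_000_000
        # Fire the soft alert only once per run (hook Slack etc. in here)
        if not self.alerted and self.spent_usd >= self.alert_usd:
            self.alerted = True
            self.alerts.append(f"spend reached ${self.spent_usd:.2f}")

    def check(self) -> None:
        # Call before each batch so a runaway run stops between batches
        if self.spent_usd >= self.limit_usd:
            raise BudgetExceeded(
                f"spent ${self.spent_usd:.2f} of ${self.limit_usd:.2f}"
            )
```

Checking before each batch (rather than after) means the run can overshoot the limit by at most one batch's cost, which is why smaller batches also make budget stops more precise.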