LLM Agent Data Pipeline Cost Optimization: Filtering, Caching & Batching

Data pipelines that route records through LLM agents are some of the fastest-growing LLM cost centers in 2026. Unlike interactive agents that wait for user input, pipelines run unattended — processing thousands or millions of records without a human watching the bill. This guide covers five techniques that reliably cut pipeline LLM costs by 60–80% without sacrificing output quality.

Anatomy of a Costly Pipeline

A typical LLM-augmented data pipeline looks like this:

raw_records → [fetch] → [transform] → [LLM classify/enrich] → [store]

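The cost problem is almost always in what gets sent to the LLM layer. Teams build the happy path first, with every record going through LLM enrichment, then discover the bill at the end of the month. The common culprits, each addressed in a section below, are: records sent to the LLM that a cheap rule could have labeled, unchanged inputs re-processed on every run, one API call per record, payloads and responses that carry more tokens than the task needs, and a top-tier model used for every record.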

Upstream Filtering: Only Send Records That Need LLM

Before any record touches the LLM layer, apply a cascade of cheap filters. Structure it as a decision tree — the cheapest check first, LLM last:

from dataclasses import dataclass
from typing import Optional
import re

@dataclass
class Record:
    id: str
    text: str
    metadata: dict

class PipelineFilter:
    """Pre-LLM filter cascade. Returns classification if deterministic,
    None if record should be forwarded to LLM."""

    # If text is below this length, almost always "empty" or "noise"
    MIN_USEFUL_LENGTH = 20

    # Patterns that signal no LLM needed
    SPAM_PATTERNS = [
        re.compile(r"^(unsubscribe|opt.?out|remove me)", re.I),
        re.compile(r"^\s*$"),          # blank
        re.compile(r"^(\d{1,3}\.){3}\d{1,3}$"),  # IP address only
    ]

    # High-confidence positive signals (no LLM needed)
    HIGH_CONF_POSITIVE = [
        re.compile(r"\berror\b.*\bexception\b", re.I),
        re.compile(r"\bcrash\b|\bcrashed\b", re.I),
    ]

    def should_skip(self, record: Record) -> Optional[str]:
        """Return a classification label if we can skip LLM, else None."""

        # Rule 1: too short to be meaningful
        if len(record.text.strip()) < self.MIN_USEFUL_LENGTH:
            return "noise"

        # Rule 2: explicit spam patterns
        for pattern in self.SPAM_PATTERNS:
            if pattern.search(record.text):
                return "spam"

        # Rule 3: already classified by upstream metadata
        if record.metadata.get("category"):
            return record.metadata["category"]  # trust upstream label

        # Rule 4: high-confidence heuristics
        for pattern in self.HIGH_CONF_POSITIVE:
            if pattern.search(record.text):
                return "bug_report"

        # No deterministic answer — forward to LLM
        return None

async def process_pipeline(records: list[Record]):
    fltr = PipelineFilter()
    llm_queue = []
    results = {}

    for record in records:
        fast_label = fltr.should_skip(record)
        if fast_label:
            results[record.id] = fast_label  # free classification
        else:
            llm_queue.append(record)  # needs LLM

    # Only the residual goes to LLM
    llm_skip_rate = 1 - len(llm_queue) / max(len(records), 1)  # guard against empty input
    print(f"Filter pass: {llm_skip_rate:.0%} of records skip LLM")

    llm_results = await batch_classify_with_llm(llm_queue)
    results.update(llm_results)
    return results
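
To see which rules earn their place in the cascade, it can help to count how often each one fires on a sample of records. The sketch below is a simplified, self-contained stand-in for the cascade above, not a replacement for it: `RULES`, `first_match`, and `rule_hit_counts` are hypothetical names, and the rule set is reduced to two entries for brevity.

```python
import re
from collections import Counter
from typing import Callable, Optional

# Hypothetical (name, predicate, label) rules, ordered cheapest first.
Rule = tuple[str, Callable[[str], bool], str]

RULES: list[Rule] = [
    ("too_short", lambda t: len(t.strip()) < 20, "noise"),
    ("crash_keyword",
     lambda t: re.search(r"\bcrash(ed)?\b", t, re.I) is not None,
     "bug_report"),
]

def first_match(text: str) -> Optional[tuple[str, str]]:
    """Return (rule_name, label) for the first rule that fires, else None."""
    for name, predicate, label in RULES:
        if predicate(text):
            return name, label
    return None

def rule_hit_counts(texts: list[str]) -> Counter:
    """Count how many texts each rule resolves; 'llm' counts the residual."""
    counts: Counter = Counter()
    for text in texts:
        match = first_match(text)
        counts[match[0] if match else "llm"] += 1
    return counts

sample = [
    "ok",
    "The app crashed when I opened the settings page",
    "Could you add dark mode to the dashboard please?",
]
print(rule_hit_counts(sample))
```

A rule that almost never fires adds maintenance cost without reducing LLM traffic, while a rule that fires very often deserves a spot check of its labels against LLM output.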

Real-world filter skip rates by pipeline type:

| Pipeline type | Typical skip rate | Cost saving |
|---|---|---|
| Support ticket classification | 30–50% | 30–50% |
| Log anomaly detection | 60–80% | 60–80% |
| Review sentiment analysis | 20–35% | 20–35% |
| News/content categorization | 40–60% | 40–60% |
| Code diff classification | 25–40% | 25–40% |

Result Caching: Don't Re-process Identical Inputs

Many pipelines run on rolling windows of data where most records haven't changed since the last run. Cache LLM results by a stable content hash and skip re-processing on subsequent runs.

import hashlib
import json
import redis
from typing import Optional

r = redis.Redis(host="localhost", decode_responses=True)

CACHE_TTL_SECONDS = 86400 * 7  # 7 days

def content_hash(record: Record) -> str:
    """Stable hash over the fields used as LLM input."""
    content = {
        "text": record.text,
        "type": record.metadata.get("type", ""),
    }
    return hashlib.sha256(
        json.dumps(content, sort_keys=True).encode()
    ).hexdigest()[:16]

def cache_get(record: Record) -> Optional[str]:
    return r.get(f"llm:cache:{content_hash(record)}")

def cache_set(record: Record, label: str):
    r.setex(
        f"llm:cache:{content_hash(record)}",
        CACHE_TTL_SECONDS,
        label
    )

async def classify_with_cache(records: list[Record]) -> dict[str, str]:
    results = {}
    cache_misses = []

    for record in records:
        cached = cache_get(record)
        if cached is not None:  # an empty-string label still counts as a hit
            results[record.id] = cached
        else:
            cache_misses.append(record)

    cache_hit_rate = 1 - len(cache_misses) / max(len(records), 1)
    print(f"Cache hit rate: {cache_hit_rate:.0%}")

    if cache_misses:
        fresh = await batch_classify_with_llm(cache_misses)
        for record in cache_misses:
            label = fresh[record.id]
            cache_set(record, label)
            results[record.id] = label

    return results
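
The cache above helps across runs. Within a single run, duplicate inputs can also be collapsed before the LLM call so that each distinct text is classified once. A minimal sketch, assuming a hypothetical `classify_fn` that maps a list of texts to a list of labels in the same order; `text_key` and `dedupe_and_classify` are illustrative names, not part of the pipeline above.

```python
import hashlib
from typing import Callable

def text_key(text: str) -> str:
    """Stable key for a text; trimmed so whitespace-only variants collapse."""
    return hashlib.sha256(text.strip().encode("utf-8")).hexdigest()

def dedupe_and_classify(
    items: dict[str, str],                          # record id -> text
    classify_fn: Callable[[list[str]], list[str]],  # hypothetical batch classifier
) -> dict[str, str]:
    """Classify each distinct text once, then fan labels back out to all ids."""
    unique: dict[str, str] = {}                     # key -> representative text
    for text in items.values():
        unique.setdefault(text_key(text), text)

    keys = list(unique)
    labels = classify_fn([unique[k] for k in keys])
    label_by_key = dict(zip(keys, labels))

    return {rid: label_by_key[text_key(text)] for rid, text in items.items()}

# Usage with a stand-in classifier that records how many texts it received.
calls: list[int] = []

def fake_classifier(texts: list[str]) -> list[str]:
    calls.append(len(texts))
    return ["question" if t.endswith("?") else "other" for t in texts]

labels = dedupe_and_classify(
    {
        "a": "How do I reset my password?",
        "b": "How do I reset my password?",
        "c": "Thanks",
    },
    fake_classifier,
)
print(labels, calls)
```

Three records go in, but the stand-in classifier only sees the two distinct texts.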

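Cache design decisions in the code above: the key is a SHA-256 hash over only the fields sent to the model (the text and the record type), so changes to unrelated metadata do not cause a miss; the hash is truncated to 16 hex characters (64 bits) to keep keys short; and entries expire after 7 days so stale labels eventually age out.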
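
One design decision that a content-only hash does not cover is invalidation when the prompt or model changes: a label produced under an old prompt would still be served from cache. A possible sketch folds a prompt version and model name into the key. `versioned_cache_key` and its parameters are hypothetical, and the version string is something the pipeline owner would bump by hand.

```python
import hashlib
import json

def versioned_cache_key(
    text: str,
    record_type: str,
    model: str,
    prompt_version: str,
) -> str:
    """Cache key that changes whenever the input, model, or prompt version changes."""
    payload = {
        "text": text,
        "type": record_type,
        "model": model,
        "prompt_version": prompt_version,
    }
    digest = hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode("utf-8")
    ).hexdigest()
    # Keeping the version in the key prefix makes old entries easy to find and purge.
    return f"llm:cache:{prompt_version}:{digest[:32]}"

k1 = versioned_cache_key("App crashes on login", "ticket", "model-a", "v1")
k2 = versioned_cache_key("App crashes on login", "ticket", "model-a", "v2")
print(k1 != k2)
```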

Smart Batching: Amortize Prompt Overhead

Every LLM API call carries fixed token overhead: the system prompt, role preamble, and response structure. With individual calls, this overhead is paid once per record. With batching, it's paid once per batch of N records.

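For a system prompt of 500 tokens and 50-token records, 20 individual calls consume 20 × (500 + 50) = 11,000 input tokens, while one batched call of 20 records consumes 500 + 20 × 50 = 1,500. That is a reduction of roughly 86% in input tokens (output tokens are unaffected):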

import asyncio
import json

from anthropic import Anthropic

client = Anthropic()

async def batch_classify_with_llm(
    records: list[Record],
    batch_size: int = 20
) -> dict[str, str]:
    results = {}

    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]

        # Format batch as numbered list for easy parsing
        batch_text = "\n".join(
            f"{j+1}. [{r.id}] {r.text[:500]}"
            for j, r in enumerate(batch)
        )

        # The client is synchronous, so run the call in a worker thread
        # instead of blocking the event loop.
        response = await asyncio.to_thread(
            client.messages.create,
            model="claude-haiku-4-5-20251001",  # cheapest capable model
            max_tokens=512,
            system=(
                "Classify each item as: bug_report, feature_request, "
                "question, praise, or other. "
                "Respond with a JSON object: {\"1\": \"label\", \"2\": \"label\", ...}. "
                "Use the item numbers as keys. No explanation."
            ),
            messages=[{"role": "user", "content": batch_text}]
        )

        try:
            labels = json.loads(response.content[0].text)
            for j, record in enumerate(batch):
                results[record.id] = labels.get(str(j + 1), "other")
        except (json.JSONDecodeError, AttributeError, IndexError):
            # Fallback: classify individually if the response is empty,
            # not valid JSON, or not a JSON object.
            # classify_single (one record per call) is assumed to be defined elsewhere.
            for record in batch:
                results[record.id] = await classify_single(record)

    return results
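
Batch responses do not always come back as clean JSON: the model may wrap the object in extra text, omit an item, or return a label outside the allowed set. A defensive parser along the following lines can limit the fallback to only the items that actually failed, rather than the whole batch. `parse_batch_labels` and `ALLOWED_LABELS` are hypothetical helpers, not part of any SDK.

```python
import json
import re
from typing import Optional

ALLOWED_LABELS = {"bug_report", "feature_request", "question", "praise", "other"}

def parse_batch_labels(raw: str, batch_len: int) -> list[Optional[str]]:
    """Return one label per item (JSON keys are 1-based); None marks items to retry."""
    # Take the span from the first "{" to the last "}" so surrounding text is ignored.
    match = re.search(r"\{.*\}", raw, re.DOTALL)
    if match is None:
        return [None] * batch_len
    try:
        data = json.loads(match.group(0))
    except json.JSONDecodeError:
        return [None] * batch_len
    if not isinstance(data, dict):
        return [None] * batch_len

    labels: list[Optional[str]] = []
    for n in range(1, batch_len + 1):
        value = data.get(str(n))
        label = value.strip().lower() if isinstance(value, str) else None
        labels.append(label if label in ALLOWED_LABELS else None)
    return labels

parsed = parse_batch_labels('Here you go: {"1": "Bug_Report", "2": "spam"}', 3)
print(parsed)
```

Items that come back as `None` can be retried individually while the valid labels from the same response are kept.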

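Batch size tuning: the example above uses 20 records per call, truncates each record to 500 characters, and caps the response at 512 tokens. Larger batches amortize the system prompt further but need a higher max_tokens for the longer JSON response, and a single parse failure sends the whole batch to the per-record fallback.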
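
The effect of batch size on input tokens can be estimated with simple arithmetic before running anything. The sketch below assumes a fixed per-call overhead and a uniform record size, both of which are simplifications, and `input_tokens` is a hypothetical helper.

```python
import math

def input_tokens(
    n_records: int,
    batch_size: int,
    overhead_tokens: int,
    record_tokens: int,
) -> int:
    """Total input tokens when the fixed overhead is paid once per call."""
    n_calls = math.ceil(n_records / batch_size)
    return n_calls * overhead_tokens + n_records * record_tokens

# 1,000 records, 500-token system prompt, 50 tokens per record.
baseline = input_tokens(1000, 1, 500, 50)
for size in (1, 5, 20, 50):
    total = input_tokens(1000, size, 500, 50)
    saved = 1 - total / baseline
    print(f"batch_size={size:>2}: {total:>7,} input tokens, {saved:.0%} saved")
```

Under these assumptions the saving is about 73% at a batch size of 5, about 86% at 20, and about 89% at 50, so most of the benefit arrives early and very large batches add parsing risk for little extra gain.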

Token-Efficient Transformations

What you send to the model matters as much as how many calls you make. Three transformations consistently reduce token count without reducing output quality:

1. Field selection

def select_fields_for_llm(record: dict) -> str:
    """Only send fields the model needs for this task."""
    # For classification: title + first 300 chars of body only
    return json.dumps({
        "title": record.get("title", "")[:200],
        "body": record.get("body", "")[:300],
    }, ensure_ascii=False)
    # Don't send: timestamps, IDs, metadata, attachments, author info

2. Structured output forcing

# EXPENSIVE: "Classify this and explain your reasoning."
# Response: 150-300 tokens of explanation + label

# CHEAP: "Respond with exactly one word: bug, feature, question, praise, or other."
# Response: 1-3 tokens

system_prompt_cheap = (
    "Respond with exactly one word from this list: "
    "bug, feature, question, praise, other. "
    "No punctuation, no explanation."
)

3. Prompt caching for stable system prompts

# With Anthropic's prompt caching: prefix the stable system prompt
# with cache_control to avoid re-encoding it on every call
response = client.messages.create(
    model="claude-haiku-4-5-20251001",
    max_tokens=16,
    system=[{
        "type": "text",
        "text": YOUR_LONG_STABLE_SYSTEM_PROMPT,
        "cache_control": {"type": "ephemeral"}
    }],
    messages=[{"role": "user", "content": batch_text}]
)
# Cached prefix: ~10% of normal input token cost on cache hit
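
Whether prompt caching pays off depends on how many calls reuse the prefix before the cache entry expires. The sketch below assumes a cache write costs 1.25× the base input price and a cache read 0.10× (the multipliers Anthropic has published for its default short-lived cache; verify against current pricing), and that every call after the first is a hit. `cached_prefix_cost` and `uncached_prefix_cost` are hypothetical helpers, with costs expressed in base-price token equivalents.

```python
def cached_prefix_cost(
    prefix_tokens: int,
    n_calls: int,
    write_mult: float = 1.25,  # assumed cache-write multiplier
    read_mult: float = 0.10,   # assumed cache-read multiplier
) -> float:
    """Prefix cost in base-price token equivalents: one write, then reads."""
    if n_calls <= 0:
        return 0.0
    return prefix_tokens * (write_mult + (n_calls - 1) * read_mult)

def uncached_prefix_cost(prefix_tokens: int, n_calls: int) -> float:
    """Prefix cost when the full prefix is billed at the base price on every call."""
    return float(prefix_tokens * n_calls)

for n in (1, 2, 10, 100):
    cached = cached_prefix_cost(2000, n)
    uncached = uncached_prefix_cost(2000, n)
    print(f"{n:>3} calls: cached={cached:,.0f} uncached={uncached:,.0f}")
```

With these multipliers a single call is slightly more expensive with caching, and the saving starts from the second call that reuses the prefix. Prefixes shorter than the model's minimum cacheable length are processed without caching, so a short system prompt gains nothing from the `cache_control` marker.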

Tiered Model Routing

Not every record needs your most capable model. Route based on record characteristics:

def select_model(record: Record) -> str:
    text_len = len(record.text)
    complexity_signals = record.metadata.get("complexity_signals", [])

    # Simple short records → cheapest model
    if text_len < 200 and not complexity_signals:
        return "claude-haiku-4-5-20251001"  # ~$0.001/1K input tokens

    # Medium complexity → mid-tier
    if text_len < 1000 and len(complexity_signals) < 3:
        return "claude-sonnet-4-6"  # ~$0.003/1K input tokens

    # Complex, long, or high-stakes → best model
    return "claude-opus-4-7"  # ~$0.015/1K input tokens

In practice, 60–75% of pipeline records qualify for the cheapest tier, 20–30% need the mid tier, and fewer than 10% need the top tier. Combined with filtering and caching, this 3-tier routing typically yields 70–80% cost reduction versus sending everything to the top model.
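
The tier shares above translate into a blended price through a weighted average. The sketch below uses illustrative shares and per-1K-token prices (assumptions, not quotes; substitute current list prices), and it weights by record count, which understates the real cost when longer records land in the upper tiers. `blended_price` is a hypothetical helper.

```python
def blended_price(shares: dict[str, float], prices: dict[str, float]) -> float:
    """Weighted average price across tiers; shares must sum to 1."""
    if abs(sum(shares.values()) - 1.0) > 1e-9:
        raise ValueError("tier shares must sum to 1")
    return sum(shares[tier] * prices[tier] for tier in shares)

shares = {"cheap": 0.65, "mid": 0.27, "top": 0.08}
prices = {"cheap": 0.001, "mid": 0.003, "top": 0.015}  # USD per 1K input tokens (illustrative)

blend = blended_price(shares, prices)
reduction = 1 - blend / prices["top"]
print(f"blended: ${blend:.5f}/1K input tokens, {reduction:.0%} below top tier")
```

Re-running the calculation with measured token volumes per tier, rather than record counts, gives a more honest estimate before committing to routing thresholds.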

Cost Monitoring for Pipelines

Unlike interactive agents, data pipelines run without a human watching. Wire cost monitoring directly into your pipeline run loop so you get alerted before a costly run completes:

import os
import time

from more_itertools import chunked  # yields lists of up to n items
from runguard import RunGuard

rg = RunGuard(api_key=os.environ["RUNGUARD_API_KEY"])

async def run_pipeline(records: list[Record]):
    async with rg.wrap(
        app_id="classification-pipeline",
        env={
            "RUNGUARD_BUDGET_USD": "50.0",         # hard stop at $50/run
            "RUNGUARD_ALERT_AT_USD": "10.0",        # Slack alert at $10
            "RUNGUARD_CORRELATION_ID": f"run-{int(time.time())}",
        }
    ) as guard:
        results = {}
        for batch in chunked(records, 20):
            guard.check()  # raises BudgetExceeded if limit hit
            batch_results = await classify_with_cache(batch)
            results.update(batch_results)

        return results

The RunGuard dashboard shows cost-per-run history, cache hit rates, and filter skip rates in one view — so you can see at a glance whether optimizations are working as the pipeline runs in production.
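
The hard-stop idea itself does not depend on any particular tool. As a library-independent sketch (this is not RunGuard's API; `BudgetGuard`, `BudgetExceeded`, and the prices used are hypothetical), a run can accumulate estimated spend from the token counts each call reports and abort once a ceiling is reached:

```python
class BudgetExceeded(RuntimeError):
    """Raised when accumulated spend reaches the configured ceiling."""

class BudgetGuard:
    def __init__(
        self,
        budget_usd: float,
        input_price_per_1k: float,
        output_price_per_1k: float,
    ):
        self.budget_usd = budget_usd
        self.input_price = input_price_per_1k
        self.output_price = output_price_per_1k
        self.spent_usd = 0.0

    def record(self, input_tokens: int, output_tokens: int) -> None:
        """Add the estimated cost of one call to the running total."""
        self.spent_usd += (input_tokens / 1000) * self.input_price
        self.spent_usd += (output_tokens / 1000) * self.output_price

    def check(self) -> None:
        """Call before each batch; raises once the budget is used up."""
        if self.spent_usd >= self.budget_usd:
            raise BudgetExceeded(
                f"spent ${self.spent_usd:.4f} of ${self.budget_usd:.4f}"
            )

guard = BudgetGuard(budget_usd=0.01, input_price_per_1k=0.001, output_price_per_1k=0.005)
guard.check()                      # nothing spent yet, so this passes
guard.record(input_tokens=20_000, output_tokens=0)
try:
    guard.check()
except BudgetExceeded as exc:
    print(f"stopping run: {exc}")
```

With the Anthropic SDK, the token counts for `record` would typically come from the `usage` field of each response; the estimate is only as accurate as the prices passed in.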

For further reading: LLM batch API cost reduction techniques, calculating LLM caching cost savings, and model routing cost optimization. To protect against runaway pipeline runs, see RunGuard's circuit breaker.