AI Agent Vector Database Cost Management: Embeddings, Queries & Storage

Vector databases are a hidden cost multiplier in AI agent systems. Beyond LLM API charges, RAG-based agents incur three separate cost streams: embedding generation, query execution, and vector storage. Each has its own optimization surface. This guide covers the techniques that cut combined vector DB costs by 50–70% in production RAG agents without reducing retrieval quality.

The Three Cost Streams

Most teams budget for LLM API calls and overlook the three vector-related cost streams that a RAG agent adds on top of LLM inference:

Cost stream | Driver | Typical share of total
Embedding generation | Number of documents ingested × tokens per doc | 10–25%
Vector query execution | Number of agent queries × index size | 5–15%
Vector storage | Number of vectors × dimensions × months | 20–40%
LLM inference (context) | Retrieved chunks passed to model | 40–60%

Teams often focus solely on LLM inference cost and miss that embedding generation can exceed LLM costs for document-heavy agents, and that storage compounds monthly as the index grows.
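To make the table concrete, the sketch below estimates monthly spend for the three vector-related streams. Every price in it is a placeholder assumption rather than a quoted vendor rate, and the class and function names are hypothetical; substitute the figures from your own provider's pricing page.

```python
from dataclasses import dataclass


@dataclass
class VectorCostInputs:
    docs_ingested: int        # documents embedded this month
    tokens_per_doc: int       # average tokens per document
    queries: int              # agent retrieval queries this month
    stored_vectors: int       # vectors held in the index
    dimensions: int           # embedding dimensionality
    # Placeholder prices (assumptions): replace with your provider's rates.
    embed_usd_per_1m_tokens: float = 0.02
    query_usd_per_1k: float = 0.01
    storage_usd_per_gb_month: float = 0.30


def estimate_monthly_cost(c: VectorCostInputs) -> dict[str, float]:
    """Rough monthly cost per stream, in USD."""
    embedding = c.docs_ingested * c.tokens_per_doc / 1_000_000 * c.embed_usd_per_1m_tokens
    query = c.queries / 1_000 * c.query_usd_per_1k
    # float32 vectors take 4 bytes per dimension; metadata and index
    # overhead are ignored here, so real storage will be somewhat higher.
    storage_gb = c.stored_vectors * c.dimensions * 4 / 1e9
    storage = storage_gb * c.storage_usd_per_gb_month
    return {
        "embedding": embedding,
        "query": query,
        "storage": storage,
        "total": embedding + query + storage,
    }


example = estimate_monthly_cost(
    VectorCostInputs(
        docs_ingested=1_000_000,
        tokens_per_doc=500,
        queries=100_000,
        stored_vectors=10_000_000,
        dimensions=1536,
    )
)
for stream, usd in example.items():
    print(f"{stream}: ${usd:.2f}")
```

Note that embedding is paid once per ingested document, while storage is charged again every month for every vector still in the index, which is why its share tends to grow over time.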

Embedding Reuse and Deduplication

Embedding generation is charged per token — the same as LLM input tokens, just at lower rates. The key waste pattern: re-embedding documents that haven't changed on every pipeline run.

import hashlib
import json
import sqlite3
from typing import Optional
import numpy as np

class EmbeddingCache:
    """SQLite-backed embedding cache to avoid re-embedding unchanged documents."""

    def __init__(self, db_path: str = "embeddings.db", model: str = "text-embedding-3-small"):
        self.conn = sqlite3.connect(db_path)
        self.model = model
        self._init_schema()

    def _init_schema(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS embeddings (
                content_hash TEXT NOT NULL,
                model         TEXT NOT NULL,
                vector        BLOB NOT NULL,
                created_at    INTEGER NOT NULL,
                PRIMARY KEY (content_hash, model)
            )
        """)
        self.conn.commit()

    def _hash(self, text: str) -> str:
        return hashlib.sha256(text.encode()).hexdigest()

    def get(self, text: str) -> Optional[list[float]]:
        row = self.conn.execute(
            "SELECT vector FROM embeddings WHERE content_hash=? AND model=?",
            (self._hash(text), self.model)
        ).fetchone()
        if row:
            return np.frombuffer(row[0], dtype=np.float32).tolist()
        return None

    def set(self, text: str, vector: list[float]):
        import time
        self.conn.execute(
            "INSERT OR REPLACE INTO embeddings VALUES (?,?,?,?)",
            (self._hash(text), self.model,
             np.array(vector, dtype=np.float32).tobytes(),
             int(time.time()))
        )
        self.conn.commit()

# Usage
from openai import OpenAI
client = OpenAI()
cache = EmbeddingCache()

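def get_embedding(text: str) -> list[float]:
    cached = cache.get(text)
    if cached is not None:
        return cached  # free: no embedding API call

    # Embed with the same model name the cache is keyed on, so cached
    # vectors always match the model that produced them.
    response = client.embeddings.create(
        model=cache.model,
        input=text
    )
    vector = response.data[0].embedding
    cache.set(text, vector)
    return vector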

def embed_documents(docs: list[str]) -> tuple[list[list[float]], dict]:
    vectors = []
    stats = {"cache_hits": 0, "api_calls": 0}
    for doc in docs:
        # Check the cache before embedding: get_embedding() populates it,
        # so checking afterwards would count every document as a hit.
        if cache.get(doc) is not None:
            stats["cache_hits"] += 1
        else:
            stats["api_calls"] += 1
        vectors.append(get_embedding(doc))
    return vectors, stats
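The cache above avoids re-embedding across pipeline runs. Duplicates inside a single batch (boilerplate paragraphs, repeated footers) can be collapsed before any request is made. The following is a minimal sketch of that idea, not part of the original pipeline: `embed_unique` and its `embed_batch` parameter are hypothetical names, and `embed_batch` stands for any function that embeds a list of texts in one request and returns vectors in the same order.

```python
import hashlib
from typing import Callable


def embed_unique(
    docs: list[str],
    embed_batch: Callable[[list[str]], list[list[float]]],
) -> tuple[list[list[float]], int]:
    """Embed each distinct text once; return vectors in the original order.

    Also returns the number of texts actually sent for embedding.
    """
    hashes = [hashlib.sha256(d.encode()).hexdigest() for d in docs]

    # Keep the first occurrence of each distinct text, preserving order.
    unique: dict[str, str] = {}
    for h, doc in zip(hashes, docs):
        unique.setdefault(h, doc)

    unique_hashes = list(unique)
    vectors = embed_batch([unique[h] for h in unique_hashes]) if unique_hashes else []
    by_hash = dict(zip(unique_hashes, vectors))

    # Fan the vectors back out so callers see one vector per input document.
    return [by_hash[h] for h in hashes], len(unique_hashes)


# Usage with a stand-in embedder that records what it was asked to embed.
sent: list[list[str]] = []


def fake_embed_batch(texts: list[str]) -> list[list[float]]:
    sent.append(texts)
    return [[float(len(t))] for t in texts]


vectors, embedded = embed_unique(["alpha", "be", "alpha"], fake_embed_batch)
print(embedded, "texts embedded for", len(vectors), "documents")
```

In a real pipeline `embed_batch` would wrap a batched embeddings request, and the result could be written through to the SQLite cache so later runs skip the call entirely.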

Additional embedding cost controls:

Query Result Caching

Agents often issue the same retrieval queries repeatedly — especially in multi-agent systems where several agents independently query the same knowledge base for the same context. Cache retrieval results for the session duration:

import redis
import hashlib
import json
from typing import Any

r = redis.Redis(host="localhost", decode_responses=False)

class RetrievalCache:
    def __init__(self, ttl_seconds: int = 300):  # 5-min session cache
        self.ttl = ttl_seconds

    def _key(self, query: str, top_k: int, filter_dict: dict) -> str:
        payload = json.dumps(
            {"q": query, "k": top_k, "f": filter_dict},
            sort_keys=True
        )
        return f"retrieval:{hashlib.sha256(payload.encode()).hexdigest()[:16]}"

    def get(self, query: str, top_k: int, filter_dict: dict) -> Any:
        raw = r.get(self._key(query, top_k, filter_dict))
        if raw:
            return json.loads(raw)
        return None

    def set(self, query: str, top_k: int, filter_dict: dict, results: Any):
        r.setex(
            self._key(query, top_k, filter_dict),
            self.ttl,
            json.dumps(results)
        )

retrieval_cache = RetrievalCache(ttl_seconds=300)

async def retrieve(
    query: str,
    top_k: int = 5,
    filter_dict: dict = None
) -> list[dict]:
    filter_dict = filter_dict or {}

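    cached = retrieval_cache.get(query, top_k, filter_dict)
    if cached is not None:  # an empty result list is still a valid cache hit
        return cached  # free: no vector DB query charge

    # Execute the actual vector query. `vector_db` is your vector DB client,
    # assumed to be initialized elsewhere and to return JSON-serializable results.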
    results = await vector_db.query(
        query_vector=get_embedding(query),
        top_k=top_k,
        filter=filter_dict
    )

    retrieval_cache.set(query, top_k, filter_dict, results)
    return results

In multi-agent workflows, query cache hit rates of 30–60% are common within a single session. The best case is a parallel pipeline in which 10 agents issue the same retrieval query: only the first call reaches the vector DB and the other 9 are served from the 5-minute session cache, which cuts retrieval cost for that query by up to 90%.
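One caveat for the parallel case: agents that start at the same moment can all miss the cache before the first result has been stored. A minimal sketch of one way to close that gap is to coalesce identical in-flight requests so that concurrent callers share a single backend call. `SingleFlight` is a hypothetical helper, not part of the cache shown earlier, and the fake query function below stands in for the real vector DB call.

```python
import asyncio
from typing import Awaitable, Callable


class SingleFlight:
    """Coalesce concurrent calls that share a key into one backend call."""

    def __init__(self) -> None:
        self._in_flight: dict[str, asyncio.Future] = {}

    async def run(self, key: str, fn: Callable[[], Awaitable[list]]) -> list:
        task = self._in_flight.get(key)
        if task is None:
            # First caller for this key starts the real work.
            task = asyncio.ensure_future(fn())
            self._in_flight[key] = task
            # Forget the task once it finishes so later calls run again.
            task.add_done_callback(lambda _: self._in_flight.pop(key, None))
        # Every caller, first or not, awaits the same task.
        return await task


async def demo() -> int:
    backend_calls = 0

    async def fake_vector_query() -> list:
        nonlocal backend_calls
        backend_calls += 1
        await asyncio.sleep(0.01)  # simulate network latency
        return [{"id": "doc-1", "score": 0.9}]

    flight = SingleFlight()
    await asyncio.gather(
        *(flight.run("same-query", fake_vector_query) for _ in range(10))
    )
    return backend_calls


print("backend calls for 10 concurrent agents:", asyncio.run(demo()))
```

This only helps agents running in the same process and event loop; agents in separate processes still rely on the shared Redis cache.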

Retrieval Budget Enforcement

Unbounded top_k values are a common source of both retrieval costs and context bloat (which amplifies LLM costs). Enforce per-query and per-session retrieval budgets:

from dataclasses import dataclass, field
from typing import Optional

@dataclass
class RetrievalBudget:
    max_queries_per_session: int = 20
    max_chunks_per_query: int = 5
    max_total_chunks: int = 50
    max_chunk_tokens: int = 500  # truncate chunks above this

    _query_count: int = field(default=0, init=False, repr=False)
    _total_chunks: int = field(default=0, init=False, repr=False)

    def check_query(self) -> bool:
        if self._query_count >= self.max_queries_per_session:
            raise RetrievalBudgetExceeded(
                f"Query limit: {self._query_count}/{self.max_queries_per_session}"
            )
        self._query_count += 1
        return True

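    def enforce_top_k(self, requested_k: int) -> int:
        remaining = self.max_total_chunks - self._total_chunks
        if remaining <= 0:
            # Fail before querying, not after the chunks have been paid for.
            raise RetrievalBudgetExceeded(
                f"Chunk limit: {self._total_chunks}/{self.max_total_chunks}"
            )
        return min(requested_k, self.max_chunks_per_query, remaining)

    def record_chunks(self, count: int):
        # enforce_top_k() caps each request to the remaining budget, so
        # reaching the limit exactly is allowed; only an overshoot is an error.
        self._total_chunks += count
        if self._total_chunks > self.max_total_chunks:
            raise RetrievalBudgetExceeded(
                f"Chunk limit: {self._total_chunks}/{self.max_total_chunks}"
            )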

class RetrievalBudgetExceeded(Exception):
    pass

async def guarded_retrieve(
    query: str,
    budget: RetrievalBudget,
    requested_k: int = 10
) -> list[dict]:
    budget.check_query()
    safe_k = budget.enforce_top_k(requested_k)

    results = await retrieve(query, top_k=safe_k)

    # Truncate individual chunks to the max token length.
    # ~4 characters per token is a rough heuristic for English text.
    max_chars = budget.max_chunk_tokens * 4
    truncated = []
    for chunk in results:
        text = chunk["text"]
        if len(text) > max_chars:
            text = text[:max_chars] + "..."
        truncated.append({**chunk, "text": text})

    budget.record_chunks(len(truncated))
    return truncated

Index Tiering: Hot vs. Cold Storage

Vector storage costs scale linearly with index size. For most agents, a small subset of documents (recently added, frequently retrieved, high-relevance) accounts for the majority of queries. Tier your index by access frequency:

Tier | Content | Storage type | Cost profile
Hot | Last 30 days + high-frequency docs | Managed vector DB (Pinecone, Qdrant Cloud) | High/query, low/storage
Warm | 31–180 days, medium-frequency | Self-hosted Qdrant on VPS | Low/query, medium/storage
Cold | 180+ days, archival | Flat files (parquet + FAISS index) on object storage | Minimal/query, very low/storage

Implementation pattern: run a weekly job that moves vectors from hot → warm → cold based on last-accessed timestamp. On query, search hot tier first; if results are below confidence threshold, fan out to warm. Only search cold on explicit archival retrieval requests.
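The pattern above can be sketched in a few lines. This is an illustration under stated assumptions, not a reference implementation: `assign_tier` and `tiered_search` are hypothetical names, each tier is represented by a plain search function, tiering uses only the last-accessed age (ignoring access frequency), and "below confidence threshold" is read as "the best hot-tier score is under `min_score`". Scores are assumed to be comparable across tiers, which requires the same embedding model and similarity metric everywhere.

```python
from typing import Callable, Optional

Hit = dict  # expected to carry a "score" key; higher means more similar
SearchFn = Callable[[list[float], int], list[Hit]]


def assign_tier(days_since_access: int) -> str:
    """Pick a tier from the last-accessed age, using the table's boundaries."""
    if days_since_access <= 30:
        return "hot"
    if days_since_access <= 180:
        return "warm"
    return "cold"


def tiered_search(
    query_vector: list[float],
    top_k: int,
    hot: SearchFn,
    warm: SearchFn,
    cold: Optional[SearchFn] = None,
    min_score: float = 0.75,       # assumed confidence threshold
    include_archive: bool = False,  # cold tier only on explicit request
) -> list[Hit]:
    hits = list(hot(query_vector, top_k))
    best = max((h["score"] for h in hits), default=float("-inf"))

    # Fan out to the warm tier only when the hot tier is not confident.
    if best < min_score:
        hits += warm(query_vector, top_k)

    if include_archive and cold is not None:
        hits += cold(query_vector, top_k)

    return sorted(hits, key=lambda h: h["score"], reverse=True)[:top_k]


# Usage with stand-in tiers.
def hot_tier(vec: list[float], k: int) -> list[Hit]:
    return [{"id": "recent-doc", "score": 0.91}]


def warm_tier(vec: list[float], k: int) -> list[Hit]:
    return [{"id": "older-doc", "score": 0.80}]


print(tiered_search([0.1, 0.2], top_k=3, hot=hot_tier, warm=warm_tier))
```

The weekly demotion job would call something like `assign_tier` for each vector and move those whose tier has changed.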

For teams using Pinecone's serverless tier: the serverless model charges per read unit (RU). If RU consumption for your workload scales with top_k as described here (each RU covering a fixed number of vector dimensions × top_k results), reducing top_k from 20 to 5 cuts it by 75%; confirm the exact formula against Pinecone's current pricing documentation. On high-volume agents this single change is often worth $100–$500/month.

Chunk Size Optimization

The chunk size you use at ingestion time determines both embedding costs and retrieval quality. Smaller chunks cost less to embed per chunk and add fewer tokens to LLM context per retrieved result, but recall is worse. Larger chunks improve recall but cost more per chunk and push more tokens into LLM context. The optimization target is the chunk size that maximizes retrieval quality per dollar.

Benchmark your retrieval quality at multiple chunk sizes before committing to production:

from typing import NamedTuple

class ChunkBenchmark(NamedTuple):
    chunk_size: int
    recall_at_5: float       # how often the right doc is in top-5 results
    avg_tokens_per_chunk: int
    cost_per_1k_docs_usd: float  # embedding cost per 1,000 chunks of this size

# Typical results for support ticket knowledge base (your mileage varies)
benchmarks = [
    ChunkBenchmark(128,  0.71, 128,  0.0026),
    ChunkBenchmark(256,  0.82, 256,  0.0051),  # usually best cost/quality
    ChunkBenchmark(512,  0.87, 512,  0.0102),
    ChunkBenchmark(1024, 0.89, 1024, 0.0205),
]

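# Calculate cost-adjusted quality score.
# With this data the raw ratio favors the smallest chunk, so flag sizes that
# fall below a minimum acceptable recall before comparing. MIN_RECALL is an
# illustrative assumption; set it from your own quality requirements.
MIN_RECALL = 0.80
for b in benchmarks:
    score = b.recall_at_5 / b.cost_per_1k_docs_usd
    status = "ok" if b.recall_at_5 >= MIN_RECALL else "below recall floor"
    print(f"chunk={b.chunk_size}: recall={b.recall_at_5:.2f}, "
          f"cost={b.cost_per_1k_docs_usd:.4f}, "
          f"quality/dollar={score:.0f} ({status})")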

Overlap between chunks (typically 10–20% of chunk size) improves recall at the cost of embedding and storing roughly 10–25% more tokens. Test 0%, 10%, and 20% overlap as part of your chunk size benchmark.
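To see what overlap does to the embedded token count, here is a small sketch of a sliding-window chunker. It is illustrative only: `chunk_tokens` is a hypothetical helper that operates on an already-tokenized sequence, and the 1,000-token document is a made-up input.

```python
def chunk_tokens(tokens: list, chunk_size: int, overlap: float = 0.0) -> list[list]:
    """Split a token sequence into windows of chunk_size with fractional overlap."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0.0 <= overlap < 1.0:
        raise ValueError("overlap must be in [0, 1)")

    # Each new chunk starts `stride` tokens after the previous one.
    stride = max(1, round(chunk_size * (1 - overlap)))
    chunks = []
    for start in range(0, len(tokens), stride):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # this window already reached the end of the document
    return chunks


doc = list(range(1000))  # stand-in for a 1,000-token document
for overlap in (0.0, 0.1, 0.2):
    chunks = chunk_tokens(doc, chunk_size=100, overlap=overlap)
    embedded = sum(len(c) for c in chunks)
    print(f"overlap={overlap:.0%}: {len(chunks)} chunks, {embedded} tokens embedded")
```

Because both the chunk count and the embedded token total rise with overlap, it is worth including the embedding and storage cost of each overlap setting in the benchmark alongside recall.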

Wiring RunGuard for Vector Cost Visibility

RunGuard tracks the full cost surface of an agent run — LLM calls, embedding calls, and retrieval operations — in a single budget envelope. This gives you a true per-session cost that includes vector DB charges alongside LLM charges:

import os
from runguard import RunGuard

rg = RunGuard(api_key=os.environ["RUNGUARD_API_KEY"])

async def rag_agent_session(user_query: str):
    async with rg.wrap(
        app_id="rag-support-agent",
        env={
            "RUNGUARD_BUDGET_USD": "0.10",         # $0.10 per session
            "RUNGUARD_TRACK_EMBEDDINGS": "true",   # include embedding costs
            "RUNGUARD_TRACK_RETRIEVAL": "true",    # include vector query costs
        }
    ) as guard:
        # All costs (LLM + embeddings + vector queries) flow through
        # the same budget envelope and trip the circuit breaker together.
        context = await guarded_retrieve(user_query, budget=RetrievalBudget())
        response = await generate_response(user_query, context, guard=guard)
        return response

Combining embedding reuse, query caching, retrieval budget enforcement, index tiering, and right-sized chunks typically achieves 50–70% total cost reduction compared to an unoptimized RAG agent. The gains compound: fewer queries hit the vector DB (caching), each query retrieves fewer chunks (budget), each chunk costs less to embed (reuse + chunk size), and each chunk contributes fewer tokens to LLM context (truncation).
