Groq Cloud Agent Cost Control: Rate Limit Retry Cascades, Speed-Amplified Loop Blindness, Daily Budget Depletion, and Context Accumulation at Scale

Groq's LPU (Language Processing Unit) infrastructure delivers 200–500 tokens per second for production-grade open-weight models — Llama 3.1 70B, Mixtral 8x7B, Gemma 2 9B — at throughput rates 5–8× faster than equivalent GPU-based API providers. The GroqCloud API is a drop-in OpenAI-compatible endpoint: set GROQ_API_KEY, point base_url at https://api.groq.com/openai/v1, and existing LangChain, LlamaIndex, CrewAI, or direct openai-SDK agent code runs immediately.
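As a minimal sketch of that setup, assuming the openai Python package (v1 or later) is installed: the helper names groq_client_kwargs and make_groq_client are illustrative and not part of any SDK.

```python
import os

GROQ_BASE_URL = "https://api.groq.com/openai/v1"


def groq_client_kwargs(env=None) -> dict:
    """Build constructor arguments for an OpenAI-compatible client aimed at GroqCloud."""
    env = os.environ if env is None else env
    api_key = env.get("GROQ_API_KEY")
    if not api_key:
        raise RuntimeError("GROQ_API_KEY is not set")
    return {"api_key": api_key, "base_url": GROQ_BASE_URL}


def make_groq_client():
    """Return an openai-SDK client that talks to GroqCloud (hypothetical helper)."""
    # Imported lazily so groq_client_kwargs() can be used without the SDK installed.
    from openai import OpenAI
    return OpenAI(**groq_client_kwargs())
```

Failing fast on a missing key keeps a misconfigured agent from starting a run it cannot finish.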

The speed advantage that makes Groq attractive for latency-sensitive agents is the same property that makes standard cost control assumptions fail. An agent loop that costs $5 on GPT-4 over 30 minutes costs the same $5 on Groq in under 5 minutes. The monitoring dashboard that fires an alert after detecting an anomaly takes longer to fire than the damage takes to accumulate. Retry logic designed around GPU provider rate limits behaves incorrectly against Groq's lower, independently structured rate limits. And daily token budgets — a second rate-limit dimension that does not exist at all on some other providers — can be fully depleted by mid-afternoon by an agent that carefully respected per-minute limits all day.

Four failure modes specific to Groq Cloud agentic pipelines:

  • Rate limit retry cascade — Groq's strict RPM and TPM limits are significantly lower than comparable providers; each retry after a 429 sends the full accumulated context, consuming more tokens per attempt than the last.
  • Speed-amplified loop blindness — At 400 tokens/second, a 30-iteration runaway loop completes 5–8× faster than on GPU providers; standard human monitoring and alerting cadences miss the entire event window.
  • Daily token budget (TPD) depletion independent of per-minute limits — Groq enforces a daily tokens-per-day cap separate from the per-minute TPM bucket; an agent that successfully navigates TPM limits can exhaust the daily budget hours before the billing day resets.
  • Context accumulation outpacing budget counters at LPU speed — Fast inference means agents complete more tool call iterations per minute than developers measured during testing on slower providers; token accumulation curves tested at 60 tokens/second underestimate by 5–8× when deployed against Groq's 400 tokens/second LPU.

Failure Mode 1 — Rate Limit Retry Cascade

Groq's GroqCloud free and paid tiers enforce two independent per-minute rate limits: requests per minute (RPM) and tokens per minute (TPM). The TPM limit for Llama 3.1 70B on the free tier is 6,000 tokens/minute. On the paid Developer tier it is 30,000 tokens/minute. These limits are per-model, and different models have different ceilings: in the GROQ_TIER_LIMITS table later in this section, Llama 3.1 8B has a higher free-tier TPM ceiling (20,000) than Llama 3.1 70B (6,000), while Mixtral 8x7B has a lower one (5,000).

When an agent's request exceeds the TPM limit, Groq returns HTTP 429 with a Retry-After header indicating when the rate limit window resets. The usual behavior in agent frameworks such as LangChain, LlamaIndex, and AutoGen, or in the SDK client underneath them, is to catch the 429, wait, and retry the request. The problem is that the retried request includes the full accumulated context: system prompt, all prior conversation turns, all tool results received so far. If the agent accumulated 4,000 tokens of context before hitting the rate limit, the retry sends 4,000 tokens. If the agent was in an active tool loop and added two more tool results of about 300 tokens each while waiting for the backoff delay to expire, the retry sends 4,600 tokens. Each retry attempt consumes a larger slice of the TPM budget than the previous attempt.

The cascade structure is: hit TPM limit → wait → retry with larger context → hit TPM limit again → wait longer (exponential backoff) → retry with even larger context. A ten-step retry sequence with 200-token growth per step accumulates 2,000 extra tokens in context. With backoff that starts at 60 seconds and doubles on every attempt, the ten waits sum to 61,380 seconds (about 17 hours) if uncapped, or 2,520 seconds (42 minutes) if each wait is capped at 300 seconds, and the agent's work output during all of that time is zero. The final successful retry consumes significantly more than the original request would have.

Retry token amplification rule: Each 429 retry that waits for context to grow by N tokens before retrying increases the effective cost of that request by N tokens. In a ten-retry spiral with 200 tokens of context growth per wait cycle, the final request costs 2,000 tokens more than the original. Track context size across retries, not just retry count.
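The arithmetic behind such a spiral can be checked before deploying. The sketch below is illustrative only: backoff_waits and retry_amplification are hypothetical helper names, and the 60-second base delay, the doubling factor, and the 300-second cap are assumed values rather than anything Groq prescribes.

```python
from typing import List, Optional


def backoff_waits(retries: int, base_s: float = 60.0,
                  cap_s: Optional[float] = None) -> List[float]:
    """Wait before each retry: base_s doubled per attempt, optionally capped."""
    waits = []
    for attempt in range(retries):
        wait = base_s * (2 ** attempt)
        waits.append(min(wait, cap_s) if cap_s is not None else wait)
    return waits


def retry_amplification(retries: int, growth_per_wait: int) -> int:
    """Extra tokens the final request carries if context grows during every wait."""
    return retries * growth_per_wait


uncapped_s = sum(backoff_waits(10))               # 61380.0 s, about 17 hours
capped_s = sum(backoff_waits(10, cap_s=300.0))    # 2520.0 s, 42 minutes
extra_tokens = retry_amplification(10, 200)       # 2000 tokens
print(uncapped_s / 3600, capped_s / 60, extra_tokens)
```

Either way the wait dwarfs the 60-second TPM window, which is why a small retry ceiling plus a context-size check is usually a better trade than a long backoff ladder.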

Python
import time
import threading
from dataclasses import dataclass, field
from typing import Optional, Callable, Any

@dataclass
class GroqRateLimitGuard:
    """Tracks token consumption and enforces pre-flight limits before requests."""
    tpm_limit: int              # tokens per minute for your model + tier
    rpm_limit: int              # requests per minute
    max_context_tokens: int     # refuse to send context larger than this
    max_retries: int = 3
    _tokens_this_minute: int = field(default=0, init=False, repr=False)
    _requests_this_minute: int = field(default=0, init=False, repr=False)
    _window_start: float = field(default_factory=time.monotonic, init=False, repr=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False, repr=False)
    retry_log: list = field(default_factory=list, init=False)

    def _reset_if_new_window(self):
        now = time.monotonic()
        if now - self._window_start >= 60.0:
            self._tokens_this_minute = 0
            self._requests_this_minute = 0
            self._window_start = now

    def check_preflight(self, estimated_tokens: int) -> tuple[bool, str]:
        """Call before every API request. Returns (ok, reason)."""
        with self._lock:
            self._reset_if_new_window()

            if estimated_tokens > self.max_context_tokens:
                return False, (
                    f"context estimate {estimated_tokens} tokens exceeds hard limit "
                    f"{self.max_context_tokens} — refusing to send"
                )

            projected_tpm = self._tokens_this_minute + estimated_tokens
            if projected_tpm > self.tpm_limit:
                seconds_until_reset = 60.0 - (time.monotonic() - self._window_start)
                return False, (
                    f"projected {projected_tpm} tokens would exceed TPM limit {self.tpm_limit} "
                    f"— wait {seconds_until_reset:.0f}s for window reset"
                )

            if self._requests_this_minute >= self.rpm_limit:
                seconds_until_reset = 60.0 - (time.monotonic() - self._window_start)
                return False, (
                    f"RPM limit {self.rpm_limit} reached "
                    f"— wait {seconds_until_reset:.0f}s for window reset"
                )

            return True, "ok"

    def record_request(self, tokens_sent: int):
        with self._lock:
            self._reset_if_new_window()
            self._tokens_this_minute += tokens_sent
            self._requests_this_minute += 1

    def guarded_call(
        self,
        api_fn: Callable[[], Any],
        estimated_tokens: int,
        context_token_count: Callable[[], int],
    ) -> Any:
        """Wrap a Groq API call with pre-flight check and retry protection."""
        reason = "no attempts made"
        for attempt in range(self.max_retries + 1):
            # A context above the hard limit can never pass pre-flight, so
            # waiting and retrying would only burn time. Fail fast instead.
            if estimated_tokens > self.max_context_tokens:
                raise RuntimeError(
                    f"[GroqRateLimitGuard] context {estimated_tokens} tokens exceeds hard limit "
                    f"{self.max_context_tokens}; summarize or truncate before retrying"
                )

            ok, reason = self.check_preflight(estimated_tokens)
            if ok:
                self.record_request(estimated_tokens)
                return api_fn()
            if attempt == self.max_retries:
                break

            wait_s = min(60.0 * (2 ** attempt), 300.0)
            self.retry_log.append({
                "attempt": attempt,
                "reason": reason,
                "wait_s": wait_s,
                "context_tokens_at_retry": context_token_count(),
            })
            print(f"[GroqRateLimitGuard] attempt {attempt}: {reason}; waiting {wait_s:.0f}s")
            time.sleep(wait_s)
            # Re-estimate tokens after waiting, because context may have grown
            estimated_tokens = context_token_count()

        raise RuntimeError(
            f"[GroqRateLimitGuard] giving up after {self.max_retries} retries: {reason}"
        )

    def retry_token_amplification(self) -> int:
        """Returns extra tokens consumed due to context growth across retries."""
        if len(self.retry_log) < 2:
            return 0
        first = self.retry_log[0].get("context_tokens_at_retry", 0)
        last = self.retry_log[-1].get("context_tokens_at_retry", 0)
        return last - first


# Groq tier limits (as of 2026 Q2 — verify at console.groq.com/settings/limits)
GROQ_TIER_LIMITS = {
    "free": {
        "llama-3.1-70b-versatile": {"tpm": 6_000, "rpm": 30, "tpd": 500_000},
        "llama-3.1-8b-instant":    {"tpm": 20_000, "rpm": 30, "tpd": 500_000},
        "mixtral-8x7b-32768":      {"tpm": 5_000,  "rpm": 30, "tpd": 500_000},
        "gemma2-9b-it":            {"tpm": 15_000, "rpm": 30, "tpd": 500_000},
    },
    "developer": {
        "llama-3.1-70b-versatile": {"tpm": 30_000, "rpm": 100, "tpd": 1_000_000},
        "llama-3.1-8b-instant":    {"tpm": 30_000, "rpm": 100, "tpd": 1_000_000},
        "mixtral-8x7b-32768":      {"tpm": 18_000, "rpm": 100, "tpd": 1_000_000},
        "gemma2-9b-it":            {"tpm": 15_000, "rpm": 100, "tpd": 1_000_000},
    },
}

Failure Mode 2 — Speed-Amplified Loop Blindness

The economic damage from a runaway AI agent loop scales with two variables: tokens consumed per iteration and time allowed to run before detection. On a GPU-based provider delivering 60 tokens/second, a 30-step loop that produces 500 tokens of tool call output per step and accumulates 15,000 tokens of running context takes approximately 8 minutes to complete — by which time monitoring dashboards, spend alerts, or on-call engineers may have caught the anomaly.

Groq's LPU inference completes the same loop in under 90 seconds. At 400 tokens/second, each step that produced 500 tokens of output finishes in 1.25 seconds. The 30 iterations complete before a Slack spend-threshold alert configured on a 5-minute polling interval fires for the first time. A developer who tested their loop detection logic on OpenAI and verified that their runaway guard had "plenty of time to fire" discovers that the same guard fires too late on Groq — the loop completes entirely within the first polling interval.

The correct mitigation is to move loop detection from time-based polling to step-count and token-count checks that fire synchronously before each LLM call. A guard that checks "have I exceeded 20 tool calls?" after each tool result fires regardless of inference speed. A guard that checks "have I accumulated more than 8,000 input tokens?" fires at the same token threshold whether inference takes 1 second or 10 seconds per step.

Provider | Throughput | Time for 30-step loop | 5-minute alert catches it?
GPT-4o | ~50 tokens/s | ~10 min | Yes (fires at 5 min, loop still running)
Claude Sonnet 4.6 | ~80 tokens/s | ~6 min | Marginal (fires once, loop near completion)
Groq Llama 3.1 70B | ~400 tokens/s | ~75 sec | No (loop completes before first poll)
Groq Llama 3.1 8B | ~750 tokens/s | ~40 sec | No (loop completes in first poll window)

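The last column follows from simple arithmetic. The sketch below assumes the worst-case alignment, in which the loop starts just after a poll and the first poll able to observe it lands one full interval later; polls_during_loop is a hypothetical helper, and the durations are the approximate figures from the table.

```python
import math


def polls_during_loop(loop_seconds: float, poll_interval_s: float = 300.0) -> int:
    """Count polls that land strictly before the loop finishes.

    Polls are assumed to land at poll_interval_s, 2 * poll_interval_s, ...
    measured from the moment the loop starts.
    """
    if loop_seconds <= 0:
        return 0
    return max(0, math.ceil(loop_seconds / poll_interval_s) - 1)


LOOP_SECONDS = {
    "GPT-4o": 600,
    "Claude Sonnet 4.6": 360,
    "Groq Llama 3.1 70B": 75,
    "Groq Llama 3.1 8B": 40,
}

for provider, seconds in LOOP_SECONDS.items():
    print(f"{provider}: {polls_during_loop(seconds)} poll(s) while the loop is running")
```

A count of zero means the alert can only report the loop after it has already finished.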
Python
import time
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class GroqSpeedLoopGuard:
    """
    Step-synchronous loop guard designed for LPU-speed inference.
    All checks fire before the next LLM call, not on a polling interval.
    """
    max_steps: int = 20
    max_input_tokens: int = 8_000
    max_output_tokens_per_step: int = 2_000
    max_total_output_tokens: int = 20_000
    max_wall_seconds: float = 120.0   # hard time ceiling as backup only

    _step: int = field(default=0, init=False, repr=False)
    _total_input_tokens: int = field(default=0, init=False, repr=False)
    _total_output_tokens: int = field(default=0, init=False, repr=False)
    _start_time: float = field(default_factory=time.monotonic, init=False, repr=False)
    _step_log: list = field(default_factory=list, init=False)

    def check_before_call(self, input_tokens: int) -> Optional[str]:
        """
        Call this immediately before every Groq API request.
        Returns None if safe to proceed, or a string reason to stop.
        """
        elapsed = time.monotonic() - self._start_time

        if self._step >= self.max_steps:
            return f"step ceiling reached: {self._step} >= max_steps={self.max_steps}"
        if input_tokens > self.max_input_tokens:
            return (
                f"input context {input_tokens} tokens exceeds limit {self.max_input_tokens} "
                f"— context window runaway detected"
            )
        if self._total_output_tokens >= self.max_total_output_tokens:
            return (
                f"total output {self._total_output_tokens} tokens exceeds budget "
                f"{self.max_total_output_tokens}"
            )
        if elapsed >= self.max_wall_seconds:
            return f"wall-clock timeout: {elapsed:.0f}s >= {self.max_wall_seconds}s"

        return None

    def record_step(self, input_tokens: int, output_tokens: int):
        """Call after each successful LLM response."""
        if output_tokens > self.max_output_tokens_per_step:
            print(
                f"[GroqSpeedLoopGuard] step {self._step}: output {output_tokens} tokens "
                f"exceeds per-step limit {self.max_output_tokens_per_step} — possible output explosion"
            )
        self._step_log.append({
            "step": self._step,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "elapsed_s": time.monotonic() - self._start_time,
        })
        self._step += 1
        self._total_input_tokens = input_tokens  # current context size
        self._total_output_tokens += output_tokens

    def tokens_per_second(self) -> float:
        elapsed = time.monotonic() - self._start_time
        if elapsed < 0.1:
            return 0.0
        return self._total_output_tokens / elapsed

    def report(self) -> dict:
        return {
            "steps_completed": self._step,
            "current_input_tokens": self._total_input_tokens,
            "total_output_tokens": self._total_output_tokens,
            "wall_seconds": time.monotonic() - self._start_time,
            "tokens_per_second": self.tokens_per_second(),
            "steps": self._step_log,
        }


def groq_agent_loop(messages: list, tools: list, client, model: str, guard: GroqSpeedLoopGuard):
    """Example agent loop with step-synchronous guards for Groq LPU inference."""
    while True:
        # Estimate input tokens before the call
        input_tokens = sum(len(m.get("content", "") or "") // 4 for m in messages)

        # Synchronous check — fires BEFORE the API call regardless of speed
        stop_reason = guard.check_before_call(input_tokens)
        if stop_reason:
            print(f"[GroqSpeedLoopGuard] STOP: {stop_reason}")
            break

        response = client.chat.completions.create(
            model=model,
            messages=messages,
            tools=tools,
        )

        choice = response.choices[0]
        output_tokens = response.usage.completion_tokens
        guard.record_step(response.usage.prompt_tokens, output_tokens)

        if choice.finish_reason == "stop" or not choice.message.tool_calls:
            # Agent finished naturally
            break

        # Process tool calls and append results to messages
        messages.append(choice.message.model_dump())
        for tc in choice.message.tool_calls:
            tool_result = execute_tool(tc)  # your tool execution logic
            messages.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": str(tool_result),
            })

    return guard.report()

Failure Mode 3 — Daily Token Budget (TPD) Depletion Independent of Per-Minute Limits

Groq enforces three rate limit dimensions simultaneously: requests per minute (RPM), tokens per minute (TPM), and tokens per day (TPD). Most developers are aware of RPM and TPM, because those are the limits that produce immediate 429 errors during active use. TPD operates differently: it is a daily budget that accumulates across every call made to a model with the same API key, resets at midnight UTC, and gives no warning until it is fully exhausted.

The failure pattern is: an agent team sets TPM-aware retry logic, tests that it correctly backs off during high-frequency bursts, and deploys. The agent runs well during morning hours, respecting per-minute limits. By 2–3 PM UTC, the daily token budget is exhausted. Every API call from that point returns HTTP 429 with a retry-after header pointing to the next midnight UTC reset — hours away. The agent cannot complete any work for the rest of the billing day. If the agent is part of an automated pipeline, the pipeline stalls silently until the daily reset.

Unlike TPM exhaustion, which resolves within 60 seconds, TPD exhaustion requires waiting for the 24-hour window to reset. An agent that exhausts its daily budget at 14:00 UTC cannot resume until 00:00 UTC the following day — a 10-hour outage from a single afternoon of heavy usage.

TPD budget projection: At a sustained Groq free-tier TPM of 6,000 tokens/minute (max throughput), the daily budget of 500,000 tokens exhausts in 83 minutes of continuous inference — less than 2 hours. On the Developer tier, 1,000,000 TPD at 30,000 TPM exhausts in 33 minutes of continuous use. Agents that burst briefly but repeatedly accumulate toward the daily limit without ever triggering the per-minute alarm.
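The projection is a single division, and the same numbers also give the average pace an agent must hold to make the budget last. A small sketch using the limits quoted above; minutes_to_exhaust and sustainable_tpm are hypothetical helper names.

```python
def minutes_to_exhaust(tpd_limit: int, tokens_per_minute: float) -> float:
    """Minutes of sustained use at tokens_per_minute before the daily cap is hit."""
    if tokens_per_minute <= 0:
        raise ValueError("tokens_per_minute must be positive")
    return tpd_limit / tokens_per_minute


def sustainable_tpm(tpd_limit: int, active_hours: float = 24.0) -> float:
    """Average tokens/minute that spreads the daily cap evenly over active_hours."""
    if active_hours <= 0:
        raise ValueError("active_hours must be positive")
    return tpd_limit / (active_hours * 60.0)


free_minutes = minutes_to_exhaust(500_000, 6_000)     # about 83 minutes
dev_minutes = minutes_to_exhaust(1_000_000, 30_000)   # about 33 minutes
free_pace = sustainable_tpm(500_000)                  # about 347 tokens/minute
print(round(free_minutes), round(dev_minutes), round(free_pace))
```

On the free-tier figures, an agent that must stay available all day can average only about 347 tokens per minute, roughly 6% of the 6,000 TPM ceiling.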

Python
import time
import threading
from dataclasses import dataclass, field
from typing import Optional
import datetime

@dataclass
class GroqDailyBudgetGuard:
    """
    Tracks per-day token consumption independently of per-minute limits.
    Resets at midnight UTC to match Groq's billing day boundary.
    """
    tpd_limit: int               # daily token cap for your model + tier
    warning_fraction: float = 0.8  # warn when 80% of daily budget consumed

    _tokens_today: int = field(default=0, init=False, repr=False)
    _day_start_utc: str = field(default="", init=False, repr=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False, repr=False)
    _warning_issued: bool = field(default=False, init=False, repr=False)
    _consumption_log: list = field(default_factory=list, init=False)

    def _today_utc(self) -> str:
        return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")

    def _reset_if_new_day(self):
        today = self._today_utc()
        if today != self._day_start_utc:
            self._tokens_today = 0
            self._warning_issued = False
            self._day_start_utc = today

    def check_daily_budget(self, estimated_tokens: int) -> tuple[bool, str]:
        """Returns (ok, reason). Call before every API request."""
        with self._lock:
            self._reset_if_new_day()

            projected = self._tokens_today + estimated_tokens
            if projected > self.tpd_limit:
                utc_now = datetime.datetime.now(datetime.timezone.utc)
                next_midnight = (utc_now + datetime.timedelta(days=1)).replace(
                    hour=0, minute=0, second=0, microsecond=0
                )
                seconds_until_reset = (next_midnight - utc_now).total_seconds()
                return False, (
                    f"daily token budget exhausted: {self._tokens_today} used + "
                    f"{estimated_tokens} requested > {self.tpd_limit} TPD limit. "
                    f"Budget resets in {seconds_until_reset/3600:.1f} hours (midnight UTC)."
                )

            fraction_used = projected / self.tpd_limit
            if fraction_used >= self.warning_fraction and not self._warning_issued:
                self._warning_issued = True
                print(
                    f"[GroqDailyBudgetGuard] WARNING: {fraction_used*100:.0f}% of daily "
                    f"budget consumed ({projected}/{self.tpd_limit} tokens). "
                    f"Reserve capacity before midnight UTC reset."
                )

            return True, f"{projected}/{self.tpd_limit} tokens used today"

    def record_usage(self, tokens_used: int):
        with self._lock:
            self._reset_if_new_day()
            self._tokens_today += tokens_used
            self._consumption_log.append({
                "timestamp_utc": datetime.datetime.now(datetime.timezone.utc).isoformat(),
                "tokens": tokens_used,
                "running_total": self._tokens_today,
            })

    def remaining_today(self) -> int:
        with self._lock:
            self._reset_if_new_day()
            return max(0, self.tpd_limit - self._tokens_today)

    def estimated_hours_until_exhaustion(self, tokens_per_hour: float) -> Optional[float]:
        remaining = self.remaining_today()
        if tokens_per_hour <= 0:
            return None
        return remaining / tokens_per_hour

    def report(self) -> dict:
        with self._lock:
            self._reset_if_new_day()
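            return {
                "date_utc": self._day_start_utc,
                "tokens_used_today": self._tokens_today,
                "tpd_limit": self.tpd_limit,
                "fraction_used": self._tokens_today / self.tpd_limit,
                # Computed inline: remaining_today() acquires the same non-reentrant
                # lock, so calling it from here would deadlock.
                "remaining": max(0, self.tpd_limit - self._tokens_today),
            }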

Failure Mode 4 — Context Accumulation Outpacing Budget Counters at LPU Speed

When developers instrument agent loops for cost control, they typically measure two things during testing: the number of tokens in the initial prompt and the average number of tokens added per tool call. If testing on a GPU-based provider shows that a typical agent run accumulates 300 tokens per step over 10 steps, they set a budget ceiling of 5,000 tokens: 3,000 for the expected accumulation (10 steps × 300 tokens) plus 2,000 of headroom above the 10-step average.

Deploying the same agent on Groq changes the accumulation behavior in a subtle but significant way. On a 60-tokens/second GPU provider, each LLM call in the tool loop takes 2–4 seconds to return a response. During that time, no new tokens are added to the context. On Groq's 400-tokens/second LPU, the same call returns in 0.3–0.8 seconds, so the agent's tool execution loop runs proportionally faster. For example, a web search tool that takes 800ms to return might sit behind a 400ms LLM overhead on GPU providers (0.5× the tool time) and only 50ms on Groq (0.06× the tool time). Groq agents therefore run more iterations per wall-clock second, which means they accumulate more context per unit time.

The practical consequence: a token budget ceiling calibrated on a GPU provider running 10 steps per minute is reached several times sooner on Groq at 50+ steps per minute. The guard fires on token count, not on time, so the agent hits the ceiling after the same number of steps but in a fraction of the wall-clock time, because LLM responses and the tool results they trigger arrive faster and more frequently. The agent is not "looping more"; it is executing steps at the rate Groq's speed enables, while the developer's token budget was implicitly calibrated against a slower inference rate.
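To make the time compression concrete, here is a rough sketch with hypothetical numbers: a 5,000-token ceiling, an assumed 2,000-token starting context, and 300 tokens added per step. The helper names are illustrative.

```python
import math


def steps_to_ceiling(ceiling: int, start_tokens: int, growth_per_step: int) -> int:
    """Steps until the context first reaches the ceiling (speed-independent)."""
    if growth_per_step <= 0:
        raise ValueError("growth_per_step must be positive")
    return max(0, math.ceil((ceiling - start_tokens) / growth_per_step))


def seconds_to_ceiling(ceiling: int, start_tokens: int, growth_per_step: int,
                       steps_per_minute: float) -> float:
    """Wall-clock seconds until the ceiling is reached at a given step rate."""
    steps = steps_to_ceiling(ceiling, start_tokens, growth_per_step)
    return steps * 60.0 / steps_per_minute


steps = steps_to_ceiling(5_000, 2_000, 300)                # 10 steps at any speed
slow_seconds = seconds_to_ceiling(5_000, 2_000, 300, 10)   # 60.0 s at 10 steps/min
fast_seconds = seconds_to_ceiling(5_000, 2_000, 300, 50)   # 12.0 s at 50 steps/min
print(steps, slow_seconds, fast_seconds)
```

The step count is identical in both cases; only the time available to notice and react shrinks.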

Python
import time
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class GroqContextAccumulationGuard:
    """
    Tracks context growth rate and detects accumulation patterns
    that indicate runaway growth, independent of wall-clock time.
    """
    max_context_tokens: int = 8_000
    max_context_growth_per_step: int = 500   # warn if single step adds more than this
    max_context_growth_rate: float = 1.5      # warn if context grows >50% per step

    _context_history: list = field(default_factory=list, init=False)
    _growth_warnings: list = field(default_factory=list, init=False)

    def record_context_size(self, step: int, input_tokens: int, tool_results_tokens: int):
        entry = {
            "step": step,
            "input_tokens": input_tokens,
            "tool_results_tokens": tool_results_tokens,
            "total": input_tokens,
            "timestamp": time.monotonic(),
        }
        self._context_history.append(entry)

        # Check absolute limit
        if input_tokens > self.max_context_tokens:
            print(
                f"[GroqContextAccumulationGuard] step {step}: context {input_tokens} tokens "
                f"exceeds limit {self.max_context_tokens} — agent should summarize or truncate"
            )

        # Check per-step growth
        if len(self._context_history) >= 2:
            prev = self._context_history[-2]["total"]
            curr = self._context_history[-1]["total"]
            step_growth = curr - prev
            growth_ratio = curr / prev if prev > 0 else float('inf')

            if step_growth > self.max_context_growth_per_step:
                warning = {
                    "step": step,
                    "growth_tokens": step_growth,
                    "type": "absolute_growth",
                }
                self._growth_warnings.append(warning)
                print(
                    f"[GroqContextAccumulationGuard] step {step}: context grew by {step_growth} tokens "
                    f"(limit {self.max_context_growth_per_step}) — tool result may be too large"
                )

            if growth_ratio > self.max_context_growth_rate:
                warning = {
                    "step": step,
                    "growth_ratio": growth_ratio,
                    "type": "ratio_growth",
                }
                self._growth_warnings.append(warning)
                print(
                    f"[GroqContextAccumulationGuard] step {step}: context grew {growth_ratio:.2f}× "
                    f"(limit {self.max_context_growth_rate}×) — possible runaway accumulation"
                )

    def projected_exhaustion_step(self) -> Optional[int]:
        """Estimate which step will exceed max_context_tokens given recent growth rate."""
        if len(self._context_history) < 3:
            return None
        recent = self._context_history[-3:]
        avg_growth = (recent[-1]["total"] - recent[0]["total"]) / max(len(recent) - 1, 1)
        if avg_growth <= 0:
            return None
        current = self._context_history[-1]["total"]
        remaining = self.max_context_tokens - current
        if remaining <= 0:
            return self._context_history[-1]["step"]
        return self._context_history[-1]["step"] + int(remaining / avg_growth)

    def should_summarize(self) -> bool:
        """Returns True when context is over 70% full and growing consistently."""
        if not self._context_history:
            return False
        current = self._context_history[-1]["total"]
        fill_fraction = current / self.max_context_tokens
        if fill_fraction < 0.7:
            return False
        projected = self.projected_exhaustion_step()
        if projected is None:
            return False
        steps_remaining = projected - self._context_history[-1]["step"]
        return steps_remaining <= 3  # 3 or fewer steps before exhaustion

    def report(self) -> dict:
        if not self._context_history:
            return {"steps": 0, "current_tokens": 0}
        current = self._context_history[-1]["total"]
        return {
            "steps": len(self._context_history),
            "current_tokens": current,
            "fill_fraction": current / self.max_context_tokens,
            "growth_warnings": len(self._growth_warnings),
            "projected_exhaustion_step": self.projected_exhaustion_step(),
            "recommend_summarize": self.should_summarize(),
        }

Composite Guard for Groq Cloud Agents

All four failure modes can interact in a single agent run. An agent that exhausts its per-minute TPM limit retries with a growing context (failure modes 1 and 4 compounding), completes that loop so quickly that the monitoring alert never fires (failure mode 2), and by doing so depletes the daily TPD budget hours before the billing day resets (failure mode 3). The GroqAgentPolicy dataclass below combines all four guards into a single policy object that covers pre-flight checks, step-synchronous loop detection, daily budget tracking, and context growth monitoring:

Python
from dataclasses import dataclass, field
from typing import Optional, Callable, Any

@dataclass
class GroqAgentPolicy:
    """
    Composite cost guard for Groq Cloud agents.
    Covers: rate limit retry cascade, speed-amplified loop, daily TPD,
    and context accumulation at LPU speed.

    Usage:
        policy = GroqAgentPolicy.for_model("llama-3.1-70b-versatile", tier="developer")
        # before each API call:
        policy.check_or_raise(input_tokens=estimated_input_tokens)
        # after each API call:
        policy.record(step=step_n, input_tokens=actual_input, output_tokens=actual_output)
    """
    model: str
    tier: str
    tpm_limit: int
    rpm_limit: int
    tpd_limit: int
    max_steps: int = 25
    max_context_tokens: int = 8_000
    max_wall_seconds: float = 300.0

    def __post_init__(self):
        self.rate_guard = GroqRateLimitGuard(
            tpm_limit=self.tpm_limit,
            rpm_limit=self.rpm_limit,
            max_context_tokens=self.max_context_tokens,
            max_retries=3,
        )
        self.loop_guard = GroqSpeedLoopGuard(
            max_steps=self.max_steps,
            max_input_tokens=self.max_context_tokens,
            max_wall_seconds=self.max_wall_seconds,
        )
        self.daily_guard = GroqDailyBudgetGuard(tpd_limit=self.tpd_limit)
        self.accumulation_guard = GroqContextAccumulationGuard(
            max_context_tokens=self.max_context_tokens,
        )
        self._step = 0

    @classmethod
    def for_model(cls, model: str, tier: str = "developer") -> "GroqAgentPolicy":
        limits = GROQ_TIER_LIMITS.get(tier, {}).get(model)
        if not limits:
            raise ValueError(f"Unknown model {model!r} for tier {tier!r}. "
                             "Check GROQ_TIER_LIMITS or verify at console.groq.com/settings/limits")
        return cls(
            model=model,
            tier=tier,
            tpm_limit=limits["tpm"],
            rpm_limit=limits["rpm"],
            tpd_limit=limits["tpd"],
        )

    def check_or_raise(self, input_tokens: int):
        """Run all pre-flight checks. Raises RuntimeError with reason if any check fails."""
        # 1. Step + time ceiling
        stop = self.loop_guard.check_before_call(input_tokens)
        if stop:
            raise RuntimeError(f"[GroqAgentPolicy:{self.model}] loop guard: {stop}")

        # 2. Daily budget
        ok, reason = self.daily_guard.check_daily_budget(input_tokens)
        if not ok:
            raise RuntimeError(f"[GroqAgentPolicy:{self.model}] daily budget: {reason}")

        # 3. Per-minute rate limit (soft check — actual enforcement on 429 response)
        ok, reason = self.rate_guard.check_preflight(input_tokens)
        if not ok:
            print(f"[GroqAgentPolicy:{self.model}] rate limit pre-flight: {reason}")
            # Don't raise — let the API call proceed; GroqRateLimitGuard.guarded_call handles retry

    def record(self, step: int, input_tokens: int, output_tokens: int):
        """Record the result of a completed API call."""
        self.loop_guard.record_step(input_tokens, output_tokens)
        self.daily_guard.record_usage(input_tokens + output_tokens)
        self.accumulation_guard.record_context_size(step, input_tokens, output_tokens)
        self.rate_guard.record_request(input_tokens + output_tokens)
        self._step += 1

        if self.accumulation_guard.should_summarize():
            print(
                f"[GroqAgentPolicy:{self.model}] step {step}: context near limit "
                f"— recommend summarizing conversation history before next call"
            )

    def report(self) -> dict:
        return {
            "model": self.model,
            "tier": self.tier,
            "loop": self.loop_guard.report(),
            "daily_budget": self.daily_guard.report(),
            "context_accumulation": self.accumulation_guard.report(),
            "rate_limit_retries": len(self.rate_guard.retry_log),
            "retry_token_amplification": self.rate_guard.retry_token_amplification(),
        }


# --- Quick start ---
# policy = GroqAgentPolicy.for_model("llama-3.1-70b-versatile", tier="developer")
#
# In your agent loop:
#   policy.check_or_raise(input_tokens=count_tokens(messages))
#   response = client.chat.completions.create(model=policy.model, messages=messages, tools=tools)
#   policy.record(step=n, input_tokens=response.usage.prompt_tokens,
#                 output_tokens=response.usage.completion_tokens)
#
# At end of run:
#   print(policy.report())

FAQ

Does Groq's Retry-After header tell me which limit was hit — TPM, RPM, or TPD?

Not by itself. The Retry-After header only gives the number of seconds until the window resets; the limit type comes from the error message in the 429 response body. A TPM exhaustion error returns a message like "Rate limit reached for model ... on tokens per minute"; a TPD exhaustion error indicates the daily budget instead. For TPM the Retry-After value is typically under 60 seconds, while for TPD it reflects the time until midnight UTC. Parsing the error message body (not just the HTTP status) lets you distinguish a brief wait from a multi-hour outage and adjust retry behavior accordingly: for TPM exhaustion, use exponential backoff; for TPD exhaustion, pause the agent and notify the operator rather than entering a futile retry loop.
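A minimal sketch of that branching logic follows. The message patterns are assumptions modeled on the example message quoted above, not a documented contract, so check them against real 429 bodies from your account. The names classify_429 and RateLimitDecision are hypothetical.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class RateLimitDecision:
    limit_type: str      # "tpm", "rpm", "tpd", "rpd", or "unknown"
    action: str          # "backoff" (retry later) or "pause" (stop and notify)
    wait_seconds: float


# ASSUMPTION: the error message names the exhausted limit in words or as an
# abbreviation. Daily patterns are checked first because they imply a long wait.
_PATTERNS = [
    ("tpd", re.compile(r"tokens per day|\bTPD\b", re.IGNORECASE)),
    ("rpd", re.compile(r"requests per day|\bRPD\b", re.IGNORECASE)),
    ("tpm", re.compile(r"tokens per minute|\bTPM\b", re.IGNORECASE)),
    ("rpm", re.compile(r"requests per minute|\bRPM\b", re.IGNORECASE)),
]


def classify_429(message: str, retry_after: Optional[str], attempt: int = 0) -> RateLimitDecision:
    """Map a 429 error message and Retry-After value to a retry decision."""
    limit_type = next((name for name, pat in _PATTERNS if pat.search(message)), "unknown")

    # Retry-After may be missing or not a plain number; fall back to 0 in that case.
    try:
        hinted = float(retry_after) if retry_after is not None else 0.0
    except ValueError:
        hinted = 0.0

    if limit_type in ("tpd", "rpd"):
        # Daily budget exhausted: retrying is futile, so pause and alert an operator.
        return RateLimitDecision(limit_type, "pause", hinted)

    # Per-minute (or unrecognized) limit: exponential backoff capped at 60 seconds,
    # never shorter than what the server asked for.
    backoff = min(60.0, 2.0 ** attempt)
    return RateLimitDecision(limit_type, "backoff", max(hinted, backoff))
```

Treating an unrecognized message as a short backoff is a design choice; a stricter policy could pause whenever the Retry-After value exceeds a few minutes, whatever the message says.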

Can I switch models mid-session to work around Groq rate limits (e.g., fall back to Llama 3.1 8B when 70B is rate-limited)?

You can, and the fallback does extend your runway, because the limits are tracked per model rather than pooled across the API key. If llama-3.1-70b-versatile hits its daily limit, falling back to llama-3.1-8b-instant draws on the 8B model's separate daily budget. The per-minute limits are per-model as well: the two models have independent TPM buckets, so a 70B rate limit neither consumes nor releases tokens for 8B. The key risk is quality degradation: an agent designed for 70B reasoning may produce lower-quality tool selections on 8B, triggering more tool call retries and consuming the 8B daily budget faster than a 70B run would have. Test your agent on the fallback model independently before treating it as a transparent failover.
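One way to make the fallback explicit is to track a daily budget per model and pick the first model in a preference chain that can still afford the next call. This is a sketch only: ModelBudget and pick_model are hypothetical names, and the limits in the usage example are placeholders rather than Groq's published numbers.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ModelBudget:
    model: str
    tpd_limit: int          # daily token cap for this model (placeholder values below)
    used_today: int = 0

    def can_afford(self, tokens: int) -> bool:
        return self.used_today + tokens <= self.tpd_limit

    def record(self, tokens: int) -> None:
        self.used_today += tokens


def pick_model(chain: List[ModelBudget], estimated_tokens: int) -> Optional[str]:
    """Return the first model in preference order whose daily budget covers the call."""
    for budget in chain:
        if budget.can_afford(estimated_tokens):
            return budget.model
    return None  # every daily budget is exhausted: pause the agent instead of retrying


# Usage with placeholder limits: prefer 70B, fall back to 8B.
chain = [
    ModelBudget("llama-3.1-70b-versatile", tpd_limit=100_000, used_today=98_000),
    ModelBudget("llama-3.1-8b-instant", tpd_limit=500_000),
]
chosen = pick_model(chain, estimated_tokens=5_000)  # 70B cannot afford 5,000 more tokens
```

Logging every switch to the fallback model makes it easier to see whether the quality drop described above is driving extra tool-call retries.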

Groq offers very fast inference, so my agent's p99 latency improved dramatically. Why does my total cost stay the same or increase compared to slower providers?

Groq's pricing is per-token, not per-second. At a given per-token price, a 10,000-token conversation costs the same whether it completes in 25 seconds (Groq at 400 tokens/second) or 3 minutes (GPT-4). Total cost is determined by total token volume, not by how fast the tokens arrive. The latency improvement does not reduce cost; it only compresses the time over which the cost is incurred. Teams see cost increases after moving to Groq in two situations: when the speed increase lets agents run more sessions per hour (more throughput means more tokens billed per hour), or when monitoring systems that worked at GPU speeds fail to catch loops before they complete at LPU speeds. The circuit breaker patterns in this post address the speed-monitoring gap directly.
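The arithmetic can be sketched as follows. The per-token price is a hypothetical placeholder, applied to both runs so that only speed differs, and the model ignores any difference between input and output token pricing.

```python
def run_economics(total_tokens: int, usd_per_million_tokens: float, tokens_per_second: float) -> dict:
    """Cost depends only on token volume; duration depends only on throughput."""
    cost_usd = total_tokens / 1_000_000 * usd_per_million_tokens
    duration_s = total_tokens / tokens_per_second
    runs_per_hour = 3600 / duration_s  # back-to-back runs on one worker
    return {
        "cost_usd": cost_usd,
        "duration_s": duration_s,
        "max_hourly_spend_usd": cost_usd * runs_per_hour,
    }


PRICE = 0.80  # HYPOTHETICAL USD per million tokens, identical for both runs

fast = run_economics(10_000, PRICE, tokens_per_second=400)           # 25-second run
slow = run_economics(10_000, PRICE, tokens_per_second=10_000 / 180)  # 3-minute run

# Same cost per run, but the fast worker can spend about 7x more per hour
# if it is kept busy (or stuck in a loop).
print(fast)
print(slow)
```

The per-run cost is identical in both cases; what changes is the ceiling on hourly spend, which is the number an alerting threshold should be sized against.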

How do I integrate these guards with LangChain's ChatGroq or LlamaIndex's Groq LLM wrapper?

Both ChatGroq and the LlamaIndex Groq wrapper accept callbacks or events that fire before and after each LLM call. In LangChain, implement a custom BaseCallbackHandler with on_llm_start (for pre-flight checks via policy.check_or_raise()) and on_llm_end (to call policy.record() with the usage data from the LLMResult). LangChain logs and suppresses exceptions raised inside callback handlers by default, so set raise_error = True on the handler; otherwise a failed pre-flight check will not stop the call. In LlamaIndex, use a custom BaseCallbackHandler with CBEventType.LLM events. In both cases, the policy object lives outside the LLM wrapper and receives the pre/post-call signals via the callback interface. This keeps the guard logic decoupled from the framework-specific LLM client and allows the same GroqAgentPolicy to wrap any framework that supports pre/post-call hooks.
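The LangChain side might look like the sketch below. To stay dependency-free it is written as a plain class with the same method names and arguments as LangChain's handler hooks; in real use it would subclass BaseCallbackHandler from langchain_core.callbacks. Three things are assumptions: that usage arrives under llm_output["token_usage"] (which may be empty when streaming), the 4-characters-per-token estimate, and the adapter name itself.

```python
from typing import Any, Dict, List


class GroqPolicyCallbackAdapter:
    """Bridges framework pre/post-call hooks to a policy with check_or_raise() and record()."""

    # LangChain suppresses handler exceptions unless this attribute is True.
    raise_error = True

    def __init__(self, policy: Any, chars_per_token: int = 4) -> None:
        self.policy = policy
        self.chars_per_token = chars_per_token  # ASSUMPTION: crude size estimate, not a tokenizer
        self.step = 0

    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> None:
        # Pre-flight: estimate input size and let the policy veto the call by raising.
        estimated = sum(len(p) for p in prompts) // self.chars_per_token
        self.policy.check_or_raise(input_tokens=estimated)

    def on_llm_end(self, response: Any, **kwargs: Any) -> None:
        # Post-call: read actual usage if the provider reported it, else record zeros.
        llm_output = getattr(response, "llm_output", None) or {}
        usage = llm_output.get("token_usage") or {}
        self.policy.record(
            step=self.step,
            input_tokens=usage.get("prompt_tokens", 0),
            output_tokens=usage.get("completion_tokens", 0),
        )
        self.step += 1
```

Because the adapter only depends on two policy methods, the same object can be driven from a LlamaIndex event handler or from a hand-written agent loop.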

Is the max_context_tokens limit I should use the same as the model's advertised context window?

No. Treat 60–75% of the model's context window as an upper bound, then lower max_context_tokens further to fit your rate limits. Groq's Llama 3.1 70B supports 128K tokens of context, but at 6,000 TPM a single 128K-token prompt is about 21× your entire per-minute token budget, so it can never fit inside one TPM window. The practical limit for rate-limit-safe operation is much lower than the technical limit. For agents on free-tier plans, a 4,000–6,000 token context ceiling keeps each request within a single TPM window. On Developer plans, 12,000–20,000 token ceilings are viable. The model's advertised context window is a technical ceiling, not a billing-safe operating limit, and on Groq your rate limits almost always constrain you before the technical context window does.
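As a rough sketch, the ceiling can be computed as the smaller of two caps: a share of the context window and a share of the per-minute token budget. The function name and the default percentages are illustrative assumptions; integer percentages are used to avoid floating-point rounding.

```python
def safe_context_ceiling(
    context_window: int,
    tpm_limit: int,
    window_pct: int = 70,   # within the 60-75% range suggested above
    tpm_pct: int = 80,      # leave headroom in the TPM window for the completion
) -> int:
    """Return a max_context_tokens value bounded by both the window and the TPM budget."""
    by_window = context_window * window_pct // 100
    by_rate = tpm_limit * tpm_pct // 100
    return min(by_window, by_rate)


# With a 128K window and a 6,000 TPM limit, the rate limit is the binding cap.
ceiling = safe_context_ceiling(context_window=128_000, tpm_limit=6_000)

# How many per-minute budgets a full-window prompt would need.
budgets_needed = 128_000 / 6_000
```

With these inputs the window cap is 89,600 tokens and the rate cap is 4,800 tokens, so the function returns 4,800, inside the 4,000–6,000 range recommended for low-TPM plans.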

Stop Groq loops before they exhaust your daily budget

RunGuard's circuit breaker SDK integrates with the GroqCloud API to enforce step ceilings, context limits, and daily budget caps synchronously: before each LLM call, not after the damage is done. One install line for Python or TypeScript. Works with LangChain ChatGroq, LlamaIndex, direct openai SDK calls against the Groq endpoint, and any agent framework that gives you a pre-call hook.
