Agent task decomposition cost efficiency: how breaking work into subtasks changes your LLM API bill
When an AI agent tackles a complex task, it has to decide how to break that task into steps. This decomposition decision (how many subtasks, how they are sequenced, how much context each subtask receives) can affect total cost as much as model selection does. A task that decomposes into 10 sequential LLM calls, each receiving the full conversation history as input, can cost 5–10x more than the same task decomposed into 3 calls with targeted context. The cost amplifier is the input token count: when every call receives the accumulated history, each additional subtask re-sends all prior context as input. A history that grows by 500 tokens per step reaches 5,000 tokens of input by step 10, so step 10 pays for 5,000 input tokens even if the context relevant to that step is only 200 tokens. This guide covers four decomposition patterns with different cost profiles: flat sequential, tree (parallel subtasks), pipeline (task-specific context windows), and batch (multiple queries in one prompt). Each pattern has different failure modes for cost control, and RunGuard’s budget tracker integrates differently with each.
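The growth described above is easy to check with a few lines of arithmetic. The sketch below is a minimal model, assuming the history grows by a fixed number of tokens per step and every call re-sends all of it; the function names are illustrative and the $3/M price is the example figure used in this guide, not a quoted rate.

```python
PRICE_PER_MTOK_USD = 3.00  # assumed input price per million tokens


def history_tokens_at_step(step: int, growth_per_step: int = 500) -> int:
    """Input tokens sent at a given step (1-indexed) when history grows linearly."""
    return step * growth_per_step


def total_input_tokens(steps: int, growth_per_step: int = 500) -> int:
    """Input tokens summed over every step of one task."""
    return sum(history_tokens_at_step(s, growth_per_step) for s in range(1, steps + 1))


def input_cost_usd(tokens: int, price_per_mtok: float = PRICE_PER_MTOK_USD) -> float:
    """Convert a token count into input cost in USD."""
    return tokens * price_per_mtok / 1_000_000


if __name__ == "__main__":
    full = total_input_tokens(10)   # full history re-sent at every step
    targeted = 10 * 200             # each step receives only ~200 relevant tokens
    print(history_tokens_at_step(10))              # 5000
    print(full, round(input_cost_usd(full), 4))    # 27500 0.0825
    print(targeted, round(input_cost_usd(targeted), 4))  # 2000 0.006
```

Under these assumptions the ten calls send 27,500 input tokens in total, versus 2,000 if each step received only its relevant 200 tokens. Real ratios will differ once fixed per-call overhead such as system prompts is included.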
The four decomposition cost patterns
- Pattern 1: flat sequential (highest cost per task). The agent executes subtasks one at a time, passing the full accumulated history to each step. Cost per step grows linearly with step number because each step receives all prior outputs as input. For a 10-step task where each step generates 300 output tokens, step 10 receives roughly 3,000 tokens of prior history as additional input (9 × 300 = 2,700). At $3/M input tokens (Sonnet pricing), those extra tokens cost about $0.009 per step-10 call: small in isolation but significant at scale. At 1,000 tasks/day, step 10 alone adds ~$9/day in input token overhead. Summed over all ten steps (about 13,500 history tokens per task), the overhead is ~$40/day, compared to close to $0/day if each step received only the necessary context.
- Pattern 2: tree decomposition (lower cost, higher parallelism). The planner generates a tree of subtasks, each with a self-contained context: the original task description plus the specific slice of prior work relevant to this subtask. Subtasks at the same tree depth can run in parallel, reducing total wall-clock time. More importantly, each leaf node receives a bounded context rather than the full accumulated history. A 3-level tree where each node has 3 children generates 9 leaf nodes, but each leaf receives at most the original task plus its parent’s output as context (~600–800 tokens), not the outputs of the other 8 leaves. This is 4–6x cheaper in input tokens per leaf call than the flat sequential equivalent.
- Pattern 3: pipeline with task-specific context slicing (lowest cost). Each stage of the pipeline receives only the outputs from immediately prior stages that are relevant to its task. A code-generation pipeline might have stages: understand_requirements → write_tests → write_implementation → review. The review stage does not need the requirements-understanding output verbatim; it needs the tests and implementation. Slicing the context to the minimum required for each stage reduces average input token count by 40–70% compared to passing the full accumulated context at each stage, according to benchmarks on document-processing agents.
- Pattern 4: query batching (highest throughput efficiency). When multiple independent subtasks can be answered from the same base context, batch them into a single prompt. An agent that needs to extract 10 fields from a document can make 10 separate extraction calls or 1 batch extraction call requesting all 10 fields at once. The batched call pays the document’s input tokens once; the 10 separate calls pay them 10 times. For a 20,000-token document at $3/M input tokens, the 10 separate calls cost $0.60 in input token overhead vs $0.06 for the single batched call — a 10x cost difference on the input side alone.
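The bounded context of Pattern 2 can be sketched as follows. This is a minimal illustration, not a production planner: `Node`, `build_context`, `run_tree`, and `call_model` are hypothetical names, `call_model` is a stub standing in for a real LLM client, and node names are assumed to be unique.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Node:
    name: str
    instruction: str
    children: list["Node"] = field(default_factory=list)


def call_model(prompt: str) -> str:
    """Hypothetical stand-in for an LLM call; swap in a real client here."""
    return f"<result for {len(prompt)} chars of prompt>"


def build_context(task: str, parent_output: Optional[str], node: Node) -> str:
    """Bounded context: the original task plus the parent's output, never sibling outputs."""
    parts = [f"Task:\n{task}"]
    if parent_output is not None:
        parts.append(f"Parent output:\n{parent_output}")
    parts.append(f"Subtask:\n{node.instruction}")
    return "\n\n".join(parts)


def run_tree(task: str, node: Node, parent_output: Optional[str] = None) -> dict[str, str]:
    """Run a node, then its children in parallel; return outputs keyed by node name."""
    output = call_model(build_context(task, parent_output, node))
    results = {node.name: output}
    if node.children:
        # Siblings are independent, so they can run concurrently.
        with ThreadPoolExecutor(max_workers=len(node.children)) as pool:
            for child_results in pool.map(lambda c: run_tree(task, c, output), node.children):
                results.update(child_results)
    return results
```

Because each child sees only `task` and its parent's `output`, the input size per call stays roughly constant regardless of how many siblings exist.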
Python: cost-efficient pipeline decomposition with per-subtask budget caps
Python: pipeline agent with context slicing and RunGuard budget enforcement
```python
import anthropic
from runguard import guard, BudgetExceededError
from dataclasses import dataclass
from typing import Any

client = anthropic.Anthropic()


# Per-step budget caps: allocate your total task budget across steps.
# Step allocations should reflect expected complexity, not equal splits.
@dataclass
class PipelineStep:
    name: str
    system_prompt: str
    budget_usd: float        # hard cap for this step
    context_keys: list[str]  # which prior step outputs to include as context


PIPELINE = [
    PipelineStep(
        name="understand_requirements",
        system_prompt="Extract structured requirements from the user's request. Output JSON.",
        budget_usd=0.05,
        context_keys=[],  # no prior outputs needed, just the original request
    ),
    PipelineStep(
        name="write_tests",
        system_prompt="Write unit tests for the requirements. Focus on edge cases.",
        budget_usd=0.15,
        # Only the requirements (build_step_messages always prepends the original request)
        context_keys=["understand_requirements"],
    ),
    PipelineStep(
        name="write_implementation",
        system_prompt="Implement code that passes the provided tests.",
        budget_usd=0.20,
        context_keys=["understand_requirements", "write_tests"],
    ),
    PipelineStep(
        name="review",
        system_prompt="Review the implementation for correctness and completeness against the tests.",
        budget_usd=0.10,
        # Requirements are omitted: they are already reflected in the tests and implementation
        context_keys=["write_tests", "write_implementation"],
    ),
]


def build_step_messages(
    original_request: str,
    step: PipelineStep,
    prior_outputs: dict[str, str],
) -> list[dict]:
    """Build a message list with only the context slices this step needs."""
    content_parts = [f"Original request:\n{original_request}"]
    for key in step.context_keys:
        if key in prior_outputs:
            label = key.replace("_", " ").title()
            content_parts.append(f"\n{label} output:\n{prior_outputs[key]}")
    return [{"role": "user", "content": "\n\n".join(content_parts)}]


def run_pipeline(user_request: str) -> dict[str, Any]:
    """Run all pipeline steps, respecting per-step budget caps."""
    prior_outputs: dict[str, str] = {}

    for step in PIPELINE:
        messages = build_step_messages(user_request, step, prior_outputs)

        # Create a fresh guard per step with the step's budget cap
        @guard(budget_usd=step.budget_usd, loop_max_repeats=3)
        def _run_step(msgs: list, sys: str) -> str:
            resp = client.messages.create(
                model="claude-haiku-4-5-20251001",
                max_tokens=1024,
                system=sys,
                messages=msgs,
            )
            return resp.content[0].text

        try:
            output = _run_step(messages, step.system_prompt)
            prior_outputs[step.name] = output
            print(f"  [pipeline] {step.name}: OK ({len(output)} chars)")
        except BudgetExceededError as e:
            print(f"  [pipeline] {step.name}: BUDGET EXCEEDED: {e}")
            # Abort pipeline; return partial results
            return {
                "status": "budget_exceeded",
                "failed_step": step.name,
                "partial_outputs": prior_outputs,
            }

    return {"status": "ok", "outputs": prior_outputs}


# Example: batch extraction to avoid re-sending the same large context 10 times
def batch_extract_fields(document: str, fields: list[str]) -> str:
    """Extract multiple fields in one call instead of N separate calls.

    Returns the model's raw JSON text; parse and validate it before use.
    """
    field_list = "\n".join(f"- {f}" for f in fields)
    prompt = (
        "Extract the following fields from the document. Return as JSON.\n\n"
        f"Fields to extract:\n{field_list}\n\n"
        f"Document:\n{document}"
    )

    @guard(budget_usd=0.20, loop_max_repeats=3)
    def _extract(p: str) -> str:
        resp = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=512,
            messages=[{"role": "user", "content": p}],
        )
        return resp.content[0].text

    return _extract(prompt)  # one call, one context load, all fields
```
The key cost insight is context_keys. Each step declares exactly which prior outputs it needs as input context. The review step lists write_tests and write_implementation but not understand_requirements: the requirements are already embedded in the tests and implementation, and re-sending them to the review step adds input tokens without adding information. This discipline of context slicing is the single highest-ROI cost optimization available in pipeline agents.
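One practical risk with declared context keys is a typo or a reference to a later step: the pipeline code above skips any key that is missing from prior_outputs, so the step would silently run with less context than intended. A small check run once at startup can catch this. The helper below is a hypothetical sketch, not part of RunGuard; it takes plain (name, context_keys) pairs so that it stays self-contained.

```python
def validate_context_keys(steps: list[tuple[str, list[str]]]) -> None:
    """Raise ValueError if a step requests output from a step that has not run before it."""
    seen: set[str] = set()
    for name, context_keys in steps:
        missing = [key for key in context_keys if key not in seen]
        if missing:
            raise ValueError(f"step {name!r} references unknown or later steps: {missing}")
        seen.add(name)


# Mirrors the step order and context_keys of the pipeline example.
validate_context_keys([
    ("understand_requirements", []),
    ("write_tests", ["understand_requirements"]),
    ("write_implementation", ["understand_requirements", "write_tests"]),
    ("review", ["write_tests", "write_implementation"]),
])
```

With the dataclass from the pipeline example, the call would be `validate_context_keys([(s.name, s.context_keys) for s in PIPELINE])`.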
Decomposition pattern cost comparison for a 10-subtask agent
| Pattern | Average input tokens per call | Total input cost (10 calls, 500 base tokens) | Parallelism | Best for |
|---|---|---|---|---|
| Flat sequential (accumulated history) | ~3,000 (grows with each step) | $0.090 at $3/MTok | None | Short tasks (<5 steps) where coherence matters |
| Tree decomposition | ~800 (parent context only) | $0.024 at $3/MTok | High (sibling nodes in parallel) | Tasks that decompose into independent subtasks |
| Pipeline with context slicing | ~600 (relevant prior outputs only) | $0.018 at $3/MTok | Low (sequential stages) | Tasks with data-dependency chains (code, analysis) |
| Batched single call | 500 (base only, one call) | $0.0015 at $3/MTok (1 call) | Maximum (1 API call) | Independent field extraction from same document |
For parallel task execution cost patterns, see AI agent parallel tool call budget control. For per-session cost tracking, see AI agent cost per user session.
Enforce per-subtask budgets in your decomposed agent
RunGuard’s guard() wrapper accepts a budget_usd parameter that creates an independent budget tracker for each guard instance. In a pipeline or tree decomposition, create one guard per step with an appropriate budget cap. Steps that exceed their allocation raise BudgetExceededError immediately, letting you abort the pipeline and return partial results rather than running every subsequent step on a stale or invalid prior output.
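Choosing the per-step caps is a separate question from enforcing them. One simple approach, sketched here under the assumption that you can assign each step a rough complexity weight, is to split a total task budget proportionally. The `allocate_budget` helper is hypothetical and independent of RunGuard; its output would be passed as budget_usd for each step.

```python
def allocate_budget(total_usd: float, weights: dict[str, float]) -> dict[str, float]:
    """Split a task budget across steps in proportion to expected-complexity weights."""
    if total_usd <= 0:
        raise ValueError("total_usd must be positive")
    if not weights or any(w <= 0 for w in weights.values()):
        raise ValueError("weights must be non-empty and positive")
    weight_sum = sum(weights.values())
    # Rounding to 4 decimals keeps caps readable; the sum may differ from total_usd slightly.
    return {name: round(total_usd * w / weight_sum, 4) for name, w in weights.items()}


# Weights of 1:3:4:2 on a $0.50 task budget give the 0.05 / 0.15 / 0.20 / 0.10
# split used in the pipeline example.
caps = allocate_budget(0.50, {
    "understand_requirements": 1,
    "write_tests": 3,
    "write_implementation": 4,
    "review": 2,
})
```

Weighting by expected complexity rather than splitting equally keeps a cheap extraction step from reserving budget that the implementation step is more likely to need.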
RunGuard pricing: Solo plan at $19/month for individual developers. Team plan at $79/month adds Slack and PagerDuty webhook alerts, shared dashboards, and audit log. Both plans include a 14-day free trial — no credit card required.
Start your 14-day free trial — or explore related: multi-agent orchestration cost control, parallel tool call budget control, autonomous agent cost control best practices, cost per user session, and A/B testing cost tradeoffs.