TaskWeaver Agent Cost Control: Planner-Executor Retry Loops, Code Iteration Accumulation, Session Memory Growth, and Plugin Amplification

Microsoft TaskWeaver is an open-source LLM-powered framework purpose-built for data analytics agents. Where most agent frameworks model work as sequences of tool calls, TaskWeaver models it as a collaboration between two LLM-backed roles: a Planner that decomposes the user's request into sub-tasks and coordinates the overall analysis strategy, and a CodeInterpreter (Python Worker) that translates each sub-task into executable Python, runs it in a sandboxed kernel, and returns the result. This Planner-CodeInterpreter architecture is well-suited for complex, multi-step data analysis — it can write pandas transformations, generate charts, query databases, and iterate toward a final answer in a way that direct tool-call frameworks cannot replicate cleanly.

The same two-role architecture introduces four cost failure modes that are specific to how TaskWeaver manages plan revision, code retry, session state, and plugin invocation. When the CodeInterpreter produces a failure — a missing import, a shape mismatch, a KeyError on unexpected data — the Planner receives the execution output, updates its analysis plan, and emits a new sub-task for the CodeInterpreter. If the root cause persists (a package that cannot be installed in the sandbox, a column name the model consistently mis-reads from the schema), the Planner re-plans and the CodeInterpreter re-generates code in a cycle that runs to the planner.max_steps ceiling without resolving anything. Each retry by the CodeInterpreter includes the full prior code and the full execution output — verbose stack traces plus any stdout from partial execution — multiplying the per-retry input token cost. TaskWeaver's SharedSessionMemory accumulates every planning turn, code generation, execution result, and response as RoundRecord objects; a 20-round analysis session injects 40,000–70,000 tokens of prior context into every new LLM call. And because TaskWeaver plugins are Python classes called directly inside the sandbox, the LLM can generate for-loop code that calls a plugin method on every item in a batch — making N external API calls per execution block, invisible to any framework-level rate limiter or cost counter.

Four failure modes specific to TaskWeaver agentic pipelines:

  1. Planner re-plan loop on persistent CodeInterpreter failure — When generated code consistently fails with the same root cause, the Planner treats each failure as new information and generates a revised plan, triggering another code generation and execution cycle. Without a failure-type circuit breaker that detects repeated identical errors, this loop runs to planner.max_steps (often 10–20 in default configurations), billing for plan generation + code generation + execution overhead on every iteration.
  2. CodeInterpreter retry token accumulation — The max_retry_count config key limits retry attempts per sub-task assignment, but each retry injects the prior code attempt plus the full execution output into the LLM context. A single sub-task with 3 retries and verbose error output (stack traces, dataframe shape mismatches) can consume 12,000–18,000 input tokens before the sub-task is either resolved or abandoned.
  3. SharedSessionMemory unbounded growth — Every round in a TaskWeaver session — planner decision, code generation, execution result, interpreter response — is appended to SharedSessionMemory as a RoundRecord. The full history is injected into every subsequent LLM call. A data analysis session with 20 planner rounds, each involving 2–3 code generation and execution cycles, accumulates 40,000–70,000 tokens of session memory that dominate input costs in later rounds.
  4. Plugin loop amplification in generated code — TaskWeaver plugins are Python classes registered with the framework; the CodeInterpreter generates Python that imports and calls plugin methods directly. The LLM writes idiomatic Python: if it needs to process a list of items using a plugin that queries an external API, it generates for item in items: result = plugin.search(item). Each iteration makes a separate billable external API call, and the entire for-loop executes as a single CodeInterpreter step — invisible to any TaskWeaver-level tool-call counter or budget tracker.

Failure mode 1: Planner re-plan loop on persistent CodeInterpreter failure

TaskWeaver's Planner communicates with the CodeInterpreter via a message-passing protocol. The Planner sends a Post containing the current sub-task description and any relevant context. The CodeInterpreter generates Python code, executes it in the kernel, and returns a response Post containing the execution result — either the output value or the error output if execution failed. The Planner receives this response and decides whether the sub-task is complete, needs retry with revised instructions, or should be abandoned in favor of a different sub-task decomposition.

The failure mode is a persistent error that the LLM cannot self-correct through code revision. Common triggers include: a Python package that the model believes is available but is not installed in the TaskWeaver kernel (ModuleNotFoundError: No module named 'pyarrow'); a data schema mismatch where the model generates code referencing a column name derived from its parametric memory rather than the actual dataset (KeyError: 'revenue_usd' when the column is Revenue_USD); or a type incompatibility in a transformation chain where every code revision attempts the same underlying operation on a shape the function cannot handle. In each case, the error message returned by the CodeInterpreter provides insufficient signal for the Planner to diagnose the root cause — the Planner sees a different traceback on each attempt because the LLM generates slightly different code — and concludes that a revised approach might succeed. It doesn't. The loop runs to planner.max_steps.

Python — launching TaskWeaver without failure-type monitoring (risky)
from taskweaver.app.app import TaskWeaverApp

def run_analysis_unsafe(request: str, app_dir: str) -> str:
    """Runs a TaskWeaver session with no external circuit breaker.
    If the CodeInterpreter hits a persistent failure, the Planner
    will re-plan up to planner.max_steps times without resolution.
    """
    app = TaskWeaverApp(app_dir=app_dir)
    session = app.get_session()
    response_round = session.send_message(request)
    # response_round.post_list[-1] is the final response post
    return response_round.post_list[-1].message

The fix is a wrapper that inspects each round's execution output before allowing the session to continue. If the same error class (the exception type or error keyword) appears in consecutive execution results, the circuit breaker trips and injects a diagnostic summary into the session rather than allowing another re-plan cycle. This short-circuits the loop at the point where the Planner is clearly not making progress.

Python — PlannerRetryGuard wrapping TaskWeaverApp
import re
from collections import deque
from dataclasses import dataclass, field
from typing import Optional
from taskweaver.app.app import TaskWeaverApp

@dataclass
class PlannerRetryGuard:
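    max_consecutive_same_error: int = 3
    error_window: int = 5
    _recent_errors: deque = field(default_factory=deque, init=False)

    def __post_init__(self):
        # Size the sliding window from error_window rather than a fixed maxlen.
        self._recent_errors = deque(maxlen=self.error_window)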

    def _extract_error_class(self, execution_output: str) -> Optional[str]:
        """Extract the leading exception type from a Python traceback."""
        match = re.search(r"(\w+Error|\w+Exception):", execution_output)
        return match.group(1) if match else None

    def record_result(self, execution_output: str) -> tuple[bool, Optional[str]]:
        """Returns (continue_ok, circuit_breaker_message_or_None)."""
        err_class = self._extract_error_class(execution_output)
        if err_class is None:
            self._recent_errors.clear()
            return True, None

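        self._recent_errors.append(err_class)
        # Count only the unbroken run of this error class at the end of the
        # window, so a different error in between breaks the streak.
        consecutive = 0
        for e in reversed(self._recent_errors):
            if e != err_class:
                break
            consecutive += 1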

        if consecutive >= self.max_consecutive_same_error:
            return False, (
                f"[RunGuard] CIRCUIT BREAKER — '{err_class}' has appeared in "
                f"{consecutive} consecutive CodeInterpreter results. "
                f"The root cause is likely environmental (missing package, schema mismatch) "
                f"rather than fixable by code revision. Summarize what analysis was "
                f"completed successfully and explain the blocker clearly, rather than "
                f"attempting further code generation for this sub-task."
            )
        return True, None


class GuardedTaskWeaverSession:
    """Wraps a TaskWeaver session with a planner retry circuit breaker."""

    def __init__(self, app_dir: str, guard: Optional[PlannerRetryGuard] = None):
        self._app = TaskWeaverApp(app_dir=app_dir)
        self._session = self._app.get_session()
        self._guard = guard or PlannerRetryGuard()
        self._round_count = 0
        self._max_rounds = 25  # hard session ceiling

    def send_message(self, message: str) -> str:
        if self._round_count >= self._max_rounds:
            return (
                f"[RunGuard] SESSION CEILING — {self._round_count} rounds completed. "
                f"Returning partial results to prevent further spend."
            )

        self._round_count += 1
        response_round = self._session.send_message(message)

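        # Inspect only execution-result attachments. Other attachments (thoughts,
        # generated code) would either reset the guard or match exception names
        # that merely appear in source text.
        for post in response_round.post_list:
            for attachment in post.attachment_list:
                # Assumption: attachment.type is an enum whose value is
                # "execution_result" for kernel output; adjust for your version.
                att_type = str(getattr(getattr(attachment, "type", None), "value", ""))
                if att_type != "execution_result" or not attachment.content:
                    continue
                ok, breaker_msg = self._guard.record_result(str(attachment.content))
                if not ok:
                    print("[RunGuard] Injecting circuit breaker message into session.")
                    # Send the diagnostic as the next user message to steer the Planner.
                    # The steering message is a billed round too, so count it.
                    self._round_count += 1
                    followup = self._session.send_message(breaker_msg)
                    return followup.post_list[-1].message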

        final_post = response_round.post_list[-1]
        return final_post.message
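
Matching on the exception class alone can be coarse: two unrelated KeyErrors look identical, while the raw tracebacks for one root cause differ between attempts. A minimal sketch of a middle ground is to normalize the final exception line into a signature and compare signatures instead. The helper name error_signature and its masking rules are illustrative assumptions, not part of TaskWeaver.

```python
import re
from typing import Optional

# Matches a final traceback line such as "KeyError: 'revenue_usd'".
_EXC_LINE = re.compile(
    r"^(?P<cls>[A-Za-z_][\w.]*(?:Error|Exception))(?::\s*(?P<msg>.*))?$"
)


def error_signature(execution_output: str) -> Optional[str]:
    """Return a normalized 'ExceptionClass: message' signature, or None.

    Scans from the end because the exception line is the last line of a
    standard Python traceback. Hex addresses and numbers are masked so that
    incidental differences between attempts do not change the signature.
    """
    for line in reversed(execution_output.strip().splitlines()):
        match = _EXC_LINE.match(line.strip())
        if match:
            msg = match.group("msg") or ""
            msg = re.sub(r"0x[0-9a-fA-F]+", "<addr>", msg)
            msg = re.sub(r"\d+", "<n>", msg)
            cls = match.group("cls")
            return f"{cls}: {msg}" if msg else cls
    return None
```

Feeding this signature to the guard instead of the bare class name would let it distinguish a repeated KeyError on the same column from two different KeyErrors, at the cost of missing repeats whose messages vary in non-numeric ways.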

Failure mode 2: CodeInterpreter retry token accumulation

When the CodeInterpreter's generated code raises an exception, TaskWeaver's default behavior is to retry code generation for the same sub-task up to code_interpreter.max_retry_count times (default: 3). Each retry is a new LLM call to the CodeInterpreter role. The prompt for that retry call includes: the original sub-task description, the code generated on the previous attempt, and the execution output from that attempt — including the full Python traceback, any stdout from partially-executed lines, and any variable inspection output that the framework injects automatically.

The token cost compounds across retries because execution output is additive. The first retry context includes attempt 1 code + attempt 1 output. The second retry context includes attempt 1 code + attempt 1 output + attempt 2 code + attempt 2 output. Consider a pandas analysis task that generates a 300-token code block and fails with a 600-token traceback (a common length for pandas shape errors that include a dataframe repr), then retries with a slightly different 350-token block and fails again. By the third retry attempt, the CodeInterpreter is receiving 2,600+ tokens of prior-attempt context in addition to the sub-task description. Because every retry re-sends all earlier attempts, the input tokens billed across the retries of a single failing sub-task can exceed the cost of a successfully completing multi-step analysis.

Retry attempt | Accumulated context tokens (est.) | Input cost multiplier vs. attempt 1 | Common cause of this length
1 (initial) | ~800 | 1.0× | Sub-task + system prompt baseline
2 | ~1,900 | 2.4× | + prior code (~300t) + traceback (~600t) + pandas repr (~200t)
3 | ~3,400 | 4.3× | + attempt 2 code + attempt 2 traceback accumulated
4 | ~5,200 | 6.5× | 3 failed attempts fully accumulated in context
5 | ~7,400 | 9.3× | Dataframe repr balloons with large dataset feedback
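
The accumulation in the table can be reproduced with a running sum. The sketch below assumes the per-failure increments implied by the table's estimates (1,100, 1,500, 1,800, and 2,200 tokens); retry_context_sizes is a hypothetical helper, and real increments depend on the code and tracebacks involved.

```python
from itertools import accumulate


def retry_context_sizes(baseline_tokens: int, added_per_failure: list[int]) -> list[int]:
    """Input-context size per attempt: the baseline plus everything earlier failures added."""
    return list(accumulate(added_per_failure, initial=baseline_tokens))


# Increments taken from the table: each failed attempt adds its code,
# its traceback, and any dataframe repr to the next attempt's context.
sizes = retry_context_sizes(800, [1_100, 1_500, 1_800, 2_200])

for attempt, tokens in enumerate(sizes, start=1):
    print(f"attempt {attempt}: ~{tokens:,} input tokens ({tokens / sizes[0]:.2f}x baseline)")

# Each attempt is billed for its whole context, so the bill is the sum of the rows.
print(f"total input tokens across all attempts: ~{sum(sizes):,}")
```

The per-attempt figures grow steadily, but the number that matters for billing is the sum across attempts, which is why lowering max_retry_count removes the most expensive call first.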

The fix is to trim the execution output from prior attempts to a configurable ceiling before it re-enters the retry context. In a long stack trace, the last 15 or so lines (the failing frame and the exception message) carry most of the diagnostic information; the earlier frames are usually library internals that the LLM cannot act on. For accumulated stdout, keep only the first and last few lines and discard the intermediate print output, which adds context noise without improving the next attempt. The truncate_execution_output() helper below implements this by line count, alongside config settings that bound the number of retries.

Python — CodeInterpreter retry output trimming via config override
import json
from pathlib import Path
from typing import Optional

def configure_taskweaver_retry_guards(app_dir: str, overrides: Optional[dict] = None):
    """Write cost-guard settings into taskweaver_config.json.

    Key config keys that bound retry cost (names as used in this article;
    confirm them against the configuration reference for your TaskWeaver version):
      code_interpreter.max_retry_count   : max retries per sub-task (default: 3)
      planner.max_steps                  : max planning rounds per session (default: 10)

    Defaults fill in only the keys that the existing config file does not set.
    Explicit overrides always take precedence over existing values.
    """
    config_path = Path(app_dir) / "taskweaver_config.json"
    config = {}
    if config_path.exists():
        with open(config_path) as f:
            config = json.load(f)

    defaults = {
        "code_interpreter.max_retry_count": 2,     # was 3; saves 1 retry LLM call per failing sub-task
        "planner.max_steps": 12,                   # was 10+ in permissive configs
        "planner.prompt_compression": True,        # compress prior context when available
        "code_interpreter.exec_timeout": 30,       # prevent long-running code from accumulating stdout
        "logging.log_all_llm_calls": True,         # enables external cost monitoring
    }
    overrides = overrides or {}

    # Defaults never clobber values the user already configured.
    for k, v in defaults.items():
        config.setdefault(k, v)
    # Explicit overrides win over both defaults and existing values.
    config.update(overrides)

    with open(config_path, "w") as f:
        json.dump(config, f, indent=2)

    print(f"[RunGuard] TaskWeaver config written to {config_path}")
    # Report the values that actually ended up in the file.
    for k in sorted(set(defaults) | set(overrides)):
        print(f"  {k} = {config[k]}")


def truncate_execution_output(output: str, max_traceback_lines: int = 15, max_stdout_lines: int = 20) -> str:
    """Trim verbose execution output before it re-enters LLM context on retry."""
    lines = output.splitlines()

    # Find the traceback block (starts with 'Traceback (most recent call last):')
    tb_start = next(
        (i for i, ln in enumerate(lines) if ln.startswith("Traceback")),
        None,
    )

    if tb_start is not None:
        # Keep stdout before the traceback, then only the tail of the traceback
        stdout_lines = lines[:tb_start]
        tb_lines = lines[tb_start:]

        if len(stdout_lines) > max_stdout_lines:
            kept_stdout = (
                stdout_lines[:5]
                + [f"... [{len(stdout_lines) - 5 - 5} lines omitted by RunGuard] ..."]
                + stdout_lines[-5:]
            )
        else:
            kept_stdout = stdout_lines

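        if len(tb_lines) > max_traceback_lines:
            # Keep the first 3 lines (header and entry frame) plus the tail,
            # where the failing frame and the exception message live.
            tail_count = max(1, max_traceback_lines - 3)
            omitted = max(0, len(tb_lines) - 3 - tail_count)
            kept_tb = (
                tb_lines[:3]
                + [f"... [{omitted} traceback lines omitted] ..."]
                + tb_lines[-tail_count:]
            )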
        else:
            kept_tb = tb_lines

        return "\n".join(kept_stdout + kept_tb)

    # No traceback — just cap stdout length
    if len(lines) > max_stdout_lines:
        return "\n".join(
            lines[:10]
            + [f"... [{len(lines) - 10 - 10} lines omitted by RunGuard] ..."]
            + lines[-10:]
        )
    return output
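
truncate_execution_output() bounds output by line count, but a single line (a wide dataframe repr, for example) can itself be very long. A character-based cap can be layered on top. The sketch below assumes roughly 4 characters per token, the same rough heuristic this article uses for session-memory estimates; cap_to_token_budget is a hypothetical helper name.

```python
def cap_to_token_budget(text: str, max_tokens: int = 400, chars_per_token: int = 4) -> str:
    """Keep the head and tail of `text` so its estimated size stays near `max_tokens`.

    The tail gets the larger share because the exception line sits at the end
    of a Python traceback.
    """
    max_chars = max_tokens * chars_per_token
    if len(text) <= max_chars:
        return text

    marker = "\n... [output trimmed] ...\n"
    budget = max(0, max_chars - len(marker))
    head_chars = budget // 3            # one third for the beginning
    tail_chars = budget - head_chars    # the rest for the end
    tail = text[-tail_chars:] if tail_chars else ""
    return text[:head_chars] + marker + tail
```

Applying the line-based trim first and this cap second keeps the structure of the traceback while still guaranteeing an upper bound on what re-enters the retry context.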

Config key to know: code_interpreter.max_retry_count in taskweaver_config.json controls how many times the CodeInterpreter will retry a failing sub-task before returning the error to the Planner. The default in the open-source repository is 3. Reducing it to 2 and pairing with truncate_execution_output() on the retry context cuts the worst-case token cost of a persistently failing sub-task by approximately 60% without measurably affecting the success rate of tasks that are genuinely recoverable.

Failure mode 3: SharedSessionMemory unbounded growth

TaskWeaver persists the full conversation between the user, the Planner, and the CodeInterpreter in a SharedSessionMemory object. Every planning decision, every code generation, every execution result, and every natural-language response is stored as a RoundRecord. All of this history is injected into the prompt for every subsequent LLM call in the session — both for the Planner (to maintain planning continuity) and for the CodeInterpreter (to avoid re-generating code that already failed).

For short sessions this is fine. For production data analysis sessions — where a user asks a complex question like "Analyze Q1 sales by region, identify anomalies, compare to last year, and produce a summary table with statistical significance" — the session can involve 15–25 planner rounds, each involving 1–3 code generation + execution cycles. A conservative estimate for a 20-round session:

Round | Cumulative session memory (est. tokens) | Share of total input tokens at this round
1 | ~400 | ~15%
5 | ~4,800 | ~38%
10 | ~14,000 | ~58%
15 | ~28,000 | ~71%
20 | ~48,000 | ~81%
25 | ~70,000 | ~87%

By round 20, over 80% of every LLM call's input tokens are session memory from prior rounds rather than the current task. Each of the last five rounds of a 25-round session injects roughly 48,000 to 70,000 tokens of history. Because the history injected per call grows with every round added, the session's total LLM input cost grows faster than linearly: roughly quadratically with session length when each round adds a similar amount of history.
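
The quadratic growth follows from a simple sum. As a simplified sketch, assume every round adds the same amount of history (the table's growth is not perfectly uniform) and that each round makes one LLM call per role; total_history_tokens is a hypothetical helper.

```python
def total_history_tokens(rounds: int, tokens_added_per_round: int, calls_per_round: int = 1) -> int:
    """Total history tokens re-sent over a whole session.

    Round k re-sends the (k - 1) earlier rounds, so the sum over n rounds is
    tokens_added_per_round * n * (n - 1) / 2, which is quadratic in n.
    """
    return calls_per_round * tokens_added_per_round * rounds * (rounds - 1) // 2


# Assumed flat rate: ~48,000 tokens at round 20 averages to 2,400 per round.
per_round = 2_400
for n in (10, 20, 40):
    print(f"{n} rounds: ~{total_history_tokens(n, per_round):,} history tokens re-sent")
```

Under this assumption, doubling the session length roughly quadruples the history tokens billed, which is why splitting a long session is more effective than shaving tokens from individual rounds.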

TaskWeaver does not enforce a ceiling on session memory. Its optional planner.prompt_compression setting (discussed in the FAQ) reduces the Planner's context but is not a session-level budget. The fix is to monitor the session's cumulative round count and estimated memory size, and when approaching a configurable ceiling, either summarize the oldest rounds or split the session into a fresh session initialized with a compressed context summary.

Python — SessionMemoryGuard with compaction
import os
from dataclasses import dataclass, field
from typing import Optional
from taskweaver.app.app import TaskWeaverApp

def estimate_round_tokens(round_record) -> int:
    """Rough token estimate for a single RoundRecord."""
    total_chars = 0
    for post in getattr(round_record, "post_list", []):
        total_chars += len(getattr(post, "message", "") or "")
        for att in getattr(post, "attachment_list", []):
            total_chars += len(str(getattr(att, "content", "") or ""))
    return max(1, total_chars // 4)


@dataclass
class SessionMemoryGuard:
    max_session_tokens: int = 40_000   # trip compaction above this
    max_rounds: int = 30               # hard session ceiling
    compress_at_fraction: float = 0.8  # compress when 80% of max reached

    _round_count: int = field(default=0, init=False)
    _estimated_tokens: int = field(default=0, init=False)
    _compression_applied: bool = field(default=False, init=False)

    def record_round(self, round_record) -> tuple[bool, Optional[str]]:
        """
        Register a completed round. Returns:
          (continue_ok, compaction_summary_prompt_or_None)
        If compaction_summary_prompt is set, caller should start a new session
        seeded with the summary and send the summary as the first message.
        """
        self._round_count += 1
        round_tokens = estimate_round_tokens(round_record)
        self._estimated_tokens += round_tokens

        print(
            f"[RunGuard] Round {self._round_count}: +{round_tokens}t "
            f"cumulative={self._estimated_tokens}t / {self.max_session_tokens}t"
        )

        if self._round_count >= self.max_rounds:
            return False, None

        threshold = int(self.max_session_tokens * self.compress_at_fraction)
        if self._estimated_tokens >= threshold and not self._compression_applied:
            self._compression_applied = True
            return True, self._build_compaction_prompt()

        return True, None

    def _build_compaction_prompt(self) -> str:
        return (
            f"[RunGuard] SESSION MEMORY WARNING — approximately {self._estimated_tokens} tokens "
            f"of session history accumulated ({self._round_count} rounds). "
            f"Please produce a concise summary of: (1) the analysis completed so far, "
            f"(2) key findings and intermediate results, "
            f"(3) the remaining sub-tasks still to complete. "
            f"This summary will seed a fresh session to continue the analysis "
            f"without the accumulated context overhead."
        )


class MemoryBoundedTaskWeaverSession:
    """Automatically compacts TaskWeaver session memory when it grows too large."""

    def __init__(self, app_dir: str, guard: Optional[SessionMemoryGuard] = None):
        self._app_dir = app_dir
        self._app = TaskWeaverApp(app_dir=app_dir)
        self._session = self._app.get_session()
        self._guard = guard or SessionMemoryGuard()

    def _fresh_session(self, seed_summary: str) -> str:
        """Start a new session seeded with a context summary."""
        self._app = TaskWeaverApp(app_dir=self._app_dir)
        self._session = self._app.get_session()
        # Initialize the new session with the prior session's summary
        seed_round = self._session.send_message(
            f"Continuing a prior analysis session. Context summary:\n{seed_summary}\n"
            f"Please acknowledge this context and continue from where we left off."
        )
        return seed_round.post_list[-1].message

    def send_message(self, message: str) -> str:
        response_round = self._session.send_message(message)
        continue_ok, compaction_prompt = self._guard.record_round(response_round)

        final_response = response_round.post_list[-1].message

        if not continue_ok:
            return (
                f"{final_response}\n\n"
                f"[RunGuard] Session ceiling reached ({self._guard.max_rounds} rounds). "
                f"Start a new session for further analysis."
            )

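        if compaction_prompt:
            print("[RunGuard] Triggering session compaction...")
            summary_round = self._session.send_message(compaction_prompt)
            summary_text = summary_round.post_list[-1].message
            self._fresh_session(summary_text)
            # The fresh session starts with a near-empty history, so restart the
            # guard's token estimate and re-arm compaction for the new session.
            self._guard._estimated_tokens = 0
            self._guard._compression_applied = False
            print("[RunGuard] Fresh session initialized with compacted context.")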

        return final_response
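
The session wrapper above takes the fresh-session route. The other option mentioned, summarizing the oldest rounds, can be sketched on plain data. This is an illustration only: compact_history and naive_summary are hypothetical helpers operating on strings, and writing a compacted history back into a live TaskWeaver session is not shown because this article does not cover an API for that.

```python
from typing import Callable, List


def compact_history(
    rounds: List[str],
    keep_last: int,
    summarize: Callable[[List[str]], str],
) -> List[str]:
    """Replace all but the newest `keep_last` entries with one summary entry."""
    if keep_last < 0:
        raise ValueError("keep_last must be non-negative")
    if len(rounds) <= keep_last:
        return list(rounds)
    cut = len(rounds) - keep_last
    older, recent = rounds[:cut], rounds[cut:]
    return [summarize(older)] + recent


def naive_summary(older: List[str]) -> str:
    # Placeholder: a real implementation would ask an LLM to write the summary.
    return f"[summary of {len(older)} earlier rounds]"


history = [f"round {i}" for i in range(1, 11)]
compacted = compact_history(history, keep_last=3, summarize=naive_summary)
print(compacted)
```

Keeping the most recent rounds verbatim preserves the detail the next step is most likely to need, while the summary entry caps what the older rounds cost on every subsequent call.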

Failure mode 4: Plugin loop amplification in generated code

TaskWeaver plugins are Python classes that extend TaskWeaverPlugin. The CodeInterpreter imports them by name and calls their methods directly in generated Python code. This design is intentional and powerful: it lets the model write natural, idiomatic Python using first-class function calls rather than serializing and deserializing tool-call request/response messages. For single-call plugin invocations the cost is transparent — one plugin.search(query) call in the generated code maps to one external API call.

The cost failure mode arises when the LLM generates code that calls a plugin method in a loop. This is not an error — it is often the correct implementation of the task. If the task is "look up the closing price for each of these 50 ticker symbols", the most natural Python is:

prices = {}
for ticker in tickers:          # 50 items
    prices[ticker] = stock_price_plugin.get_price(ticker)  # 50 API calls

Each get_price() call is a separate HTTP request to a market data API, billed per call. If that API charges $0.001 per call, the generated code makes 50 calls for $0.05 — often more than the LLM generation cost for the entire sub-task. For embedding plugins (embedder.embed(text)), vector search plugins (vector_store.search(query, top_k=5)), or web search plugins (browser.fetch(url)), the per-call cost is higher and the loop body can grow to hundreds of iterations on real datasets.

TaskWeaver provides no mechanism to intercept or count plugin calls that happen inside an executing code block. The guard must operate at two points: before code is sent to the kernel (static analysis to detect plugin-in-loop patterns and inject a batch-call rewrite suggestion), and during execution (a plugin method wrapper that counts calls and raises a PluginBudgetExceeded exception if a per-run ceiling is hit).

Python — PluginCallGuard wrapper for TaskWeaver plugins
from dataclasses import dataclass, field
from typing import Any, Callable
from functools import wraps
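# The open-source package exports the plugin base class as Plugin; it is
# aliased here to the name this article uses.
from taskweaver.plugin import Plugin as TaskWeaverPlugin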


class PluginBudgetExceeded(RuntimeError):
    pass


@dataclass
class PluginCallBudget:
    max_calls_per_run: int = 30
    warn_at: int = 20
    plugin_name: str = "unknown_plugin"

    _calls_this_run: int = field(default=0, init=False)

    def reset(self):
        self._calls_this_run = 0

    def check(self):
        self._calls_this_run += 1
        # Strictly greater: the call numbered max_calls_per_run is still allowed.
        if self._calls_this_run > self.max_calls_per_run:
            raise PluginBudgetExceeded(
                f"[RunGuard] PLUGIN BUDGET EXCEEDED — '{self.plugin_name}' called "
                f"{self._calls_this_run} times in this execution block "
                f"(max: {self.max_calls_per_run}). "
                f"Use a batch method or reduce the input list size. "
                f"Partial results available for the {self._calls_this_run - 1} calls completed."
            )
        if self._calls_this_run == self.warn_at:
            print(
                f"[RunGuard] PLUGIN WARN — '{self.plugin_name}' at {self._calls_this_run} calls "
                f"(ceiling: {self.max_calls_per_run})"
            )


def guarded_plugin_method(budget: PluginCallBudget):
    """Decorator that enforces a call budget on a TaskWeaver plugin method."""
    def decorator(fn: Callable) -> Callable:
        @wraps(fn)
        def wrapper(*args, **kwargs) -> Any:
            budget.check()
            return fn(*args, **kwargs)
        return wrapper
    return decorator


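class ExampleGuardedPlugin(TaskWeaverPlugin):
    """Template showing how to apply PluginCallBudget to a TaskWeaver plugin."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._budget = PluginCallBudget(
            max_calls_per_run=25,
            warn_at=15,
            plugin_name="example_plugin",
        )

    def execute(self, *args, **kwargs):
        # Reset budget at the start of each new CodeInterpreter execution block.
        # Assumption: execute() runs once per block. If the base class in your
        # TaskWeaver version has no execute(), call reset() from whichever hook does.
        self._budget.reset()
        parent_execute = getattr(super(), "execute", None)
        if callable(parent_execute):
            return parent_execute(*args, **kwargs)
        return None

    def __call__(self, query: str) -> dict:
        # Plugins invoked as plain callables go through the same budget check.
        return self.search(query)

    def search(self, query: str) -> dict:
        # Every call counts against the budget before any external request is made.
        self._budget.check()
        # ... actual search implementation
        return {}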


import ast

def detect_plugin_loop_in_code(code: str, plugin_names: list[str]) -> list[str]:
    """Statically analyse generated Python to detect plugin calls inside loops
    (for/while statements and comprehensions). Returns a list of warning
    strings, empty if no patterns are found or the code does not parse.
    """
    try:
        tree = ast.parse(code)
    except SyntaxError:
        return []

    class LoopPluginVisitor(ast.NodeVisitor):
        def __init__(self):
            self.loop_depth = 0
            self.found = []

        def _visit_loop(self, node):
            self.loop_depth += 1
            self.generic_visit(node)
            self.loop_depth -= 1

        # Comprehensions and generator expressions iterate just like for-loops.
        visit_For = visit_While = _visit_loop
        visit_ListComp = visit_SetComp = visit_DictComp = visit_GeneratorExp = _visit_loop

        def visit_Call(self, node):
            if self.loop_depth > 0:
                # Match on the callee only (e.g. "plugin.search"), so an outer call
                # that merely takes a plugin call as an argument is not flagged too.
                callee = ast.unparse(node.func)  # ast.unparse requires Python 3.9+
                root = callee.split(".")[0]
                for plugin_name in plugin_names:
                    if root == plugin_name:
                        self.found.append(
                            f"Plugin '{plugin_name}' called inside a loop at line {node.lineno}. "
                            f"Consider using a batch method: {plugin_name}.search_batch(items) "
                            f"if available, or pre-compute outside the loop."
                        )
            self.generic_visit(node)

    visitor = LoopPluginVisitor()
    visitor.visit(tree)
    return visitor.found

Batch plugin pattern: The most effective mitigation for plugin loop amplification is to design TaskWeaver plugins with batch methods alongside single-item methods: search(query: str) paired with search_batch(queries: list[str]). The CodeInterpreter will use whichever signature matches the task description. When the Planner's sub-task description mentions "for each" or "all items", explicitly note in the plugin's docstring that a batch method exists — the LLM reads plugin docstrings as part of its context and will prefer the batch method when it knows one exists.
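
A minimal sketch of the batch pattern, independent of TaskWeaver's plugin base class. It assumes the external service offers a multi-item endpoint, represented here by a hypothetical fetch_many callable; BatchedPriceLookup and fake_fetch_many are illustrative names only.

```python
from typing import Callable, Dict, Sequence


class BatchedPriceLookup:
    """Hypothetical plugin-style helper exposing single-item and batch lookups."""

    def __init__(
        self,
        fetch_many: Callable[[Sequence[str]], Dict[str, float]],
        batch_size: int = 100,
    ):
        self._fetch_many = fetch_many  # one external request per invocation
        self._batch_size = batch_size
        self.requests_made = 0

    def get_price(self, ticker: str) -> float:
        """Single-item convenience wrapper around the batch path."""
        return self.get_prices([ticker])[ticker]

    def get_prices(self, tickers: Sequence[str]) -> Dict[str, float]:
        """Resolve all tickers using one request per chunk of batch_size."""
        unique = list(dict.fromkeys(tickers))  # de-duplicate, keep order
        results: Dict[str, float] = {}
        for start in range(0, len(unique), self._batch_size):
            chunk = unique[start:start + self._batch_size]
            self.requests_made += 1
            results.update(self._fetch_many(chunk))
        return results


def fake_fetch_many(tickers: Sequence[str]) -> Dict[str, float]:
    # Stand-in for a real multi-item API call.
    return {t: 100.0 for t in tickers}


lookup = BatchedPriceLookup(fake_fetch_many, batch_size=20)
prices = lookup.get_prices([f"T{i}" for i in range(50)])
print(len(prices), "prices from", lookup.requests_made, "requests")
```

With a batch method available, the number of billable requests depends on the chunk size rather than on the length of the list the generated code iterates over.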

Composite: TaskWeaverCostPolicy

The four guards compose at different interception points: the PlannerRetryGuard monitors execution output from each round, the retry config and output truncation apply at the CodeInterpreter layer, the SessionMemoryGuard monitors cumulative session state, and the PluginCallBudget enforces per-execution limits inside plugin method calls. A TaskWeaverCostPolicy coordinates configuration and initialization of all four from a single object.

Python — TaskWeaverCostPolicy composite
from dataclasses import dataclass


@dataclass
class TaskWeaverCostPolicy:
    app_dir: str = "."
    max_planner_steps: int = 12
    max_retry_count: int = 2
    max_consecutive_same_error: int = 3
    max_session_tokens: int = 40_000
    max_session_rounds: int = 25
    max_plugin_calls_per_run: int = 25

    def apply_config(self):
        """Write all cost-guard settings to taskweaver_config.json."""
        configure_taskweaver_retry_guards(self.app_dir, {
            "planner.max_steps": self.max_planner_steps,
            "code_interpreter.max_retry_count": self.max_retry_count,
            "code_interpreter.exec_timeout": 30,
            "planner.prompt_compression": True,
            "logging.log_all_llm_calls": True,
        })

    def build_session(self) -> MemoryBoundedTaskWeaverSession:
        self.apply_config()
        guard = SessionMemoryGuard(
            max_session_tokens=self.max_session_tokens,
            max_rounds=self.max_session_rounds,
        )
        return MemoryBoundedTaskWeaverSession(app_dir=self.app_dir, guard=guard)

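    def build_plugin_budget(self, plugin_name: str) -> PluginCallBudget:
        return PluginCallBudget(
            max_calls_per_run=self.max_plugin_calls_per_run,
            warn_at=int(self.max_plugin_calls_per_run * 0.6),
            plugin_name=plugin_name,
        )

    def build_retry_guard(self) -> PlannerRetryGuard:
        # Expose the planner circuit breaker configured from the same policy object.
        return PlannerRetryGuard(
            max_consecutive_same_error=self.max_consecutive_same_error,
        )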


# Example: run a guarded TaskWeaver analysis
policy = TaskWeaverCostPolicy(
    app_dir="/path/to/taskweaver_project",
    max_planner_steps=12,
    max_retry_count=2,
    max_session_tokens=40_000,
    max_session_rounds=25,
    max_plugin_calls_per_run=25,
)

session = policy.build_session()
result = session.send_message(
    "Analyze the Q1 sales CSV, identify the top 5 regions by revenue growth, "
    "and flag any regions where growth exceeded 50% compared to Q1 last year."
)
print(result)

FAQ

How does TaskWeaver's Planner track planning context vs. the CodeInterpreter's execution context? Are they separate LLM call budgets?

They are separate LLM calls but share the same session memory. The Planner receives the user message plus the full SharedSessionMemory as its context window; it produces a planning response that is added to memory. The CodeInterpreter then receives its sub-task assignment plus the same SharedSessionMemory as its context. Both roles pay the same session memory overhead on every call. In a session with 20 rounds, both the Planner's planning calls and the CodeInterpreter's code generation calls each inject the same 40,000-token session history — so the total token cost scales as 2× session memory × number of rounds, not 1×. The SessionMemoryGuard controls the shared overhead that affects both roles simultaneously.

Can planner.prompt_compression in the config replace the SessionMemoryGuard?

Partially. TaskWeaver's built-in prompt_compression option compresses the session history before it is injected into the Planner's context, which reduces the token cost of planning calls. However, it does not affect the CodeInterpreter's context: the CodeInterpreter still receives uncompressed execution history from prior code generation attempts. The SessionMemoryGuard covers that gap by triggering a full session compaction before either role's context grows unmanageable. Use both: enable prompt_compression as a first-line reduction, and add the guard for sessions that still exceed its token or round limits after compression.

The PlannerRetryGuard injects a diagnostic message to steer the Planner when it detects a repeated error. Doesn't this add extra LLM calls?

Yes, one extra call, and that is intentional. When the guard detects 3 consecutive ModuleNotFoundError responses, it sends a targeted steering message that explains the environmental constraint. The Planner uses that message to generate a revised plan that avoids the blocked approach (for example, substituting a different library or formatting the output differently). Without the guard, the Planner would keep re-planning (up to planner.max_steps) without this constraint information. The guard trades one deliberate steering call for 5–10 unproductive retry calls, so the net token savings are positive whenever the error would have persisted beyond 3 occurrences.
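The detection step described above can be sketched in isolation. This is a simplified illustration, not the PlannerRetryGuard from this post: the class name RepeatedErrorDetector, the observe() method, and the wording of the steering message are all hypothetical, and the regex assumes the execution output ends with a standard Python traceback line.

```python
import re
from typing import Optional

# Matches a line that starts with an exception name, e.g. the final
# "ModuleNotFoundError: ..." line of a Python traceback.
_ERROR_LINE = re.compile(r"^([A-Za-z_][\w.]*(?:Error|Exception))\b", re.MULTILINE)


class RepeatedErrorDetector:
    """Hypothetical stand-in for the detection half of a retry guard."""

    def __init__(self, threshold: int = 3) -> None:
        self.threshold = threshold
        self._last_error: Optional[str] = None
        self._streak = 0

    def observe(self, execution_output: str) -> Optional[str]:
        """Return a steering message when the same error type has repeated
        `threshold` times in a row; otherwise return None."""
        matches = _ERROR_LINE.findall(execution_output)
        error = matches[-1] if matches else None
        if error is None:
            # A clean execution breaks the streak.
            self._last_error, self._streak = None, 0
            return None
        self._streak = self._streak + 1 if error == self._last_error else 1
        self._last_error = error
        # Fire exactly once, on the occurrence that reaches the threshold.
        if self._streak == self.threshold:
            return (
                f"{error} has occurred {self._streak} times in a row. "
                "Treat it as an environment constraint and plan an "
                "approach that does not depend on the failing step."
            )
        return None


detector = RepeatedErrorDetector()
trace = (
    "Traceback (most recent call last):\n"
    '  File "<cell>", line 1, in <module>\n'
    "ModuleNotFoundError: No module named 'prophet'"
)
messages = [detector.observe(trace) for _ in range(3)]
print(messages[-1])
```

Firing only on the occurrence that reaches the threshold keeps the overhead to the single extra steering call discussed above; a production guard would also need a policy for errors that persist after steering.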

TaskWeaver runs the CodeInterpreter in a sandboxed Jupyter kernel. Does the sandbox prevent the plugin call budget from working correctly?

No. Plugins are Python objects instantiated in the kernel's namespace, and the PluginCallBudget counter lives inside the plugin instance, which persists for the entire kernel session. When the CodeInterpreter executes a new code block, the plugin instance still holds the count accumulated by earlier blocks in the same kernel session. The guard's reset() call at the start of each execute() invocation clears that count, so the budget applies per generated code block rather than per kernel lifetime. If you want a per-kernel-session limit instead, remove the reset() call from execute() and set a higher absolute ceiling.
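The difference between a per-block and a per-kernel budget can be shown with a small simulation. The CallBudgetSketch class and run_block() helper below are hypothetical stand-ins written for this illustration; they are not the PluginCallBudget from this post and make no TaskWeaver calls.

```python
class CallBudgetSketch:
    """Hypothetical counter illustrating per-block vs. per-kernel limits."""

    def __init__(self, max_calls: int) -> None:
        self.max_calls = max_calls
        self.calls = 0

    def reset(self) -> None:
        self.calls = 0

    def record_call(self) -> None:
        if self.calls >= self.max_calls:
            raise RuntimeError(f"plugin call budget of {self.max_calls} exhausted")
        self.calls += 1


def run_block(budget: CallBudgetSketch, n_calls: int, reset_first: bool) -> bool:
    """Simulate one generated code block that calls a plugin n_calls times.

    Returns True if the block finished within budget, False if it was stopped.
    """
    if reset_first:
        budget.reset()
    try:
        for _ in range(n_calls):
            budget.record_call()
    except RuntimeError:
        return False
    return True


# Per-block limit: each block starts from zero, so three 20-call blocks pass.
per_block = CallBudgetSketch(max_calls=25)
per_block_results = [run_block(per_block, 20, reset_first=True) for _ in range(3)]

# Per-kernel limit: the count carries over, so the second block is stopped.
per_kernel = CallBudgetSketch(max_calls=25)
per_kernel_results = [run_block(per_kernel, 20, reset_first=False) for _ in range(3)]

print(per_block_results, per_kernel_results)
```

With the reset in place, the same ceiling of 25 permits an unbounded number of blocks, which is why the answer above recommends a higher absolute ceiling if you remove it.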

How does TaskWeaver's cost profile compare to direct LangChain or LangGraph agent implementations for the same data analysis tasks?

TaskWeaver's Planner-CodeInterpreter model uses significantly more LLM calls per task than a direct LangChain tool-call agent. For the same analysis, TaskWeaver might make 8–15 LLM calls (Planner calls plus CodeInterpreter calls) where a single-role ReAct agent makes 4–8. However, TaskWeaver's generated code often completes complex data transformations in a single execution block that would require 10–20 tool calls in a tool-call agent, because Python code is more expressive than serialized tool invocations for multi-step operations. As a result, total token cost is often comparable. The failure-mode cost is where TaskWeaver diverges: a failing sub-task in TaskWeaver costs more per failure than a failing tool call in LangChain, because TaskWeaver's retry context includes full execution logs whereas LangChain tool errors are typically short JSON blobs. The guards in this post are designed to close that gap.

Stop the next runaway TaskWeaver session before it bills

RunGuard instruments all four TaskWeaver cost failure modes — planner retry detection, CodeInterpreter context trimming, session memory compaction, and plugin call rate limiting — as a managed API. No guard logic to maintain across TaskWeaver version upgrades, a dashboard that shows every circuit breaker trip across all your data analysis agents, configurable thresholds per application. The 14-day free trial requires no credit card.
