Coze is ByteDance's AI agent platform for building bots, automating multi-step workflows, and deploying agents to messaging channels and the Coze API. At its simplest layer — a single-turn chatbot answering questions — Coze is predictable: one user message, one model response, one billing event. But Coze's four more powerful capabilities operate across multiple model calls, multiple tool invocations, and multiple agent handoffs: the workflow builder, the plugin system, long-term memory, and multi-agent team orchestration.

Each of these capabilities is designed to let you build agents that do more than a single model call allows. None of them expose a per-session or per-workflow spend ceiling to your code. Coze routes every LLM call through your configured model — Doubao, GPT-4o, Claude, or another provider via your own API key — and charges per token at provider rates. The platform does not decide when enough tokens have been spent; your workflow, your plugins, and your agent team will keep calling the model until they reach natural completion or an unhandled error.

This post covers four structural cost amplification patterns specific to Coze's architecture, and a runtime circuit breaker guard for each. The guards instrument Coze's API surface without replacing it — you keep the workflow automation and multi-agent orchestration; you add the spend ceilings.

What this post covers: Four cost amplification patterns specific to Coze's workflow engine, plugin system, long-term memory, and multi-agent team features, and a runtime circuit breaker guard for each. Guards work via the Coze API (POST /v1/workflow/run and its streaming variant POST /v1/workflow/stream_run, POST /v3/chat) and observable state from Coze's event stream. You keep the platform features; you add the ceilings.

Pattern 1: Workflow Back-Edge Cycles via LLM Decision Nodes

Coze's visual workflow builder lets you connect nodes — LLM nodes, code nodes, plugin nodes, condition nodes — into a directed graph. Condition nodes route execution based on a variable value: "if the output contains 'retry', go back to node 3." LLM nodes make routing decisions by prompting the model to choose a next step. The combination of an LLM node followed by a condition node that routes back to an earlier part of the graph creates a back-edge in the workflow DAG — a potential cycle.
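
A back-edge can be found statically before a workflow ever runs. The sketch below is a minimal illustration, assuming you have exported or hand-transcribed the workflow as an adjacency map; the node names and the find_back_edges helper are hypothetical and not part of Coze's API.

```python
from typing import Dict, List, Tuple

def find_back_edges(graph: Dict[str, List[str]], start: str) -> List[Tuple[str, str]]:
    """Return edges (u, v) where v is still on the current DFS path when u points to it."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, fully explored
    color = {node: WHITE for node in graph}
    back_edges: List[Tuple[str, str]] = []

    def visit(u: str) -> None:
        color[u] = GRAY
        for v in graph.get(u, []):
            state = color.get(v, WHITE)
            if state == GRAY:
                # v is an ancestor of u on the current path: this edge closes a cycle
                back_edges.append((u, v))
            elif state == WHITE:
                visit(v)
        color[u] = BLACK

    visit(start)
    return back_edges

# Hypothetical refinement workflow: the quality check can route back to the draft node.
workflow = {
    "start": ["draft_llm"],
    "draft_llm": ["quality_check"],
    "quality_check": ["draft_llm", "end"],
    "end": [],
}
print(find_back_edges(workflow, "start"))
```

Any edge this reports is a place where an LLM-driven condition can send execution backwards, and therefore a place where an iteration ceiling belongs.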

Coze workflows have a configurable maximum iteration count, but the default is permissive (often 50–100 iterations). A workflow designed to "keep refining the response until it meets the quality criteria" can iterate to that limit on every invocation where the model's first several outputs are below the threshold, which, for an underspecified quality rubric, is most invocations. Each iteration involves at minimum one LLM node call, and the iteration's working context (prior attempts, original instructions, quality feedback) grows with every pass.

Workflow cycle cost accumulation (per invocation):
Iteration 1: task description + instructions = ~2,000 input tokens
Iteration 2: iteration 1 context + first attempt + quality feedback = ~4,200 input tokens
Iteration 5: accumulated context = ~10,800 input tokens
Iteration 10: accumulated context = ~21,600 input tokens
Iteration 20: accumulated context = ~43,000 input tokens

At gpt-4o ($2.50/$10 per 1M tokens), 20-iteration workflow, pricing only the final pass's context:
Input: 43,000 × $2.50 / 1M = $0.108 per invocation
Output: ~8,000 tokens total × $10 / 1M = $0.080 per invocation
Total: ~$0.19 per stuck workflow invocation (a floor, since every earlier pass is also billed for its own input context)
At 500 automated invocations/day hitting the 20-iteration ceiling: at least $95/day ($2,850/month)
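
The figures above price the context of the final pass. Because each pass is billed for its own input, it can be useful to sum across passes as well. The sketch below does that under an assumed linear growth model (about 2,000 tokens on the first pass and about 2,160 more on each later pass, which roughly tracks the listed checkpoints); the function names are illustrative.

```python
def cumulative_input_tokens(first_pass: int, growth_per_pass: int, iterations: int) -> int:
    """Sum the input tokens billed across all passes, assuming linear context growth."""
    return sum(first_pass + growth_per_pass * i for i in range(iterations))

def invocation_cost_usd(
    input_tokens: int,
    output_tokens: int,
    input_price: float = 2.50,    # USD per 1M input tokens (gpt-4o rate used in this post)
    output_price: float = 10.00,  # USD per 1M output tokens
) -> float:
    return input_tokens / 1_000_000 * input_price + output_tokens / 1_000_000 * output_price

# Assumption: pass 1 sees ~2,000 tokens, each later pass sees ~2,160 more than the last.
total_in = cumulative_input_tokens(2_000, 2_160, 20)
cost = invocation_cost_usd(total_in, 8_000)
print(f"cumulative input tokens: {total_in:,}")
print(f"cost per stuck invocation: ${cost:.3f}")
```

Under this growth assumption the summed input is roughly ten times the final-pass context, so a budget sized from the final pass alone should be treated as a lower bound.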

The failure mode amplifies in automated contexts. A Coze bot triggered by a webhook (Slack message, form submission, scheduled cron) with a refinement workflow runs unattended. There is no human to notice that iteration 8 is semantically identical to iteration 5 — the model is stuck, the context is growing, and the bill is accumulating in the background until the maximum iteration count is hit or the timeout fires.

The guard: WorkflowCycleGuard

When running a Coze workflow in streaming mode (POST /v1/workflow/stream_run), the event stream includes an execute_id and iteration metadata per node execution event. The guard intercepts each iteration event, hashes a fingerprint of the LLM node's output, and detects repetition: an output that matches the previous one for repeat_threshold consecutive iterations means the workflow is spinning without progress:

Python
import hashlib
import json
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class WorkflowCycleGuard:
    max_iterations: int = 10
    max_input_tokens: int = 30_000
    repeat_threshold: int = 2   # trips after N consecutive repeats, i.e. N + 1 identical outputs in a row
    _iteration: int = field(default=0, init=False)
    _total_input_tokens: int = field(default=0, init=False)
    _last_fingerprints: list = field(default_factory=list, init=False)
    _consecutive_repeats: int = field(default=0, init=False)
    _trip_reason: Optional[str] = field(default=None, init=False)

    @property
    def tripped(self) -> bool:
        return self._trip_reason is not None

    def _fingerprint(self, llm_output: str) -> str:
        # Normalize whitespace, then hash. This catches exact repeats only, not paraphrases.
        normalized = " ".join(llm_output.split())
        return hashlib.sha256(normalized.encode()).hexdigest()[:16]

    def record_iteration(self, llm_output: str, input_tokens: int = 0) -> None:
        if self.tripped:
            return

        self._iteration += 1
        self._total_input_tokens += input_tokens
        fp = self._fingerprint(llm_output)

        if self._last_fingerprints and fp == self._last_fingerprints[-1]:
            self._consecutive_repeats += 1
        else:
            self._consecutive_repeats = 0
        self._last_fingerprints.append(fp)
        if len(self._last_fingerprints) > 5:
            self._last_fingerprints.pop(0)

        if self._iteration >= self.max_iterations:
            self._trip_reason = (
                f"max_iterations={self.max_iterations} reached "
                f"({self._total_input_tokens} cumulative input tokens)"
            )
        elif self._total_input_tokens >= self.max_input_tokens:
            self._trip_reason = (
                f"input token budget exhausted: "
                f"{self._total_input_tokens} >= {self.max_input_tokens}"
            )
        elif self._consecutive_repeats >= self.repeat_threshold:
            self._trip_reason = (
                f"stuck loop detected: same output fingerprint {fp} "
                f"repeated {self._consecutive_repeats + 1} consecutive iterations"
            )

    def status(self) -> dict:
        return {
            "iteration": self._iteration,
            "input_tokens": self._total_input_tokens,
            "consecutive_repeats": self._consecutive_repeats,
            "tripped": self.tripped,
            "trip_reason": self._trip_reason,
        }


def run_workflow_with_guard(
    coze_client,
    workflow_id: str,
    parameters: dict,
    guard: WorkflowCycleGuard,
) -> dict:
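    """
    Runs a Coze workflow in streaming mode, intercepting each node execution
    event and enforcing the cycle guard on LLM node outputs.

    The event field names read below (node_type, usage.input_tokens, execute_id)
    and coze_client.cancel_workflow are assumptions about your client wrapper
    and Coze's event payloads; verify them against the current API reference.
    """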
    import httpx

    # Streaming runs use the stream_run endpoint; /v1/workflow/run returns one JSON response.
    url = "https://api.coze.com/v1/workflow/stream_run"
    headers = {
        "Authorization": f"Bearer {coze_client.api_token}",
        "Content-Type": "application/json",
    }
    payload = {
        "workflow_id": workflow_id,
        "parameters": parameters,
    }

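    iteration_outputs = []
    event_type = None  # most recent SSE event name

    with httpx.stream("POST", url, headers=headers, json=payload, timeout=120) as r:
        r.raise_for_status()
        for line in r.iter_lines():
            # SSE frames carry the event name on an "event:" line and the JSON
            # payload on the following "data:" line, so track both.
            if line.startswith("event:"):
                event_type = line[len("event:"):].strip()
                continue
            if not line.startswith("data:"):
                continue
            event = json.loads(line[len("data:"):].strip())
            # Prefer an event name embedded in the payload, if one is present
            event_type = event.get("event", event_type)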

            if event_type == "Message" and event.get("node_type") == "LLM":
                llm_output = event.get("content", "")
                input_tokens = event.get("usage", {}).get("input_tokens", 0)
                guard.record_iteration(llm_output, input_tokens)
                iteration_outputs.append(llm_output)

                if guard.tripped:
                    # Signal the workflow to abort via the Coze cancel endpoint
                    coze_client.cancel_workflow(
                        execute_id=event.get("execute_id"),
                        workflow_id=workflow_id,
                    )
                    break

            elif event_type == "Done":
                break

    return {
        "iteration_outputs": iteration_outputs,
        "guard_status": guard.status(),
        "completed": not guard.tripped,
    }

The guard enforces three independent ceilings (iteration count, cumulative input tokens, and consecutive-repeat detection) and trips on whichever threshold is hit first. The repeat detector catches the stuck-loop case well before the iteration ceiling: once the model has settled into producing the same output, there is no reason to continue iterating regardless of how many iterations remain in the budget.
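
Exact-hash fingerprints miss outputs that differ by a word or a punctuation mark. One possible variant, not part of the guard above, compares consecutive outputs by textual similarity using the standard library's difflib. The 0.9 threshold and the is_near_repeat name are assumptions to tune against your own workflow's outputs, and character-level similarity is only a rough proxy for semantic equivalence.

```python
from difflib import SequenceMatcher

def is_near_repeat(previous: str, current: str, threshold: float = 0.9) -> bool:
    """True when two outputs are textually similar enough to count as a repeat."""
    # Normalize case and whitespace so formatting noise does not mask a repeat
    a = " ".join(previous.lower().split())
    b = " ".join(current.lower().split())
    # autojunk=False keeps the ratio stable on long outputs
    return SequenceMatcher(None, a, b, autojunk=False).ratio() >= threshold

print(is_near_repeat(
    "The draft needs a clearer summary.",
    "The draft needs a clearer summary!",
))
print(is_near_repeat(
    "The draft needs a clearer summary.",
    "Add a pricing table.",
))
```

A check like this could replace the equality test on fingerprints, at the cost of keeping the previous output text in memory rather than only its hash.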

Pattern 2: Plugin Call Retry Amplification

Coze's plugin marketplace provides 100+ integrations: web search, weather, code execution, image generation, database queries, and custom API connectors. When a workflow node calls a plugin, the plugin call goes through Coze's execution layer and may itself involve an LLM call for synthesis (many "search + summarize" plugins do one search API call and one LLM call to produce the summary). Workflows commonly call 3–6 plugins per pass.

The retry amplification pattern emerges when a condition node detects a failed or low-quality plugin result and routes to a fallback sequence: if the primary search plugin returns empty results, try the secondary search plugin; if that also fails, try a direct web scrape via a code node, then re-run the synthesis LLM node on the scraped content. Each fallback step is a separate LLM call. A 3-plugin primary + 3-plugin fallback sequence that fires the fallback path on 40% of invocations has an effective call count that is 1.4× the "happy path" estimate:

Plugin retry cost formula:
Primary path: 3 plugin calls × 1 LLM synthesis each = 3 LLM calls
Fallback path (40% of invocations): 3 additional plugin calls = 3 additional LLM calls
Effective calls per invocation: 3 + (0.4 × 3) = 4.2 calls
Per-call average: ~1,800 input tokens (search results + synthesis instructions), ~600 output tokens

At gpt-4o ($2.50/$10 per 1M tokens), per invocation:
Input: 4.2 × 1,800 × $2.50 / 1M = $0.019
Output: 4.2 × 600 × $10 / 1M = $0.025
Total: $0.044 per invocation (vs. $0.0315 happy-path estimate: 3 calls × $0.0105 per call)
At 10,000 invocations/day: $441/day vs. $315/day estimated, a $126 daily overrun

The gap widens on days when upstream APIs are degraded. A provider outage on the primary search plugin can push the fallback rate from 40% to 95% for hours, raising the effective LLM call count to 5.85 per invocation (3 + 0.95 × 3), nearly double the happy path, without any change to your workflow configuration. A production bot that looked like a $315/day line item becomes a roughly $614/day charge during an API incident.
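
The relationship between fallback rate and spend is linear, which makes it easy to tabulate. The sketch below recomputes the per-invocation cost from the per-call token averages stated in the formula (1,800 input, 600 output) at gpt-4o rates; the helper names are illustrative.

```python
def effective_calls(primary_calls: int, fallback_calls: int, fallback_rate: float) -> float:
    """Expected LLM calls per invocation when the fallback path fires at fallback_rate."""
    return primary_calls + fallback_rate * fallback_calls

def cost_per_invocation(
    calls: float,
    input_tokens: int = 1_800,
    output_tokens: int = 600,
    input_price: float = 2.50,    # USD per 1M input tokens
    output_price: float = 10.00,  # USD per 1M output tokens
) -> float:
    per_call = input_tokens / 1_000_000 * input_price + output_tokens / 1_000_000 * output_price
    return calls * per_call

# Happy path, the 40% baseline, and a degraded-upstream day
for rate in (0.0, 0.4, 0.95):
    calls = effective_calls(3, 3, rate)
    daily = cost_per_invocation(calls) * 10_000
    print(f"fallback rate {rate:.0%}: {calls:.2f} calls/invocation, ${daily:,.2f}/day")
```

Running the same table with your own measured fallback rates gives a budget range rather than a single happy-path number.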

The guard: PluginBudgetGuard

Coze's event stream emits a separate event per plugin execution with a plugin_id and the associated token usage. The guard tracks total plugin calls, per-plugin call counts, and cumulative spend across the current workflow invocation. It enforces ceilings on total plugin calls and total cost before allowing each additional plugin execution to proceed:

Python
from dataclasses import dataclass, field
from typing import Optional
from collections import defaultdict

# Per-provider cost table (USD per 1M tokens, update as pricing changes)
MODEL_COSTS = {
    "gpt-4o":             {"input": 2.50,  "output": 10.00},
    "gpt-4o-mini":        {"input": 0.15,  "output": 0.60},
    "claude-3-5-sonnet":  {"input": 3.00,  "output": 15.00},
    "doubao-pro-32k":     {"input": 0.80,  "output": 2.40},
}

@dataclass
class PluginBudgetGuard:
    max_plugin_calls: int = 8           # total across all plugins in this invocation
    max_cost_usd: float = 0.10          # total LLM spend for this invocation
    max_calls_per_plugin: int = 3       # per-plugin ceiling (catches single-plugin loops)
    model: str = "gpt-4o"
    _total_calls: int = field(default=0, init=False)
    _calls_by_plugin: dict = field(default_factory=lambda: defaultdict(int), init=False)
    _total_cost_usd: float = field(default=0.0, init=False)
    _trip_reason: Optional[str] = field(default=None, init=False)

    @property
    def tripped(self) -> bool:
        return self._trip_reason is not None

    def _call_cost(self, input_tokens: int, output_tokens: int) -> float:
        costs = MODEL_COSTS.get(self.model, MODEL_COSTS["gpt-4o"])
        return (
            input_tokens / 1_000_000 * costs["input"]
            + output_tokens / 1_000_000 * costs["output"]
        )

    def check_and_record(
        self,
        plugin_id: str,
        input_tokens: int = 0,
        output_tokens: int = 0,
    ) -> dict:
        """
        Call BEFORE allowing a plugin execution to proceed.
        Returns {"approved": True} or {"approved": False, "reason": str}.
        """
        if self.tripped:
            return {"approved": False, "reason": self._trip_reason}

        projected_cost = self._call_cost(input_tokens, output_tokens)
        projected_total = self._total_cost_usd + projected_cost
        projected_plugin_calls = self._calls_by_plugin[plugin_id] + 1

        if self._total_calls + 1 > self.max_plugin_calls:
            self._trip_reason = (
                f"total plugin call ceiling reached: "
                f"{self._total_calls} >= {self.max_plugin_calls}"
            )
            return {"approved": False, "reason": self._trip_reason}

        if projected_total > self.max_cost_usd:
            self._trip_reason = (
                f"invocation cost ceiling: projected ${projected_total:.4f} "
                f"> limit ${self.max_cost_usd}"
            )
            return {"approved": False, "reason": self._trip_reason}

        if projected_plugin_calls > self.max_calls_per_plugin:
            self._trip_reason = (
                f"per-plugin ceiling for '{plugin_id}': "
                f"{projected_plugin_calls} > {self.max_calls_per_plugin}"
            )
            return {"approved": False, "reason": self._trip_reason}

        # Approved — record the call
        self._total_calls += 1
        self._calls_by_plugin[plugin_id] += 1
        self._total_cost_usd += projected_cost
        return {"approved": True}

    def status(self) -> dict:
        return {
            "total_calls": self._total_calls,
            "calls_by_plugin": dict(self._calls_by_plugin),
            "total_cost_usd": round(self._total_cost_usd, 6),
            "tripped": self.tripped,
            "trip_reason": self._trip_reason,
        }

The guard's check_and_record is called before each plugin execution. Because actual usage is not known until the call returns, input_tokens and output_tokens are projections: the values from the previous call to the same plugin, or, for the first call, a per-plugin default taken from your workflow's historical usage data. This lets the guard gate the call before cost is incurred rather than only recording it afterward.
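
One way to supply those projections is a small helper that remembers the last observed usage per plugin and falls back to configured defaults. This is a sketch only: UsageProjector, its default values, and the plugin IDs are hypothetical, and the numbers should come from your own usage history.

```python
from dataclasses import dataclass, field
from typing import Dict, Tuple

@dataclass
class UsageProjector:
    # (input_tokens, output_tokens) defaults per plugin, from historical usage
    defaults: Dict[str, Tuple[int, int]]
    # Used when a plugin has neither a default nor a prior observation
    fallback: Tuple[int, int] = (1_800, 600)
    _last_seen: Dict[str, Tuple[int, int]] = field(default_factory=dict, init=False)

    def project(self, plugin_id: str) -> Tuple[int, int]:
        """Projected usage for the next call: last observation, else default."""
        if plugin_id in self._last_seen:
            return self._last_seen[plugin_id]
        return self.defaults.get(plugin_id, self.fallback)

    def observe(self, plugin_id: str, input_tokens: int, output_tokens: int) -> None:
        """Record actual usage after a call returns."""
        self._last_seen[plugin_id] = (input_tokens, output_tokens)

projector = UsageProjector(defaults={"web_search": (2_000, 500)})
print(projector.project("web_search"))   # configured default before any call
projector.observe("web_search", 2_400, 650)
print(projector.project("web_search"))   # last observed usage afterwards
```

The projected pair can then be passed as input_tokens and output_tokens to check_and_record before each plugin execution.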

Pattern 3: Long-Term Memory Retrieval Expansion

Coze's long-term memory feature persists conversation excerpts as searchable vector embeddings. When a user sends a new message, the bot retrieves the K most semantically relevant past memories and injects them into the current turn's context before calling the model. The feature is designed to make bots more coherent across sessions — a customer support bot remembers past tickets; a personal assistant remembers user preferences.

The cost amplification is structural: as a bot accumulates conversations, its memory store grows. Retrieval returns more tokens per turn because there are more relevant memories to surface. The injection happens before the model sees the current conversation, so it adds directly to the input token count of every turn. A bot that starts at 500 input tokens per turn (system prompt + user message) can reach 8,000+ input tokens per turn after 500 conversations — a 16× multiplier on a cost that is fully invisible to the caller's code.

Memory retrieval token growth (per conversation turn):
Day 1 (0 memories): base context = 500 tokens input
Day 30 (200 memories, K=5 retrieved): +2,500 tokens → 3,000 tokens input
Day 90 (800 memories, K=8 retrieved): +5,600 tokens → 6,100 tokens input
Day 180 (2,000 memories, K=10 retrieved): +8,000 tokens → 8,500 tokens input

A bot with 1,000 daily active users at 10 turns/day, day 90:
Input: 1,000 × 10 × 6,100 × $2.50 / 1M = $152.50/day
Day 1 baseline: 1,000 × 10 × 500 × $2.50 / 1M = $12.50/day
12× cost growth from memory expansion alone over 90 days
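
The same arithmetic as a small function, so the projection can be rerun with your own user counts and per-turn token figures. The function name is illustrative and the price is the gpt-4o input rate used throughout this post.

```python
def daily_input_cost_usd(
    users: int,
    turns_per_user: int,
    input_tokens_per_turn: int,
    price_per_million: float = 2.50,  # USD per 1M input tokens
) -> float:
    return users * turns_per_user * input_tokens_per_turn / 1_000_000 * price_per_million

baseline = daily_input_cost_usd(1_000, 10, 500)
day_90 = daily_input_cost_usd(1_000, 10, 6_100)
print(f"day 1:  ${baseline:.2f}/day")
print(f"day 90: ${day_90:.2f}/day ({day_90 / baseline:.1f}x baseline)")
```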

Teams building on Coze often notice this pattern only at month three, when the model provider bill spikes unexpectedly. The bot's behavior hasn't changed (it's responding just as well as it did in month one), but the per-turn cost has grown silently in the background as memory accumulated.

The guard: MemoryRetrievalBudget

Coze's chat API response includes usage fields with input and output token counts. The guard tracks the per-turn input token growth curve, computes the memory-injected portion by subtracting the known base context size, and enforces a ceiling on the memory injection per turn. When the ceiling is approached, it signals the bot to request a pruned memory retrieval (fewer K, or filtered to recent memories only) rather than blocking the turn entirely:

Python
from dataclasses import dataclass, field
from typing import Optional
import statistics

@dataclass
class MemoryRetrievalBudget:
    base_context_tokens: int = 500     # system prompt + average user message length
    max_memory_tokens_per_turn: int = 4_000
    max_total_input_tokens_per_turn: int = 6_000
    sample_window: int = 10            # rolling window for growth rate tracking
    _turn_input_tokens: list = field(default_factory=list, init=False)
    _trip_reason: Optional[str] = field(default=None, init=False)

    @property
    def tripped(self) -> bool:
        return self._trip_reason is not None

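    def record_turn(self, input_tokens: int) -> None:
        # Ceilings are per turn, so re-evaluate from scratch: a pruned retrieval
        # that brings the next turn back under budget clears the trip.
        self._trip_reason = None
        self._turn_input_tokens.append(input_tokens)
        memory_tokens = max(0, input_tokens - self.base_context_tokens)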

        if input_tokens >= self.max_total_input_tokens_per_turn:
            self._trip_reason = (
                f"total input tokens per turn exceeded: "
                f"{input_tokens} >= {self.max_total_input_tokens_per_turn}"
            )
        elif memory_tokens >= self.max_memory_tokens_per_turn:
            self._trip_reason = (
                f"memory injection ceiling: estimated {memory_tokens} memory tokens "
                f">= {self.max_memory_tokens_per_turn} limit"
            )

    def growth_rate(self) -> Optional[float]:
        window = self._turn_input_tokens[-self.sample_window:]
        if len(window) < 3:
            return None
        # Tokens added per turn on average over the sample window
        diffs = [window[i] - window[i - 1] for i in range(1, len(window))]
        return statistics.mean(diffs)

    def recommended_k(self, current_k: int) -> int:
        """Return a reduced K value that keeps memory tokens under ceiling."""
        if not self._turn_input_tokens:
            return current_k
        last_input = self._turn_input_tokens[-1]
        memory_tokens = max(0, last_input - self.base_context_tokens)
        if memory_tokens <= 0:
            return current_k
        # Estimate tokens per memory entry, suggest K that fits under ceiling
        tokens_per_entry = memory_tokens / max(current_k, 1)
        safe_k = int(self.max_memory_tokens_per_turn / tokens_per_entry)
        return max(1, min(safe_k, current_k))

    def status(self) -> dict:
        return {
            "turns_recorded": len(self._turn_input_tokens),
            "last_input_tokens": self._turn_input_tokens[-1] if self._turn_input_tokens else 0,
            "growth_rate_tokens_per_turn": self.growth_rate(),
            "tripped": self.tripped,
            "trip_reason": self._trip_reason,
        }

The guard's recommended_k method provides a reduced K value to pass to Coze's memory retrieval configuration parameter, allowing the turn to proceed with fewer memories rather than blocking it. This preserves the bot's responsiveness while containing the cost growth curve. The growth rate tracker gives the operator visibility into the trajectory before the ceiling is reached.
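
To make the K reduction concrete, here is the same estimate as a standalone function with worked inputs. It mirrors the logic of recommended_k and assumes every retrieved memory contributes about the same number of tokens, which is a simplification.

```python
def reduced_k(
    last_input_tokens: int,
    base_context_tokens: int,
    current_k: int,
    max_memory_tokens: int,
) -> int:
    """Largest K (never above current_k, never below 1) that fits the memory ceiling."""
    memory_tokens = max(0, last_input_tokens - base_context_tokens)
    if memory_tokens == 0:
        return current_k
    # Assumption: retrieved memories are roughly equal in size
    tokens_per_entry = memory_tokens / max(current_k, 1)
    safe_k = int(max_memory_tokens / tokens_per_entry)
    return max(1, min(safe_k, current_k))

# 8,500 input tokens with a 500-token base and K=10: about 800 tokens per memory,
# so a 4,000-token ceiling leaves room for 5 memories.
print(reduced_k(8_500, 500, 10, 4_000))
```

The function never raises K, so a turn that is already under the ceiling keeps its current retrieval depth.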

Pattern 4: Multi-Agent Team Re-Delegation Loops

Coze's team feature allows a coordinator bot to dispatch tasks to specialist bots. A coordinator receives the user's request, decides which specialist is best suited (via an LLM call), delegates to that specialist, receives the specialist's response, and either returns it to the user or decides to refine it further — potentially delegating again to the same or a different specialist.

The re-delegation loop emerges when a task falls at the boundary of two specialists' competence. The coordinator delegates to Specialist A (coding), A produces a partial answer, the coordinator decides the answer needs business context and delegates to Specialist B (business analysis), B's response is too technical and the coordinator decides to send it back to A for clarification, A adds implementation details the coordinator doesn't understand, and the cycle repeats. Each delegation round involves at minimum two LLM calls: one for the coordinator to decide and one for the specialist to respond. A 5-round re-delegation cycle on a task that should have been a single specialist response costs 10 LLM calls instead of 2.

Re-delegation cycle cost (per task):
Expected: 1 coordinator call + 1 specialist call = 2 LLM calls
5-round cycle: 5 coordinator calls + 5 specialist calls = 10 LLM calls
Per-round average: ~2,700 input tokens (conversation history grows each round), ~800 output tokens

At claude-3-5-sonnet ($3/$15 per 1M tokens), 5-round cycle:
Input (growing): 1,500 + 2,100 + 2,700 + 3,300 + 3,900 = 13,500 tokens
Output: 5 × 800 = 4,000 tokens
Cost: $0.041 (input) + $0.060 (output) = $0.10 per stuck delegation task
Expected 2-call cost: ~$0.025 — 4× overrun per stuck task
A team bot handling 2,000 tasks/day with a 15% re-delegation rate: 300 stuck tasks × $0.10 = $30/day, of which about $22.50 is overrun beyond the expected 2-call cost

The failure mode is hard to detect from the outside. From the user's perspective, the bot is "thinking" — the latency is high but there is eventually an answer. From the cost dashboard, each delegation round appears as a separate model call with no explicit link to the others. The re-delegation pattern only becomes visible when you aggregate calls by conversation ID and count calls-per-task-completion.
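
The aggregation described above can be sketched in a few lines. The log format here is an assumption (one dict per model call with a conversation_id key); adapt the field name to whatever your provider or logging layer actually records.

```python
from collections import Counter
from typing import Iterable, List

def calls_per_task(call_log: Iterable[dict]) -> Counter:
    """Count model calls per conversation ID."""
    return Counter(entry["conversation_id"] for entry in call_log)

def flag_redelegation(call_log: Iterable[dict], expected_calls: int = 2) -> List[str]:
    """Conversation IDs that used more calls than the expected coordinator + specialist pair."""
    counts = calls_per_task(call_log)
    return sorted(cid for cid, n in counts.items() if n > expected_calls)

# Hypothetical call log: c1 is a clean 2-call task, c2 bounced between specialists.
log = (
    [{"conversation_id": "c1", "bot": b} for b in ("coordinator", "coding")]
    + [{"conversation_id": "c2", "bot": b} for b in
       ("coordinator", "coding", "coordinator", "business", "coordinator", "coding")]
)
print(calls_per_task(log))
print(flag_redelegation(log))
```

Tracking the share of flagged conversations over time gives an early signal that coordinator routing needs attention.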

The guard: DelegationDepthGuard

The guard tracks delegation events per task (identified by the initial user message ID or a guard-assigned task token), counts specialist-to-specialist handoffs, and detects the ping-pong pattern — the same pair of specialists being delegated to in alternating order across consecutive rounds:

Python
import time
from dataclasses import dataclass, field
from typing import Optional
from collections import deque

@dataclass
class DelegationDepthGuard:
    max_delegations: int = 4          # total specialist calls per task
    max_elapsed_seconds: float = 60.0 # wall-clock ceiling per task
    ping_pong_threshold: int = 2      # same A→B→A→B pattern repeated N times = trip
    _delegations: list = field(default_factory=list, init=False)   # list of specialist IDs in order
    _start_time: float = field(default_factory=time.monotonic, init=False)
    _trip_reason: Optional[str] = field(default=None, init=False)

    @property
    def tripped(self) -> bool:
        return self._trip_reason is not None

    def _detect_ping_pong(self) -> Optional[str]:
        """Detect an A→B alternation repeated ping_pong_threshold times in a row."""
        n = 2 * self.ping_pong_threshold
        if self.ping_pong_threshold < 1 or len(self._delegations) < n:
            return None
        recent = self._delegations[-n:]
        a, b = recent[0], recent[1]
        # Every even position must match a and every odd position must match b
        if a != b and all(x == (a if i % 2 == 0 else b) for i, x in enumerate(recent)):
            return "ping-pong detected: " + " → ".join(str(x) for x in recent)
        return None

    def record_delegation(self, specialist_id: str) -> None:
        if self.tripped:
            return

        self._delegations.append(specialist_id)
        elapsed = time.monotonic() - self._start_time

        if len(self._delegations) > self.max_delegations:
            self._trip_reason = (
                f"delegation depth ceiling: {len(self._delegations)} "
                f"> {self.max_delegations} specialist calls"
            )
        elif elapsed > self.max_elapsed_seconds:
            self._trip_reason = (
                f"task elapsed time ceiling: {elapsed:.1f}s "
                f"> {self.max_elapsed_seconds}s"
            )
        else:
            pp = self._detect_ping_pong()
            if pp:
                self._trip_reason = pp

    def status(self) -> dict:
        return {
            "delegations": self._delegations,
            "delegation_count": len(self._delegations),
            "elapsed_seconds": round(time.monotonic() - self._start_time, 2),
            "tripped": self.tripped,
            "trip_reason": self._trip_reason,
        }


def run_team_task_with_guard(
    coordinator_bot,
    task: str,
    guard: DelegationDepthGuard,
) -> dict:
    """
    Drives a Coze multi-agent team task through the coordinator bot,
    enforcing the delegation depth guard on each handoff.
    """
    specialist_responses = []
    current_context = task

    # The coordinator's delegation loop
    while not guard.tripped:
        coordinator_response = coordinator_bot.chat(current_context)
        action = coordinator_response.get("action")

        if action == "respond":
            # Coordinator is answering directly — done
            return {
                "final_response": coordinator_response.get("content"),
                "specialist_responses": specialist_responses,
                "guard_status": guard.status(),
                "completed": True,
            }

        elif action == "delegate":
            specialist_id = coordinator_response.get("specialist_id")
            guard.record_delegation(specialist_id)

            if guard.tripped:
                break

            # Call the specialist
            specialist_bot = coordinator_bot.get_specialist(specialist_id)
            specialist_response = specialist_bot.chat(
                coordinator_response.get("delegated_task", current_context)
            )
            specialist_responses.append({
                "specialist": specialist_id,
                "response": specialist_response.get("content"),
            })
            # Feed specialist output back to coordinator for next decision
            current_context = (
                f"Original task: {task}\n\n"
                f"Specialist {specialist_id} response:\n"
                f"{specialist_response.get('content')}"
            )
        else:
            break

    return {
        "final_response": None,
        "specialist_responses": specialist_responses,
        "guard_status": guard.status(),
        "completed": False,
    }

The ping-pong detector catches the specific failure mode where two specialists are being repeatedly consulted in alternation — a pattern that produces two coordinator calls per cycle rather than just the one that proper task routing would generate. The elapsed-time ceiling provides a backstop for edge cases where the delegation sequence is novel enough to avoid the pattern detector but is still clearly stuck.

When to apply which guard

The four patterns are independent but can co-occur. A workflow that calls plugins and has long-term memory enabled can trigger Pattern 1 (back-edge cycles), Pattern 2 (plugin retries within each cycle iteration), and Pattern 3 (memory retrieval expanding the per-iteration context) simultaneously. In that scenario, the guards compose: run WorkflowCycleGuard on the outer loop, PluginBudgetGuard on each plugin call within a cycle iteration, and MemoryRetrievalBudget on the input token count of each LLM node call.
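
Composition can be as simple as checking a list of guards after each event and stopping at the first one that has tripped. The sketch below relies only on the tripped property and status() method that all four guards share; the Guard protocol, first_trip helper, and StubGuard stand-in are illustrative names introduced here.

```python
from typing import Iterable, Optional, Protocol

class Guard(Protocol):
    """Structural type satisfied by any object with tripped and status()."""
    @property
    def tripped(self) -> bool: ...
    def status(self) -> dict: ...

def first_trip(guards: Iterable[Guard]) -> Optional[dict]:
    """Return the status of the first tripped guard, or None if all are clear."""
    for guard in guards:
        if guard.tripped:
            return guard.status()
    return None

class StubGuard:
    """Stand-in for a real guard, used here only to demonstrate composition."""
    def __init__(self, name: str, trip_reason: Optional[str] = None) -> None:
        self.name = name
        self._trip_reason = trip_reason

    @property
    def tripped(self) -> bool:
        return self._trip_reason is not None

    def status(self) -> dict:
        return {"guard": self.name, "tripped": self.tripped, "trip_reason": self._trip_reason}

guards = [
    StubGuard("workflow_cycle"),
    StubGuard("plugin_budget", "invocation cost ceiling"),
    StubGuard("memory_retrieval"),
]
print(first_trip(guards))
```

In a real run loop, a non-None result would be the signal to cancel the workflow and surface the trip reason to the operator.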

Pattern | Trigger | Guard | Trip signal
Workflow back-edge cycle | Refinement loops, quality-gate retries | WorkflowCycleGuard | Max iterations, token budget, or repeated LLM output
Plugin retry amplification | Fallback chains, API degradation | PluginBudgetGuard | Total calls, cost ceiling, or per-plugin call count
Memory retrieval expansion | High-MAU bots, long-running deployments | MemoryRetrievalBudget | Memory token injection ceiling per turn
Multi-agent re-delegation | Cross-domain tasks, underspecified routing | DelegationDepthGuard | Delegation depth, elapsed time, or A→B→A→B pattern

Frequently asked questions

Does Coze have a built-in spend ceiling?

Coze does not expose a per-workflow or per-conversation spend ceiling through its API. The platform has a configurable maximum iteration count on workflows (which you can set in the workflow builder), but there is no token budget enforcement, no cost estimate surface, and no automatic trip on cost. You are responsible for instrumenting your own ceilings — either at the Coze API call layer or via the platform's workflow variables if you are using Coze's own execution environment.

Do these guards work when Coze manages the LLM key, or only when using my own API key?

The guards work in both modes, but with different observability. When you use your own API key in Coze, you can correlate guard-tracked token counts with your provider dashboard. When Coze manages the LLM key (using Coze credits or the default Doubao model), you still get the guard-enforced trip on iteration or delegation count, but you cannot independently verify the per-call token cost — you rely on the usage figures Coze returns in the event stream, which are the authoritative source in either case.

Can I use these guards in Coze's visual workflow builder directly?

The Python guards shown here operate at the Coze API caller layer: your code that calls the workflow run endpoint (POST /v1/workflow/run, or POST /v1/workflow/stream_run for streaming) or the chat endpoint (POST /v3/chat). Within the Coze workflow builder itself, you can implement partial versions of these guards using Coze's Code nodes (which support Python and JavaScript) and workflow variables to track iteration counts. The API-layer guards are more reliable for production cost control because they intercept calls before they're sent, not just count them after.

How does the memory retrieval guard know how many tokens the retrieved memories are consuming?

Coze's chat API response includes usage.input_tokens in the response body, which counts the total input to the model — including memory-injected tokens, system prompt, and the user's message. The guard subtracts the known base context size (which you configure as base_context_tokens) to estimate the memory-injected portion. This is an estimate, not an exact figure, because the base context size can vary slightly per turn. For tighter control, you can set base_context_tokens conservatively (use the minimum observed base context size) so the memory estimate is an upper bound.
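
A conservative base can be derived from turns you have observed with memory retrieval disabled or empty. The sketch below is one way to do that; the sample sizes are made up, and taking the minimum is the choice that makes the memory estimate an upper bound.

```python
from typing import List

def estimate_memory_tokens(input_tokens: int, observed_base_sizes: List[int]) -> int:
    """Upper-bound estimate of memory-injected tokens for one turn."""
    # Smallest observed base context => largest (most conservative) memory estimate
    base = min(observed_base_sizes)
    return max(0, input_tokens - base)

# Hypothetical base context sizes from turns with no memories injected
base_samples = [480, 500, 560]
print(estimate_memory_tokens(6_100, base_samples))
```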

What is the right ceiling for the DelegationDepthGuard's max_delegations?

For most multi-agent team workflows, 3–5 total specialist calls is a reasonable ceiling. A task that genuinely requires more than 5 specialist consultations is likely either (a) a task that should be split into subtasks at the coordinator level rather than delegated iteratively, or (b) a task that the coordinator's routing logic is ill-suited for. In practice, teams that observe high delegation depth counts should treat it as a signal to improve coordinator routing instructions first, not just raise the ceiling.

RunGuard adds the ceiling Coze doesn't ship with

The guards in this post implement the same patterns as RunGuard's SDK: iteration tracking, token budget enforcement, pattern-based stuck-loop detection, and delegation depth limits. RunGuard wraps your existing Coze API calls — you configure ceilings in one place, and every workflow invocation, plugin call, and agent delegation inherits them automatically.
