Task Decomposition Agent — Python (vollständige Implementierung mit LLM)

Ausführbares Task-Decomposition-Agent-Beispiel im Production-Stil in Python mit Planung, Policy Boundary, sequentieller Schrittausführung, Budgets und Stop-Reasons.
Auf dieser Seite
  1. Kern des Musters (Kurz)
  2. Was dieses Beispiel zeigt
  3. Architektur
  4. Projektstruktur
  5. Ausführen
  6. Aufgabe
  7. Lösung
  8. Code
  9. tools.py — Tools (Quelle der Fakten)
  10. gateway.py — Policy Boundary (wichtigste Schicht)
  11. llm.py — planning + final synthesis
  12. main.py — Plan -> Execute -> Combine
  13. requirements.txt
  14. Beispielausgabe
  15. Typische stop_reason-Werte
  16. Was hier NICHT gezeigt wird
  17. Was als Nächstes probieren
  18. Vollständiger Code auf GitHub

Kern des Musters (Kurz)

Task Decomposition Agent ist ein Muster, bei dem der Agent eine komplexe Aufgabe zuerst in aufeinanderfolgende Schritte zerlegt und sie erst danach nacheinander ausführt.

Das Modell ist für den Plan und die Reihenfolge der Aktionen zuständig, während die Ausführung jedes Schritts über ein kontrolliertes Gateway mit Plan-Validierung, Allowlist und Run-Budgets läuft.


Was dieses Beispiel zeigt

  • separater Plan-Schritt vor Execute
  • Policy Boundary zwischen Planning (LLM) und Tools (Execution Layer)
  • strikte Plan-Validierung (kind, Schrittstruktur, allowed keys)
  • Tool-Allowlist (deny by default)
  • separate Run-Budgets: max_plan_steps (Plan) und max_execute_steps (Ausführung), plus max_tool_calls, max_seconds
  • explizite stop_reason-Werte für Debugging und Monitoring
  • raw_plan in der Antwort, wenn der Plan ungültig ist

Architektur

  1. Das LLM erhält das Goal und liefert einen JSON-Plan zurück (kind="plan", steps).
  2. Die Policy Boundary validiert den Plan und blockiert ungültige/unsichere Formen.
  3. Jeder Schritt wird sequenziell über ToolGateway ausgeführt (Allowlist, Budgets, Loop Detection).
  4. Die Observation jedes Schritts wird als Checkpoint für transparente Ausführung in history gespeichert.
  5. Nach Ausführung aller Schritte erstellt das LLM eine finale Synthese auf Basis von history mit einem separaten Combine-Aufruf ohne Tools.

Das LLM liefert Intent (Plan), der als untrusted Input behandelt wird: Die Policy Boundary validiert zuerst und ruft erst dann (wenn erlaubt) Tools auf. Die Allowlist wird zweimal angewendet: in der Plan-Validierung (invalid_plan:tool_not_allowed:*) und bei der Tool-Ausführung (tool_denied:*).

So bleibt Task Decomposition kontrollierbar: Der Agent plant, und die Ausführung läuft durch eine kontrollierte Schicht.


Projektstruktur

TEXT
examples/
└── agent-patterns/
    └── task-decomposition-agent/
        └── python/
            ├── main.py           # Plan -> Execute -> Combine
            ├── llm.py            # planner + final synthesis
            ├── gateway.py        # policy boundary: plan validation + tool execution control
            ├── tools.py          # deterministic tools (Anna/Max, US, USD)
            └── requirements.txt

Ausführen

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/task-decomposition-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ ist erforderlich.

Variante über export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Variante über .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

Das ist die Shell-Variante (macOS/Linux). Unter Windows ist es einfacher, set-Umgebungsvariablen zu verwenden oder optional python-dotenv, um .env automatisch zu laden.


Aufgabe

Stell dir vor, eine Führungskraft bittet:

"Erstelle einen kurzen Bericht für April 2026: Verkäufe, Rückerstattungen, Nettoumsatz und Risiken."

Der Agent darf die Antwort nicht "aus dem Kopf" erfinden. Er muss:

  • zuerst einen Plan erstellen
  • die Schritte nacheinander ausführen
  • Daten nur aus erlaubten Tools nutzen
  • die finale Antwort erst nach allen Schritten geben

Lösung

Hier arbeitet der Agent in einem einfachen Ablauf:

  • Das LLM erstellt zuerst einen Plan mit mehreren Schritten
  • das System prüft, dass der Plan korrekt und erlaubt ist
  • Tools führen die Schritte aus und liefern Fakten
  • danach erstellt das LLM die finale kurze Zusammenfassung
  • ist ein Plan oder Schritt ungültig, stoppt der Run mit einem Grund

Code

tools.py — Tools (Quelle der Fakten)

PYTHON
from __future__ import annotations

from typing import Any

MANAGERS = {
    42: {"id": 42, "name": "Anna", "region": "US", "team": "Retail East"},
    7: {"id": 7, "name": "Max", "region": "US", "team": "Retail West"},
}

SALES_DATA = {
    "2026-04": [
        {"day": "2026-04-01", "gross_usd": 5200.0, "orders": 120},
        {"day": "2026-04-02", "gross_usd": 4890.0, "orders": 113},
        {"day": "2026-04-03", "gross_usd": 6105.0, "orders": 141},
        {"day": "2026-04-04", "gross_usd": 5580.0, "orders": 127},
        {"day": "2026-04-05", "gross_usd": 6420.0, "orders": 149},
    ]
}

REFUND_DATA = {
    "2026-04": [
        {"day": "2026-04-01", "refunds_usd": 140.0},
        {"day": "2026-04-02", "refunds_usd": 260.0},
        {"day": "2026-04-03", "refunds_usd": 210.0},
        {"day": "2026-04-04", "refunds_usd": 590.0},
        {"day": "2026-04-05", "refunds_usd": 170.0},
    ]
}


def get_manager_profile(manager_id: int) -> dict[str, Any]:
    manager = MANAGERS.get(manager_id)
    if not manager:
        return {"error": f"manager {manager_id} not found"}
    return {"manager": manager}


def fetch_sales_data(month: str) -> dict[str, Any]:
    rows = SALES_DATA.get(month)
    if not rows:
        return {"error": f"sales data for {month} not found"}
    return {"month": month, "currency": "USD", "daily_sales": rows}


def fetch_refund_data(month: str) -> dict[str, Any]:
    rows = REFUND_DATA.get(month)
    if not rows:
        return {"error": f"refund data for {month} not found"}
    return {"month": month, "currency": "USD", "daily_refunds": rows}


def calculate_monthly_kpis(month: str) -> dict[str, Any]:
    sales_rows = SALES_DATA.get(month)
    refund_rows = REFUND_DATA.get(month)
    if not sales_rows or not refund_rows:
        return {"error": f"kpi inputs for {month} not found"}

    gross_sales = sum(row["gross_usd"] for row in sales_rows)
    total_refunds = sum(row["refunds_usd"] for row in refund_rows)
    total_orders = sum(row["orders"] for row in sales_rows)
    net_sales = gross_sales - total_refunds
    refund_rate = (total_refunds / gross_sales) if gross_sales else 0.0

    top_day = max(sales_rows, key=lambda row: row["gross_usd"])["day"]

    return {
        "month": month,
        "currency": "USD",
        "gross_sales_usd": round(gross_sales, 2),
        "refunds_usd": round(total_refunds, 2),
        "net_sales_usd": round(net_sales, 2),
        "orders": total_orders,
        "refund_rate": round(refund_rate, 4),
        "top_sales_day": top_day,
    }


def detect_risk_signals(month: str) -> dict[str, Any]:
    refund_rows = REFUND_DATA.get(month)
    if not refund_rows:
        return {"error": f"refund data for {month} not found"}

    high_refund_day = max(refund_rows, key=lambda row: row["refunds_usd"])
    warnings: list[str] = []

    if high_refund_day["refunds_usd"] >= 500:
        warnings.append(
            f"Refund spike detected on {high_refund_day['day']}: {high_refund_day['refunds_usd']} USD"
        )

    if not warnings:
        warnings.append("No critical risk signals detected for this month.")

    return {
        "month": month,
        "currency": "USD",
        "risk_warnings": warnings,
        "peak_refund_day": high_refund_day,
    }

Was hier am wichtigsten ist (einfach erklärt)

  • Tools sind deterministisch und enthalten keine LLM-Logik.
  • Der Agent entscheidet nur, welche Schritte ausgeführt werden.
  • Die Business-Logik wird von der Execution-Layer (Tools) ausgeführt, nicht vom LLM.

gateway.py — Policy Boundary (wichtigste Schicht)

PYTHON
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_plan_steps: int = 6
    max_execute_steps: int = 8
    max_tool_calls: int = 8
    max_seconds: int = 60


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(item) for item in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(
                json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key])
            )
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def args_hash(args: dict[str, Any]) -> str:
    raw = _stable_json(args or {})
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def validate_plan_action(
    action: Any, *, max_plan_steps: int, allowed_tools: set[str]
) -> list[dict[str, Any]]:
    if not isinstance(action, dict):
        raise StopRun("invalid_plan:not_object")

    kind = action.get("kind")
    if kind == "invalid":
        raise StopRun("invalid_plan:non_json")
    if kind != "plan":
        raise StopRun("invalid_plan:bad_kind")

    allowed_top_keys = {"kind", "steps"}
    if set(action.keys()) - allowed_top_keys:
        raise StopRun("invalid_plan:extra_keys")

    steps = action.get("steps")
    if not isinstance(steps, list) or not steps:
        raise StopRun("invalid_plan:missing_steps")
    if len(steps) < 3:
        raise StopRun("invalid_plan:min_steps")
    if len(steps) > max_plan_steps:
        raise StopRun("invalid_plan:max_steps")

    normalized: list[dict[str, Any]] = []
    seen_ids: set[str] = set()

    for index, step in enumerate(steps, start=1):
        if not isinstance(step, dict):
            raise StopRun(f"invalid_plan:step_{index}_not_object")

        allowed_step_keys = {"id", "title", "tool", "args"}
        if set(step.keys()) - allowed_step_keys:
            raise StopRun(f"invalid_plan:step_{index}_extra_keys")

        step_id = step.get("id")
        if not isinstance(step_id, str) or not step_id.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_id")
        if step_id in seen_ids:
            raise StopRun("invalid_plan:duplicate_step_id")
        seen_ids.add(step_id)

        title = step.get("title")
        if not isinstance(title, str) or not title.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_title")

        tool = step.get("tool")
        if not isinstance(tool, str) or not tool.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_tool")
        tool = tool.strip()
        if tool not in allowed_tools:
            raise StopRun(f"invalid_plan:tool_not_allowed:{tool}")

        args = step.get("args", {})
        if args is None:
            args = {}
        if not isinstance(args, dict):
            raise StopRun(f"invalid_plan:step_{index}_bad_args")

        normalized.append(
            {
                "id": step_id.strip(),
                "title": title.strip(),
                "tool": tool,
                "args": args,
            }
        )

    return normalized


class ToolGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ):
        self.allow = set(allow)
        self.registry = registry
        self.budget = budget
        self.tool_calls = 0
        self.seen_calls: set[str] = set()

    def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
        self.tool_calls += 1
        if self.tool_calls > self.budget.max_tool_calls:
            raise StopRun("max_tool_calls")

        if name not in self.allow:
            raise StopRun(f"tool_denied:{name}")

        tool = self.registry.get(name)
        if tool is None:
            raise StopRun(f"tool_missing:{name}")

        signature = f"{name}:{args_hash(args)}"
        if signature in self.seen_calls:
            raise StopRun("loop_detected")
        self.seen_calls.add(signature)

        try:
            return tool(**args)
        except TypeError as exc:
            raise StopRun(f"tool_bad_args:{name}") from exc
        except Exception as exc:
            raise StopRun(f"tool_error:{name}") from exc

Was hier am wichtigsten ist (einfach erklärt)

  • validate_plan_action(...) ist die Governance/Control Layer für den LLM-Plan.
  • Der Plan wird als untrusted Input behandelt und durchläuft eine strikte Validierung.
  • ToolGateway.call(...) ist die Grenze agent ≠ executor: Der Agent plant, das Gateway führt sicher aus.
  • loop_detected erkennt exact-repeat (tool + args_hash).

llm.py — planning + final synthesis

Das LLM sieht nur den Katalog verfügbarer Tools; ist ein Tool nicht in der Allowlist, stoppt das Gateway den Run.

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


PLAN_SYSTEM_PROMPT = """
You are a task decomposition planner.
Return only one JSON object in this exact shape:
{
  "kind": "plan",
  "steps": [
    {"id": "step_1", "title": "...", "tool": "...", "args": {...}}
  ]
}

Rules:
- Create 3 to 6 steps.
- Use only tools from available_tools.
- Keep args minimal and valid.
- Do not add extra keys.
- Do not output markdown.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are a reporting assistant.
Write a short final summary in English for a US business audience.
Include: manager name, month, gross sales (USD), refunds (USD), net sales (USD), refund rate (%), and key risk note.
""".strip()

TOOL_CATALOG = [
    {
        "name": "get_manager_profile",
        "description": "Get manager profile by manager_id",
        "args": {"manager_id": "integer"},
    },
    {
        "name": "fetch_sales_data",
        "description": "Get daily gross sales for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "fetch_refund_data",
        "description": "Get daily refund values for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "calculate_monthly_kpis",
        "description": "Calculate gross/refunds/net/order KPIs for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "detect_risk_signals",
        "description": "Detect risk warnings for a month",
        "args": {"month": "string in YYYY-MM"},
    },
]


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def create_plan(goal: str, max_plan_steps: int) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "max_plan_steps": max_plan_steps,
        "available_tools": TOOL_CATALOG,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": PLAN_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
    payload = {
        "goal": goal,
        "history": history,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or ""
    text = text.strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text

Was hier am wichtigsten ist (einfach erklärt)

  • create_plan(...) ist die Decision-Stage für Decomposition.
  • timeout=LLM_TIMEOUT_SECONDS + LLMTimeout sorgen für kontrollierten Stopp bei Netzwerk-/Modellproblemen.
  • Eine leere finale Antwort wird nicht mit Fallback-Text kaschiert: Es wird explizit llm_empty zurückgegeben.
  • Wenn JSON ungültig ist, wird {"kind":"invalid"...} zurückgegeben, und die Policy Layer liefert ein lesbares stop_reason.

main.py — Plan -> Execute -> Combine

PYTHON
from __future__ import annotations

import json
import time
from typing import Any

from gateway import Budget, StopRun, ToolGateway, args_hash, validate_plan_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from tools import (
    calculate_monthly_kpis,
    detect_risk_signals,
    fetch_refund_data,
    fetch_sales_data,
    get_manager_profile,
)

GOAL = (
    "Prepare an April 2026 monthly sales summary for manager_id=42 in USD. "
    "Use step-by-step decomposition. Include gross sales, refunds, net sales, refund rate, and one risk note."
)

# max_execute_steps here limits plan length (number of planned steps), not runtime loop iterations.
BUDGET = Budget(max_plan_steps=6, max_execute_steps=8, max_tool_calls=8, max_seconds=60)

TOOL_REGISTRY = {
    "get_manager_profile": get_manager_profile,
    "fetch_sales_data": fetch_sales_data,
    "fetch_refund_data": fetch_refund_data,
    "calculate_monthly_kpis": calculate_monthly_kpis,
    "detect_risk_signals": detect_risk_signals,
}

ALLOWED_TOOLS = {
    "get_manager_profile",
    "fetch_sales_data",
    "fetch_refund_data",
    "calculate_monthly_kpis",
    "detect_risk_signals",
}


def run_task_decomposition(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)

    try:
        raw_plan = create_plan(goal=goal, max_plan_steps=BUDGET.max_plan_steps)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "llm_phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        steps = validate_plan_action(
            raw_plan,
            max_plan_steps=BUDGET.max_plan_steps,
            allowed_tools=ALLOWED_TOOLS,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "raw_plan": raw_plan,
            "trace": trace,
            "history": history,
        }

    if len(steps) > BUDGET.max_execute_steps:
        return {
            "status": "stopped",
            "stop_reason": "max_execute_steps",
            "plan": steps,
            "trace": trace,
            "history": history,
        }

    for step_no, step in enumerate(steps, start=1):
        elapsed = time.monotonic() - started
        if elapsed > BUDGET.max_seconds:
            return {
                "status": "stopped",
                "stop_reason": "max_seconds",
                "plan": steps,
                "trace": trace,
                "history": history,
            }

        tool_name = step["tool"]
        tool_args = step["args"]

        try:
            observation = gateway.call(tool_name, tool_args)
            trace.append(
                {
                    "step_no": step_no,
                    "step_id": step["id"],
                    "tool": tool_name,
                    "args_hash": args_hash(tool_args),
                    "ok": True,
                }
            )
        except StopRun as exc:
            trace.append(
                {
                    "step_no": step_no,
                    "step_id": step["id"],
                    "tool": tool_name,
                    "args_hash": args_hash(tool_args),
                    "ok": False,
                    "stop_reason": exc.reason,
                }
            )
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "plan": steps,
                "trace": trace,
                "history": history,
            }

        history.append(
            {
                "step_no": step_no,
                "plan_step": step,
                "observation": observation,
            }
        )

    try:
        answer = compose_final_answer(goal=goal, history=history)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "llm_phase": "finalize",
            "plan": steps,
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "llm_phase": "finalize",
            "plan": steps,
            "trace": trace,
            "history": history,
        }

    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "plan": steps,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_task_decomposition(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Was hier am wichtigsten ist (einfach erklärt)

  • run_task_decomposition(...) steuert Plan -> Execute -> Combine; Business-Aktionen laufen ausschließlich über ToolGateway.
  • Bei einem ungültigen Plan wird raw_plan fürs Debugging zurückgegeben.
  • In dieser Version prüft max_execute_steps die Planlänge vor der Execution; Runtime-Limits werden dann durch max_tool_calls und max_seconds gesetzt.
  • history ist ein transparenter Schritt-Log: was im Plan stand und welche Observation jedes Tool zurückgab.

requirements.txt

TEXT
openai==2.21.0

Beispielausgabe

Die Reihenfolge der Plan-Schritte kann zwischen Runs leicht variieren, aber Policy-Gates und Stop-Reasons bleiben stabil. Der Planner kann die Reihenfolge ändern; wichtig ist, dass Policy + Allowlist unabhängig von der Reihenfolge gleich funktionieren.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "In April 2026, under manager Anna's leadership, gross sales were $28,195, refunds were $1,370, net sales were $26,825, and refund rate was 4.86%.",
  "plan": [
    {"id": "step_1", "tool": "fetch_sales_data", "args": {"month": "2026-04"}},
    {"id": "step_2", "tool": "fetch_refund_data", "args": {"month": "2026-04"}},
    {"id": "step_3", "tool": "calculate_monthly_kpis", "args": {"month": "2026-04"}},
    {"id": "step_4", "tool": "detect_risk_signals", "args": {"month": "2026-04"}},
    {"id": "step_5", "tool": "get_manager_profile", "args": {"manager_id": 42}}
  ],
  "trace": [
    {"step_no": 1, "step_id": "step_1", "tool": "fetch_sales_data", "args_hash": "...", "ok": true},
    {"step_no": 2, "step_id": "step_2", "tool": "fetch_refund_data", "args_hash": "...", "ok": true},
    {"step_no": 3, "step_id": "step_3", "tool": "calculate_monthly_kpis", "args_hash": "...", "ok": true},
    {"step_no": 4, "step_id": "step_4", "tool": "detect_risk_signals", "args_hash": "...", "ok": true},
    {"step_no": 5, "step_id": "step_5", "tool": "get_manager_profile", "args_hash": "...", "ok": true}
  ],
  "history": [{...}]
}

Dies ist ein verkürztes Beispiel: In einem realen Run können plan und trace mehr Schritte enthalten.

history ist das Ausführungsprotokoll: Für jeden step_no gibt es plan_step und observation.

args_hash hasht nur die Argumente, daher kann er sich zwischen verschiedenen Tools gleichen, wenn args identisch sind; Loop Detection berücksichtigt zusätzlich den Tool-Namen.


Typische stop_reason-Werte

  • success — Plan wurde ausgeführt und finale Antwort generiert
  • invalid_plan:* — Plan vom LLM hat die Policy-Validierung nicht bestanden
  • invalid_plan:non_json — LLM hat keinen validen JSON-Plan zurückgegeben
  • invalid_plan:min_steps — der Plan hat weniger als 3 Decomposition-Schritte
  • invalid_plan:tool_not_allowed:<name> — Plan enthält ein Tool außerhalb der Allowlist
  • max_execute_steps — Plan ist länger als das erlaubte Execution-Budget
  • max_tool_calls — Tool-Call-Limit ausgeschöpft
  • max_seconds — Time-Budget des Runs überschritten
  • llm_timeout — LLM hat nicht innerhalb von OPENAI_TIMEOUT_SECONDS geantwortet
  • llm_empty — LLM hat in der finalize-Phase eine leere finale Antwort zurückgegeben
  • tool_denied:<name> — Tool ist nicht in der Allowlist
  • tool_missing:<name> — Tool fehlt im Registry
  • tool_bad_args:<name> — Schritt enthält ungültige Argumente
  • loop_detected — exact repeat (tool + args_hash)

Was hier NICHT gezeigt wird

  • Keine Auth/PII- und Production-Zugriffskontrollen für personenbezogene Daten.
  • Keine Retry/Backoff-Strategien für LLM und Tool-Layer.
  • Keine Token-/Kostenbudgets (cost guardrails).
  • Die Tools hier sind deterministische Lern-Mocks und keine realen externen APIs.

Was als Nächstes probieren

  • Entferne detect_risk_signals aus ALLOWED_TOOLS und prüfe tool_denied:*.
  • Füge dem Plan ein nicht existentes Tool hinzu und prüfe tool_missing:*.
  • Reduziere max_plan_steps auf 3 und beobachte, wie oft invalid_plan:max_steps auftritt.
  • Ändere GOAL zu manager_id=7 (Max) und vergleiche die finale Synthese.
  • Füge Cost/Token-Guardrails in Budget und im finalen JSON-Ergebnis hinzu.

Vollständiger Code auf GitHub

Im Repository liegt die vollständige runnable-Version dieses Beispiels: Planning, Policy Boundary, sequenzielle Schrittausführung und Stop-Reasons.

Vollständigen Code auf GitHub ansehen ↗
⏱️ 14 Min. LesezeitAktualisiert Mär, 2026Schwierigkeit: ★★☆
Integriert: Production ControlOnceOnly
Guardrails für Tool-Calling-Agents
Shippe dieses Pattern mit Governance:
  • Budgets (Steps / Spend Caps)
  • Tool-Permissions (Allowlist / Blocklist)
  • Kill switch & Incident Stop
  • Idempotenz & Dedupe
  • Audit logs & Nachvollziehbarkeit
Integrierter Hinweis: OnceOnly ist eine Control-Layer für Production-Agent-Systeme.
Autor

Diese Dokumentation wird von Engineers kuratiert und gepflegt, die AI-Agenten in der Produktion betreiben.

Die Inhalte sind KI-gestützt, mit menschlicher redaktioneller Verantwortung für Genauigkeit, Klarheit und Produktionsrelevanz.

Patterns und Empfehlungen basieren auf Post-Mortems, Failure-Modes und operativen Incidents in produktiven Systemen, auch bei der Entwicklung und dem Betrieb von Governance-Infrastruktur für Agenten bei OnceOnly.