Kern des Musters (Kurz)
Task Decomposition Agent ist ein Muster, bei dem der Agent eine komplexe Aufgabe zuerst in aufeinanderfolgende Schritte zerlegt und sie erst danach nacheinander ausführt.
Das Modell ist für den Plan und die Reihenfolge der Aktionen zuständig, während die Ausführung jedes Schritts über ein kontrolliertes Gateway mit Plan-Validierung, Allowlist und Run-Budgets läuft.
Was dieses Beispiel zeigt
- separater
Plan-Schritt vorExecute - Policy Boundary zwischen Planning (LLM) und Tools (Execution Layer)
- strikte Plan-Validierung (
kind, Schrittstruktur, allowed keys) - Tool-Allowlist (deny by default)
- separate Run-Budgets:
max_plan_steps(Plan) undmax_execute_steps(Ausführung), plusmax_tool_calls,max_seconds - explizite
stop_reason-Werte für Debugging und Monitoring raw_planin der Antwort, wenn der Plan ungültig ist
Architektur
- Das LLM erhält das Goal und liefert einen JSON-Plan zurück (
kind="plan",steps). - Die Policy Boundary validiert den Plan und blockiert ungültige/unsichere Formen.
- Jeder Schritt wird sequenziell über
ToolGatewayausgeführt (Allowlist, Budgets, Loop Detection). - Die Observation jedes Schritts wird als Checkpoint für transparente Ausführung in
historygespeichert. - Nach Ausführung aller Schritte erstellt das LLM eine finale Synthese auf Basis von
historymit einem separatenCombine-Aufruf ohne Tools.
Das LLM liefert Intent (Plan), der als untrusted Input behandelt wird: Die Policy Boundary validiert zuerst und ruft erst dann (wenn erlaubt) Tools auf.
Die Allowlist wird zweimal angewendet: in der Plan-Validierung (invalid_plan:tool_not_allowed:*) und bei der Tool-Ausführung (tool_denied:*).
So bleibt Task Decomposition kontrollierbar: Der Agent plant, und die Ausführung läuft durch eine kontrollierte Schicht.
Projektstruktur
examples/
└── agent-patterns/
└── task-decomposition-agent/
└── python/
├── main.py # Plan -> Execute -> Combine
├── llm.py # planner + final synthesis
├── gateway.py # policy boundary: plan validation + tool execution control
├── tools.py # deterministic tools (Anna/Max, US, USD)
└── requirements.txt
Ausführen
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/task-decomposition-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ ist erforderlich.
Variante über export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Variante über .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
Das ist die Shell-Variante (macOS/Linux). Unter Windows ist es einfacher, set-Umgebungsvariablen zu verwenden oder optional python-dotenv, um .env automatisch zu laden.
Aufgabe
Stell dir vor, eine Führungskraft bittet:
"Erstelle einen kurzen Bericht für April 2026: Verkäufe, Rückerstattungen, Nettoumsatz und Risiken."
Der Agent darf die Antwort nicht "aus dem Kopf" erfinden. Er muss:
- zuerst einen Plan erstellen
- die Schritte nacheinander ausführen
- Daten nur aus erlaubten Tools nutzen
- die finale Antwort erst nach allen Schritten geben
Lösung
Hier arbeitet der Agent in einem einfachen Ablauf:
- Das LLM erstellt zuerst einen Plan mit mehreren Schritten
- das System prüft, dass der Plan korrekt und erlaubt ist
- Tools führen die Schritte aus und liefern Fakten
- danach erstellt das LLM die finale kurze Zusammenfassung
- ist ein Plan oder Schritt ungültig, stoppt der Run mit einem Grund
Code
tools.py — Tools (Quelle der Fakten)
from __future__ import annotations
from typing import Any
MANAGERS = {
42: {"id": 42, "name": "Anna", "region": "US", "team": "Retail East"},
7: {"id": 7, "name": "Max", "region": "US", "team": "Retail West"},
}
SALES_DATA = {
"2026-04": [
{"day": "2026-04-01", "gross_usd": 5200.0, "orders": 120},
{"day": "2026-04-02", "gross_usd": 4890.0, "orders": 113},
{"day": "2026-04-03", "gross_usd": 6105.0, "orders": 141},
{"day": "2026-04-04", "gross_usd": 5580.0, "orders": 127},
{"day": "2026-04-05", "gross_usd": 6420.0, "orders": 149},
]
}
REFUND_DATA = {
"2026-04": [
{"day": "2026-04-01", "refunds_usd": 140.0},
{"day": "2026-04-02", "refunds_usd": 260.0},
{"day": "2026-04-03", "refunds_usd": 210.0},
{"day": "2026-04-04", "refunds_usd": 590.0},
{"day": "2026-04-05", "refunds_usd": 170.0},
]
}
def get_manager_profile(manager_id: int) -> dict[str, Any]:
manager = MANAGERS.get(manager_id)
if not manager:
return {"error": f"manager {manager_id} not found"}
return {"manager": manager}
def fetch_sales_data(month: str) -> dict[str, Any]:
rows = SALES_DATA.get(month)
if not rows:
return {"error": f"sales data for {month} not found"}
return {"month": month, "currency": "USD", "daily_sales": rows}
def fetch_refund_data(month: str) -> dict[str, Any]:
rows = REFUND_DATA.get(month)
if not rows:
return {"error": f"refund data for {month} not found"}
return {"month": month, "currency": "USD", "daily_refunds": rows}
def calculate_monthly_kpis(month: str) -> dict[str, Any]:
sales_rows = SALES_DATA.get(month)
refund_rows = REFUND_DATA.get(month)
if not sales_rows or not refund_rows:
return {"error": f"kpi inputs for {month} not found"}
gross_sales = sum(row["gross_usd"] for row in sales_rows)
total_refunds = sum(row["refunds_usd"] for row in refund_rows)
total_orders = sum(row["orders"] for row in sales_rows)
net_sales = gross_sales - total_refunds
refund_rate = (total_refunds / gross_sales) if gross_sales else 0.0
top_day = max(sales_rows, key=lambda row: row["gross_usd"])["day"]
return {
"month": month,
"currency": "USD",
"gross_sales_usd": round(gross_sales, 2),
"refunds_usd": round(total_refunds, 2),
"net_sales_usd": round(net_sales, 2),
"orders": total_orders,
"refund_rate": round(refund_rate, 4),
"top_sales_day": top_day,
}
def detect_risk_signals(month: str) -> dict[str, Any]:
refund_rows = REFUND_DATA.get(month)
if not refund_rows:
return {"error": f"refund data for {month} not found"}
high_refund_day = max(refund_rows, key=lambda row: row["refunds_usd"])
warnings: list[str] = []
if high_refund_day["refunds_usd"] >= 500:
warnings.append(
f"Refund spike detected on {high_refund_day['day']}: {high_refund_day['refunds_usd']} USD"
)
if not warnings:
warnings.append("No critical risk signals detected for this month.")
return {
"month": month,
"currency": "USD",
"risk_warnings": warnings,
"peak_refund_day": high_refund_day,
}
Was hier am wichtigsten ist (einfach erklärt)
- Tools sind deterministisch und enthalten keine LLM-Logik.
- Der Agent entscheidet nur, welche Schritte ausgeführt werden.
- Die Business-Logik wird von der Execution-Layer (Tools) ausgeführt, nicht vom LLM.
gateway.py — Policy Boundary (wichtigste Schicht)
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_plan_steps: int = 6
max_execute_steps: int = 8
max_tool_calls: int = 8
max_seconds: int = 60
def _stable_json(value: Any) -> str:
if value is None or isinstance(value, (bool, int, float, str)):
return json.dumps(value, ensure_ascii=True, sort_keys=True)
if isinstance(value, list):
return "[" + ",".join(_stable_json(item) for item in value) + "]"
if isinstance(value, dict):
parts = []
for key in sorted(value):
parts.append(
json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key])
)
return "{" + ",".join(parts) + "}"
return json.dumps(str(value), ensure_ascii=True)
def args_hash(args: dict[str, Any]) -> str:
raw = _stable_json(args or {})
return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]
def validate_plan_action(
action: Any, *, max_plan_steps: int, allowed_tools: set[str]
) -> list[dict[str, Any]]:
if not isinstance(action, dict):
raise StopRun("invalid_plan:not_object")
kind = action.get("kind")
if kind == "invalid":
raise StopRun("invalid_plan:non_json")
if kind != "plan":
raise StopRun("invalid_plan:bad_kind")
allowed_top_keys = {"kind", "steps"}
if set(action.keys()) - allowed_top_keys:
raise StopRun("invalid_plan:extra_keys")
steps = action.get("steps")
if not isinstance(steps, list) or not steps:
raise StopRun("invalid_plan:missing_steps")
if len(steps) < 3:
raise StopRun("invalid_plan:min_steps")
if len(steps) > max_plan_steps:
raise StopRun("invalid_plan:max_steps")
normalized: list[dict[str, Any]] = []
seen_ids: set[str] = set()
for index, step in enumerate(steps, start=1):
if not isinstance(step, dict):
raise StopRun(f"invalid_plan:step_{index}_not_object")
allowed_step_keys = {"id", "title", "tool", "args"}
if set(step.keys()) - allowed_step_keys:
raise StopRun(f"invalid_plan:step_{index}_extra_keys")
step_id = step.get("id")
if not isinstance(step_id, str) or not step_id.strip():
raise StopRun(f"invalid_plan:step_{index}_missing_id")
if step_id in seen_ids:
raise StopRun("invalid_plan:duplicate_step_id")
seen_ids.add(step_id)
title = step.get("title")
if not isinstance(title, str) or not title.strip():
raise StopRun(f"invalid_plan:step_{index}_missing_title")
tool = step.get("tool")
if not isinstance(tool, str) or not tool.strip():
raise StopRun(f"invalid_plan:step_{index}_missing_tool")
tool = tool.strip()
if tool not in allowed_tools:
raise StopRun(f"invalid_plan:tool_not_allowed:{tool}")
args = step.get("args", {})
if args is None:
args = {}
if not isinstance(args, dict):
raise StopRun(f"invalid_plan:step_{index}_bad_args")
normalized.append(
{
"id": step_id.strip(),
"title": title.strip(),
"tool": tool,
"args": args,
}
)
return normalized
class ToolGateway:
def __init__(
self,
*,
allow: set[str],
registry: dict[str, Callable[..., dict[str, Any]]],
budget: Budget,
):
self.allow = set(allow)
self.registry = registry
self.budget = budget
self.tool_calls = 0
self.seen_calls: set[str] = set()
def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
self.tool_calls += 1
if self.tool_calls > self.budget.max_tool_calls:
raise StopRun("max_tool_calls")
if name not in self.allow:
raise StopRun(f"tool_denied:{name}")
tool = self.registry.get(name)
if tool is None:
raise StopRun(f"tool_missing:{name}")
signature = f"{name}:{args_hash(args)}"
if signature in self.seen_calls:
raise StopRun("loop_detected")
self.seen_calls.add(signature)
try:
return tool(**args)
except TypeError as exc:
raise StopRun(f"tool_bad_args:{name}") from exc
except Exception as exc:
raise StopRun(f"tool_error:{name}") from exc
Was hier am wichtigsten ist (einfach erklärt)
validate_plan_action(...)ist die Governance/Control Layer für den LLM-Plan.- Der Plan wird als untrusted Input behandelt und durchläuft eine strikte Validierung.
ToolGateway.call(...)ist die Grenzeagent ≠ executor: Der Agent plant, das Gateway führt sicher aus.loop_detectederkennt exact-repeat (tool + args_hash).
llm.py — planning + final synthesis
Das LLM sieht nur den Katalog verfügbarer Tools; ist ein Tool nicht in der Allowlist, stoppt das Gateway den Run.
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
PLAN_SYSTEM_PROMPT = """
You are a task decomposition planner.
Return only one JSON object in this exact shape:
{
"kind": "plan",
"steps": [
{"id": "step_1", "title": "...", "tool": "...", "args": {...}}
]
}
Rules:
- Create 3 to 6 steps.
- Use only tools from available_tools.
- Keep args minimal and valid.
- Do not add extra keys.
- Do not output markdown.
""".strip()
FINAL_SYSTEM_PROMPT = """
You are a reporting assistant.
Write a short final summary in English for a US business audience.
Include: manager name, month, gross sales (USD), refunds (USD), net sales (USD), refund rate (%), and key risk note.
""".strip()
TOOL_CATALOG = [
{
"name": "get_manager_profile",
"description": "Get manager profile by manager_id",
"args": {"manager_id": "integer"},
},
{
"name": "fetch_sales_data",
"description": "Get daily gross sales for a month",
"args": {"month": "string in YYYY-MM"},
},
{
"name": "fetch_refund_data",
"description": "Get daily refund values for a month",
"args": {"month": "string in YYYY-MM"},
},
{
"name": "calculate_monthly_kpis",
"description": "Calculate gross/refunds/net/order KPIs for a month",
"args": {"month": "string in YYYY-MM"},
},
{
"name": "detect_risk_signals",
"description": "Detect risk warnings for a month",
"args": {"month": "string in YYYY-MM"},
},
]
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def create_plan(goal: str, max_plan_steps: int) -> dict[str, Any]:
payload = {
"goal": goal,
"max_plan_steps": max_plan_steps,
"available_tools": TOOL_CATALOG,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": PLAN_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"kind": "invalid", "raw": text}
def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
payload = {
"goal": goal,
"history": history,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or ""
text = text.strip()
if not text:
raise LLMEmpty("llm_empty")
return text
Was hier am wichtigsten ist (einfach erklärt)
create_plan(...)ist die Decision-Stage für Decomposition.timeout=LLM_TIMEOUT_SECONDS+LLMTimeoutsorgen für kontrollierten Stopp bei Netzwerk-/Modellproblemen.- Eine leere finale Antwort wird nicht mit Fallback-Text kaschiert: Es wird explizit
llm_emptyzurückgegeben. - Wenn JSON ungültig ist, wird
{"kind":"invalid"...}zurückgegeben, und die Policy Layer liefert ein lesbaresstop_reason.
main.py — Plan -> Execute -> Combine
from __future__ import annotations
import json
import time
from typing import Any
from gateway import Budget, StopRun, ToolGateway, args_hash, validate_plan_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from tools import (
calculate_monthly_kpis,
detect_risk_signals,
fetch_refund_data,
fetch_sales_data,
get_manager_profile,
)
GOAL = (
"Prepare an April 2026 monthly sales summary for manager_id=42 in USD. "
"Use step-by-step decomposition. Include gross sales, refunds, net sales, refund rate, and one risk note."
)
# max_execute_steps here limits plan length (number of planned steps), not runtime loop iterations.
BUDGET = Budget(max_plan_steps=6, max_execute_steps=8, max_tool_calls=8, max_seconds=60)
TOOL_REGISTRY = {
"get_manager_profile": get_manager_profile,
"fetch_sales_data": fetch_sales_data,
"fetch_refund_data": fetch_refund_data,
"calculate_monthly_kpis": calculate_monthly_kpis,
"detect_risk_signals": detect_risk_signals,
}
ALLOWED_TOOLS = {
"get_manager_profile",
"fetch_sales_data",
"fetch_refund_data",
"calculate_monthly_kpis",
"detect_risk_signals",
}
def run_task_decomposition(goal: str) -> dict[str, Any]:
started = time.monotonic()
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)
try:
raw_plan = create_plan(goal=goal, max_plan_steps=BUDGET.max_plan_steps)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"llm_phase": "plan",
"trace": trace,
"history": history,
}
try:
steps = validate_plan_action(
raw_plan,
max_plan_steps=BUDGET.max_plan_steps,
allowed_tools=ALLOWED_TOOLS,
)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"raw_plan": raw_plan,
"trace": trace,
"history": history,
}
if len(steps) > BUDGET.max_execute_steps:
return {
"status": "stopped",
"stop_reason": "max_execute_steps",
"plan": steps,
"trace": trace,
"history": history,
}
for step_no, step in enumerate(steps, start=1):
elapsed = time.monotonic() - started
if elapsed > BUDGET.max_seconds:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"plan": steps,
"trace": trace,
"history": history,
}
tool_name = step["tool"]
tool_args = step["args"]
try:
observation = gateway.call(tool_name, tool_args)
trace.append(
{
"step_no": step_no,
"step_id": step["id"],
"tool": tool_name,
"args_hash": args_hash(tool_args),
"ok": True,
}
)
except StopRun as exc:
trace.append(
{
"step_no": step_no,
"step_id": step["id"],
"tool": tool_name,
"args_hash": args_hash(tool_args),
"ok": False,
"stop_reason": exc.reason,
}
)
return {
"status": "stopped",
"stop_reason": exc.reason,
"plan": steps,
"trace": trace,
"history": history,
}
history.append(
{
"step_no": step_no,
"plan_step": step,
"observation": observation,
}
)
try:
answer = compose_final_answer(goal=goal, history=history)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"llm_phase": "finalize",
"plan": steps,
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"llm_phase": "finalize",
"plan": steps,
"trace": trace,
"history": history,
}
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"plan": steps,
"trace": trace,
"history": history,
}
def main() -> None:
result = run_task_decomposition(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
Was hier am wichtigsten ist (einfach erklärt)
run_task_decomposition(...)steuertPlan -> Execute -> Combine; Business-Aktionen laufen ausschließlich überToolGateway.- Bei einem ungültigen Plan wird
raw_planfürs Debugging zurückgegeben. - In dieser Version prüft
max_execute_stepsdie Planlänge vor der Execution; Runtime-Limits werden dann durchmax_tool_callsundmax_secondsgesetzt. historyist ein transparenter Schritt-Log: was im Plan stand und welche Observation jedes Tool zurückgab.
requirements.txt
openai==2.21.0
Beispielausgabe
Die Reihenfolge der Plan-Schritte kann zwischen Runs leicht variieren, aber Policy-Gates und Stop-Reasons bleiben stabil. Der Planner kann die Reihenfolge ändern; wichtig ist, dass Policy + Allowlist unabhängig von der Reihenfolge gleich funktionieren.
{
"status": "ok",
"stop_reason": "success",
"answer": "In April 2026, under manager Anna's leadership, gross sales were $28,195, refunds were $1,370, net sales were $26,825, and refund rate was 4.86%.",
"plan": [
{"id": "step_1", "tool": "fetch_sales_data", "args": {"month": "2026-04"}},
{"id": "step_2", "tool": "fetch_refund_data", "args": {"month": "2026-04"}},
{"id": "step_3", "tool": "calculate_monthly_kpis", "args": {"month": "2026-04"}},
{"id": "step_4", "tool": "detect_risk_signals", "args": {"month": "2026-04"}},
{"id": "step_5", "tool": "get_manager_profile", "args": {"manager_id": 42}}
],
"trace": [
{"step_no": 1, "step_id": "step_1", "tool": "fetch_sales_data", "args_hash": "...", "ok": true},
{"step_no": 2, "step_id": "step_2", "tool": "fetch_refund_data", "args_hash": "...", "ok": true},
{"step_no": 3, "step_id": "step_3", "tool": "calculate_monthly_kpis", "args_hash": "...", "ok": true},
{"step_no": 4, "step_id": "step_4", "tool": "detect_risk_signals", "args_hash": "...", "ok": true},
{"step_no": 5, "step_id": "step_5", "tool": "get_manager_profile", "args_hash": "...", "ok": true}
],
"history": [{...}]
}
Dies ist ein verkürztes Beispiel: In einem realen Run können plan und trace mehr Schritte enthalten.
history ist das Ausführungsprotokoll: Für jeden step_no gibt es plan_step und observation.
args_hash hasht nur die Argumente, daher kann er sich zwischen verschiedenen Tools gleichen, wenn args identisch sind; Loop Detection berücksichtigt zusätzlich den Tool-Namen.
Typische stop_reason-Werte
success— Plan wurde ausgeführt und finale Antwort generiertinvalid_plan:*— Plan vom LLM hat die Policy-Validierung nicht bestandeninvalid_plan:non_json— LLM hat keinen validen JSON-Plan zurückgegebeninvalid_plan:min_steps— der Plan hat weniger als 3 Decomposition-Schritteinvalid_plan:tool_not_allowed:<name>— Plan enthält ein Tool außerhalb der Allowlistmax_execute_steps— Plan ist länger als das erlaubte Execution-Budgetmax_tool_calls— Tool-Call-Limit ausgeschöpftmax_seconds— Time-Budget des Runs überschrittenllm_timeout— LLM hat nicht innerhalb vonOPENAI_TIMEOUT_SECONDSgeantwortetllm_empty— LLM hat in derfinalize-Phase eine leere finale Antwort zurückgegebentool_denied:<name>— Tool ist nicht in der Allowlisttool_missing:<name>— Tool fehlt im Registrytool_bad_args:<name>— Schritt enthält ungültige Argumenteloop_detected— exact repeat (tool + args_hash)
Was hier NICHT gezeigt wird
- Keine Auth/PII- und Production-Zugriffskontrollen für personenbezogene Daten.
- Keine Retry/Backoff-Strategien für LLM und Tool-Layer.
- Keine Token-/Kostenbudgets (cost guardrails).
- Die Tools hier sind deterministische Lern-Mocks und keine realen externen APIs.
Was als Nächstes probieren
- Entferne
detect_risk_signalsausALLOWED_TOOLSund prüfetool_denied:*. - Füge dem Plan ein nicht existentes Tool hinzu und prüfe
tool_missing:*. - Reduziere
max_plan_stepsauf3und beobachte, wie oftinvalid_plan:max_stepsauftritt. - Ändere
GOALzumanager_id=7 (Max)und vergleiche die finale Synthese. - Füge Cost/Token-Guardrails in
Budgetund im finalen JSON-Ergebnis hinzu.
Vollständiger Code auf GitHub
Im Repository liegt die vollständige runnable-Version dieses Beispiels: Planning, Policy Boundary, sequenzielle Schrittausführung und Stop-Reasons.
Vollständigen Code auf GitHub ansehen ↗