Core of the Pattern (In Short)
Orchestrator Agent is a pattern in which the agent does not perform the domain work itself but instead coordinates several specialized executors.
The LLM decides which subtasks are started (what to do), while the policy layer controls how they are started safely (what is allowed, how long to wait, when to retry, which limits apply).
What This Example Shows
- a separate plan step that creates a subtask set
- parallel dispatch of the subtasks (ThreadPoolExecutor)
- a policy boundary between the orchestration decision (LLM) and worker execution
- strict plan validation (`kind`, `tasks`, task shape)
- separate allowlists for the policy and execution layers
- a timeout per subtask plus `retry_once` for transient timeouts
- a partial-results policy: a non-critical task may fail while the run continues
- explicit `stop_reason` values for debugging and production monitoring
Architecture
- The LLM receives the goal and returns an orchestration plan as JSON (`kind="plan"`, `tasks`).
- The policy boundary validates the plan and rejects workers/arguments that are not allowed.
- `OrchestratorGateway` starts subtasks in parallel with timeout and retry.
- Results (successful and failed) are collected in a unified `history`/`trace`.
- If a critical task does not complete, the run stops.
- If the minimally required facts are present, the LLM produces a short final operations report.
The LLM delivers intent (the plan), which is treated as untrusted input: the policy boundary validates first and only then (if allowed) starts the workers.
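A condensed sketch of that flow, mirroring the code in main.py below (error handling, budget checks, and setup elided):

# Condensed flow sketch; gateway, request_id, and deadline are set up as in main.py below.
raw_plan = create_plan(goal=GOAL, report_date=REPORT_DATE, region=REGION, max_tasks=BUDGET.max_tasks)  # LLM intent
tasks = validate_orchestration_plan(raw_plan, allowed_workers=ALLOWED_WORKERS_POLICY, max_tasks=BUDGET.max_tasks)  # policy boundary
results = gateway.dispatch_parallel(tasks, request_id=request_id, deadline_monotonic=deadline)  # parallel execution
answer = compose_final_answer(goal=GOAL, aggregate=aggregate_results(results))  # final synthesis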
Project Structure
examples/
└── agent-patterns/
└── orchestrator-agent/
└── python/
├── main.py # Plan -> Dispatch (parallel) -> Aggregate -> Finalize
├── llm.py # planner + final synthesis
├── gateway.py # policy boundary: validation + timeout/retry + budgets
├── workers.py # deterministic specialists (sales/payments/inventory)
└── requirements.txt
Running
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/orchestrator-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Variant using export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Variant using .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows it is easier to set environment variables directly, or optionally use python-dotenv to load .env automatically.
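For example, with the same variable name as above (cmd.exe, then PowerShell):

:: cmd.exe
set OPENAI_API_KEY=sk-...
python main.py

# PowerShell
$env:OPENAI_API_KEY = "sk-..."
python main.py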
Task
Imagine a real production case:
"Prepare a morning operations report for the US market on 2026-02-26: sales, payment incidents, and inventory risks."
The agent should not do everything in a single pass. It should:
- split the task into independent subtasks
- run the subtasks in parallel
- retry a timed-out task once
- produce a unified, short final briefing
Solution
Here the orchestrator works as a coordinator:
- the LLM produces a plan with tasks for specialized workers
- the policy layer checks the plan and blocks tasks that are not allowed
- the gateway runs workers in parallel with timeout/retry
- results are collected into an aggregate model
- the short final text is produced only after the facts have been gathered
This is neither ReAct nor Routing:
- not ReAct, because the focus is not on the sequential Think -> Act loop of a single agent
- not Routing, because several executors are needed here at the same time, not just one target
Code
workers.py — specialized executors
from __future__ import annotations
import time
from typing import Any
SALES_METRICS = {
"2026-02-26:US": {
"gross_sales_usd": 182_450.0,
"orders": 4_820,
"aov_usd": 37.85,
}
}
PAYMENT_ALERTS = {
"2026-02-26:US": {
"failed_payment_rate": 0.023,
"chargeback_alerts": 3,
"gateway_incident": "none",
}
}
INVENTORY_ALERTS = {
"2026-02-26:US": {
"low_stock_skus": ["SKU-4411", "SKU-8820"],
"out_of_stock_skus": ["SKU-9033"],
"restock_eta_days": 2,
}
}
_ATTEMPT_STATE: dict[str, int] = {}
def _key(report_date: str, region: str) -> str:
return f"{report_date}:{region.upper()}"
def sales_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
time.sleep(0.4)
metrics = SALES_METRICS.get(_key(report_date, region))
if not metrics:
return {"status": "done", "worker": "sales_worker", "result": {"warning": "sales_data_missing"}}
return {"status": "done", "worker": "sales_worker", "result": metrics}
def payments_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
# Simulate a transient timeout on the first attempt within one request_id.
attempt_key = f"{request_id}:{_key(report_date, region)}:payments"
_ATTEMPT_STATE[attempt_key] = _ATTEMPT_STATE.get(attempt_key, 0) + 1
attempt = _ATTEMPT_STATE[attempt_key]
if attempt == 1:
time.sleep(2.6) # Exceeds gateway timeout on purpose.
else:
time.sleep(0.3)
alerts = PAYMENT_ALERTS.get(_key(report_date, region))
if not alerts:
return {"status": "done", "worker": "payments_worker", "result": {"warning": "payments_data_missing"}}
return {"status": "done", "worker": "payments_worker", "result": alerts}
def inventory_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
time.sleep(0.5)
alerts = INVENTORY_ALERTS.get(_key(report_date, region))
if not alerts:
return {"status": "done", "worker": "inventory_worker", "result": {"warning": "inventory_data_missing"}}
return {"status": "done", "worker": "inventory_worker", "result": alerts}
What Matters Most Here (In Simple Terms)
- Each worker is responsible only for its own domain.
- `payments_worker` deliberately times out on the first attempt to demonstrate the retry policy.
- Workers make no orchestration decisions: they only return facts.
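To see the worker contract in isolation, a worker can be called directly (a quick sketch; the request_id value is arbitrary):

# Sketch: calling a worker directly to inspect its structured result.
from workers import sales_worker

print(sales_worker("2026-02-26", "US", request_id="demo"))
# {'status': 'done', 'worker': 'sales_worker', 'result': {'gross_sales_usd': 182450.0, 'orders': 4820, 'aov_usd': 37.85}}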
The contract between the layers in this example (see the validation sketch after this list):
- The LLM can only propose a plan (`kind="plan"`, `tasks`).
- The gateway validates the plan and rejects dangerous/invalid tasks.
- Unknown keys in tasks are ignored during normalization as long as the required fields are present.
- The gateway applies the runtime policy: allowlist, timeout, retry, dispatch budget, deadline.
- Workers only execute their domain function and return a structured `dict`.
- The orchestrator decides whether the run can be finalized (for example, failed critical task -> stop).
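A minimal sketch of how validate_orchestration_plan enforces this contract (the refund_worker name is made up purely to trigger a rejection):

# Sketch: the validator keeps contract fields, drops extras, and rejects unknown workers.
from gateway import StopRun, validate_orchestration_plan

allowed = {"sales_worker", "payments_worker", "inventory_worker"}

good = {"kind": "plan", "tasks": [
    {"id": "t1", "worker": "sales_worker",
     "args": {"report_date": "2026-02-26", "region": "US"},
     "critical": True, "note": "unknown keys like this one are ignored"},
]}
print(validate_orchestration_plan(good, allowed_workers=allowed, max_tasks=4))

bad = {"kind": "plan", "tasks": [
    {"id": "t1", "worker": "refund_worker", "args": {}, "critical": True},
]}
try:
    validate_orchestration_plan(bad, allowed_workers=allowed, max_tasks=4)
except StopRun as exc:
    print(exc.reason)  # invalid_plan:worker_not_allowed:refund_worker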
gateway.py — policy boundary (the most important layer)
from __future__ import annotations
import hashlib
import json
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
from dataclasses import dataclass
from typing import Any, Callable
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_tasks: int = 4
max_parallel: int = 3
max_retries_per_task: int = 1
max_dispatches: int = 8
task_timeout_seconds: float = 2.0
max_seconds: int = 25
def args_hash(args: dict[str, Any]) -> str:
stable = json.dumps(args, ensure_ascii=True, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(stable.encode("utf-8")).hexdigest()[:12]
def validate_orchestration_plan(
raw_plan: dict[str, Any], *, allowed_workers: set[str], max_tasks: int
) -> list[dict[str, Any]]:
if not isinstance(raw_plan, dict):
raise StopRun("invalid_plan:non_json")
if raw_plan.get("kind") != "plan":
raise StopRun("invalid_plan:kind")
tasks = raw_plan.get("tasks")
if not isinstance(tasks, list):
raise StopRun("invalid_plan:tasks")
if not (1 <= len(tasks) <= max_tasks):
raise StopRun("invalid_plan:max_tasks")
normalized: list[dict[str, Any]] = []
seen_ids: set[str] = set()
required_keys = {"id", "worker", "args", "critical"}
for task in tasks:
if not isinstance(task, dict):
raise StopRun("invalid_plan:task_shape")
if not required_keys.issubset(task.keys()):
raise StopRun("invalid_plan:missing_keys")
# Ignore unknown keys and keep only contract fields.
task_id = task["id"]
worker = task["worker"]
args = task["args"]
critical = task["critical"]
if not isinstance(task_id, str) or not task_id.strip():
raise StopRun("invalid_plan:task_id")
if task_id in seen_ids:
raise StopRun("invalid_plan:duplicate_task_id")
seen_ids.add(task_id)
if not isinstance(worker, str) or not worker.strip():
raise StopRun("invalid_plan:worker")
if worker not in allowed_workers:
raise StopRun(f"invalid_plan:worker_not_allowed:{worker}")
if not isinstance(args, dict):
raise StopRun("invalid_plan:args")
if not isinstance(critical, bool):
raise StopRun("invalid_plan:critical")
normalized.append(
{
"id": task_id.strip(),
"worker": worker.strip(),
"args": dict(args),
"critical": critical,
}
)
return normalized
class OrchestratorGateway:
def __init__(
self,
*,
allow: set[str],
registry: dict[str, Callable[..., dict[str, Any]]],
budget: Budget,
) -> None:
self.allow = allow
self.registry = registry
self.budget = budget
self.dispatches = 0
self._lock = threading.Lock()
def _consume_dispatch_budget(self) -> None:
with self._lock:
self.dispatches += 1
if self.dispatches > self.budget.max_dispatches:
raise StopRun("max_dispatches")
def _call_once(
self, worker_name: str, args: dict[str, Any], *, deadline_monotonic: float
) -> dict[str, Any]:
if worker_name not in self.allow:
raise StopRun(f"worker_denied:{worker_name}")
fn = self.registry.get(worker_name)
if fn is None:
raise StopRun(f"worker_missing:{worker_name}")
remaining = deadline_monotonic - time.monotonic()
if remaining <= 0:
raise StopRun("max_seconds")
task_timeout = min(self.budget.task_timeout_seconds, max(0.01, remaining))
with ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(fn, **args)
try:
result = future.result(timeout=task_timeout)
except TimeoutError as exc:
raise StopRun("task_timeout") from exc
except TypeError as exc:
raise StopRun(f"worker_bad_args:{worker_name}") from exc
if not isinstance(result, dict):
raise StopRun(f"worker_bad_result:{worker_name}")
return result
def _run_task_with_retry(
self, task: dict[str, Any], request_id: str, deadline_monotonic: float
) -> dict[str, Any]:
task_id = task["id"]
worker_name = task["worker"]
semantic_args = dict(task["args"])
semantic_hash = args_hash(semantic_args)
base_args = dict(semantic_args)
base_args["request_id"] = request_id
attempts_total = self.budget.max_retries_per_task + 1
last_reason = "unknown"
for attempt in range(1, attempts_total + 1):
try:
self._consume_dispatch_budget()
observation = self._call_once(
worker_name, base_args, deadline_monotonic=deadline_monotonic
)
return {
"task_id": task_id,
"worker": worker_name,
"critical": task["critical"],
"status": "done",
"attempts_used": attempt,
"retried": attempt > 1,
"args_hash": semantic_hash,
"observation": observation,
}
except StopRun as exc:
last_reason = exc.reason
if exc.reason == "task_timeout" and attempt < attempts_total:
continue
return {
"task_id": task_id,
"worker": worker_name,
"critical": task["critical"],
"status": "failed",
"attempts_used": attempt,
"retried": attempt > 1,
"args_hash": semantic_hash,
"stop_reason": last_reason,
}
return {
"task_id": task_id,
"worker": worker_name,
"critical": task["critical"],
"status": "failed",
"attempts_used": attempts_total,
"retried": True,
"args_hash": semantic_hash,
"stop_reason": last_reason,
}
def dispatch_parallel(
self,
tasks: list[dict[str, Any]],
*,
request_id: str,
deadline_monotonic: float,
) -> list[dict[str, Any]]:
if not tasks:
return []
indexed_tasks = list(enumerate(tasks))
output: list[tuple[int, dict[str, Any]]] = []
parallelism = min(self.budget.max_parallel, len(tasks))
with ThreadPoolExecutor(max_workers=parallelism) as pool:
future_to_idx = {
pool.submit(
self._run_task_with_retry, task, request_id, deadline_monotonic
): idx
for idx, task in indexed_tasks
}
remaining = deadline_monotonic - time.monotonic()
if remaining <= 0:
raise StopRun("max_seconds")
try:
for future in as_completed(future_to_idx, timeout=max(0.01, remaining)):
idx = future_to_idx[future]
output.append((idx, future.result()))
except TimeoutError as exc:
raise StopRun("max_seconds") from exc
output.sort(key=lambda item: item[0])
return [item[1] for item in output]
What Matters Most Here (In Simple Terms)
- The execution policy is enforced exactly here in the gateway: allowlist, timeout, retry, dispatch budget.
- `validate_orchestration_plan(...)` checks the required-field contract and ignores extra keys.
- `dispatch_parallel(...)` preserves parallelism and additionally enforces the global deadline (`max_seconds`) inside the gateway.
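A minimal sketch of the gateway in isolation, assuming a hypothetical slow_worker stub that always exceeds the task timeout:

# Sketch: exercising timeout + retry with a hypothetical stub worker (not part of the repo).
import time
from gateway import Budget, OrchestratorGateway

def slow_worker(request_id: str) -> dict:
    time.sleep(1.0)  # always exceeds the 0.2 s task timeout below
    return {"status": "done"}

gateway = OrchestratorGateway(
    allow={"slow_worker"},
    registry={"slow_worker": slow_worker},
    budget=Budget(task_timeout_seconds=0.2),
)
results = gateway.dispatch_parallel(
    [{"id": "t1", "worker": "slow_worker", "args": {}, "critical": False}],
    request_id="demo",
    deadline_monotonic=time.monotonic() + 10.0,
)
print(results[0]["status"], results[0]["stop_reason"])  # failed task_timeout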
llm.py — planning and final synthesis
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
PLAN_SYSTEM_PROMPT = """
You are an orchestration planner.
Return only one JSON object in this exact shape:
{
"kind": "plan",
"tasks": [
{"id":"t1","worker":"sales_worker","args":{"report_date":"YYYY-MM-DD","region":"US"},"critical":true}
]
}
Rules:
- Create 2 to 4 independent tasks.
- Use only workers from available_workers.
- Keep args minimal and valid for the selected worker.
- Mark task as critical=true only if final brief is impossible without it.
- Do not output markdown or extra keys.
""".strip()
FINAL_SYSTEM_PROMPT = """
You are an operations assistant.
Write a short final briefing in English for an e-commerce operations manager.
Include:
- headline health status (green/yellow/red)
- key sales metrics
- payment risk note
- inventory risk note
- one concrete next action
Use only evidence from orchestrator output.
""".strip()
WORKER_CATALOG = [
{
"name": "sales_worker",
"description": "Provides sales KPIs for a date and region",
"args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
},
{
"name": "payments_worker",
"description": "Provides payment failure and chargeback signals",
"args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
},
{
"name": "inventory_worker",
"description": "Provides low-stock and out-of-stock risk signals",
"args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
},
]
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def create_plan(
*,
goal: str,
report_date: str,
region: str,
max_tasks: int,
) -> dict[str, Any]:
payload = {
"goal": goal,
"report_date": report_date,
"region": region,
"max_tasks": max_tasks,
"available_workers": WORKER_CATALOG,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": PLAN_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"kind": "invalid", "raw": text}
def compose_final_answer(*, goal: str, aggregate: dict[str, Any]) -> str:
payload = {
"goal": goal,
"aggregate": aggregate,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or ""
text = text.strip()
if not text:
raise LLMEmpty("llm_empty")
return text
What Matters Most Here (In Simple Terms)
- The LLM only returns a plan/answer; it never starts workers directly.
- The planner receives `max_tasks` and `available_workers`, so the choice is already constrained at the intent stage.
- On non-JSON output or a timeout, explicit signals are returned that allow a controlled stop.
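A short sketch of how a non-JSON planner reply becomes a controlled stop (the raw value below stands in for a malformed LLM answer):

# Sketch: create_plan wraps undecodable output as kind="invalid"; validation then stops the run.
from gateway import StopRun, validate_orchestration_plan

raw_plan = {"kind": "invalid", "raw": "Sure! Here is your plan..."}  # shape returned on JSONDecodeError
try:
    validate_orchestration_plan(raw_plan, allowed_workers={"sales_worker"}, max_tasks=4)
except StopRun as exc:
    print(exc.reason)  # invalid_plan:kind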
main.py — Plan -> Dispatch (parallel) -> Aggregate -> Finalize
from __future__ import annotations
import json
import time
import uuid
from typing import Any
from gateway import Budget, OrchestratorGateway, StopRun, validate_orchestration_plan
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from workers import inventory_worker, payments_worker, sales_worker
REPORT_DATE = "2026-02-26"
REGION = "US"
GOAL = (
"Prepare a morning operations report for e-commerce region US on 2026-02-26. "
"Include sales, payment risk, and inventory risk with one concrete action."
)
BUDGET = Budget(
max_tasks=4,
max_parallel=3,
max_retries_per_task=1,
max_dispatches=8,
task_timeout_seconds=2.0,
max_seconds=25,
)
WORKER_REGISTRY = {
"sales_worker": sales_worker,
"payments_worker": payments_worker,
"inventory_worker": inventory_worker,
}
ALLOWED_WORKERS_POLICY = {"sales_worker", "payments_worker", "inventory_worker"}
INVENTORY_RUNTIME_ENABLED = True
ALLOWED_WORKERS_EXECUTION = (
{"sales_worker", "payments_worker", "inventory_worker"}
if INVENTORY_RUNTIME_ENABLED
else {"sales_worker", "payments_worker"}
)
# Set INVENTORY_RUNTIME_ENABLED=False to get worker_denied:inventory_worker in trace.
def aggregate_results(task_results: list[dict[str, Any]]) -> dict[str, Any]:
done_by_worker: dict[str, dict[str, Any]] = {}
failed: list[dict[str, Any]] = []
for item in task_results:
if item["status"] == "done":
done_by_worker[item["worker"]] = item["observation"]
else:
failed.append(
{
"task_id": item["task_id"],
"worker": item["worker"],
"critical": item["critical"],
"stop_reason": item["stop_reason"],
}
)
sales = done_by_worker.get("sales_worker", {}).get("result", {})
payments = done_by_worker.get("payments_worker", {}).get("result", {})
inventory = done_by_worker.get("inventory_worker", {}).get("result", {})
health = "green"
if payments.get("failed_payment_rate", 0) >= 0.03 or inventory.get("out_of_stock_skus"):
health = "yellow"
if payments.get("gateway_incident") not in (None, "none"):
health = "red"
return {
"report_date": REPORT_DATE,
"region": REGION,
"health": health,
"sales": sales,
"payments": payments,
"inventory": inventory,
"failed_tasks": failed,
}
def run_orchestrator(goal: str) -> dict[str, Any]:
started = time.monotonic()
deadline = started + BUDGET.max_seconds
request_id = uuid.uuid4().hex[:10]
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
gateway = OrchestratorGateway(
allow=ALLOWED_WORKERS_EXECUTION,
registry=WORKER_REGISTRY,
budget=BUDGET,
)
try:
raw_plan = create_plan(
goal=goal,
report_date=REPORT_DATE,
region=REGION,
max_tasks=BUDGET.max_tasks,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "plan",
"trace": trace,
"history": history,
}
try:
tasks = validate_orchestration_plan(
raw_plan,
allowed_workers=ALLOWED_WORKERS_POLICY,
max_tasks=BUDGET.max_tasks,
)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": "plan",
"raw_plan": raw_plan,
"trace": trace,
"history": history,
}
if time.monotonic() > deadline:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"phase": "dispatch",
"plan": tasks,
"trace": trace,
"history": history,
}
try:
task_results = gateway.dispatch_parallel(
tasks,
request_id=request_id,
deadline_monotonic=deadline,
)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": "dispatch",
"plan": tasks,
"trace": trace,
"history": history,
}
for item in task_results:
trace.append(
{
"task_id": item["task_id"],
"worker": item["worker"],
"status": item["status"],
"attempts_used": item["attempts_used"],
"retried": item["retried"],
"args_hash": item["args_hash"],
"stop_reason": item.get("stop_reason"),
"critical": item["critical"],
}
)
history.append(item)
failed_critical = [
item
for item in task_results
if item["status"] == "failed" and item["critical"]
]
if failed_critical:
return {
"status": "stopped",
"stop_reason": "critical_task_failed",
"phase": "dispatch",
"plan": tasks,
"failed_critical": failed_critical,
"trace": trace,
"history": history,
}
aggregate = aggregate_results(task_results)
if time.monotonic() > deadline:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"phase": "finalize",
"aggregate": aggregate,
"trace": trace,
"history": history,
}
try:
answer = compose_final_answer(goal=goal, aggregate=aggregate)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "finalize",
"aggregate": aggregate,
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"phase": "finalize",
"aggregate": aggregate,
"trace": trace,
"history": history,
}
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"plan": tasks,
"aggregate": aggregate,
"trace": trace,
"history": history,
}
def main() -> None:
result = run_orchestrator(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
What Matters Most Here (In Simple Terms)
- `run_orchestrator(...)` drives the full lifecycle of an orchestration run.
- Parallel execution happens only inside the gateway, never inside LLM decisions.
- `ALLOWED_WORKERS_POLICY` and `ALLOWED_WORKERS_EXECUTION` can differ (runtime feature flag / tenant gate).
- There is a clear policy: if a critical task fails, finalization does not start.
- `trace` and `history` provide a transparent audit: what ran, how many attempts were used, and why something failed.
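To exercise this lifecycle offline, both LLM calls can be stubbed at the module level (a sketch; no OPENAI_API_KEY is needed because the stubs never touch the API):

# Sketch: offline smoke run with both LLM calls replaced by stubs.
import main

def fake_plan(**kwargs):
    return {"kind": "plan", "tasks": [
        {"id": "t1", "worker": "sales_worker",
         "args": {"report_date": "2026-02-26", "region": "US"}, "critical": True},
    ]}

main.create_plan = fake_plan
main.compose_final_answer = lambda *, goal, aggregate: f"health={aggregate['health']}"

result = main.run_orchestrator(main.GOAL)
print(result["status"], result["stop_reason"])  # ok success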
requirements.txt
openai==2.21.0
Example Output
Because of the simulated timeout in `payments_worker` on the first attempt, a typical trace contains `retried=true`.
{
"status": "ok",
"stop_reason": "success",
"answer": "Morning Operations Report - US Region (2026-02-26): Health=yellow. Gross sales=$182,450, orders=4,820, AOV=$37.85. Payment failed rate=2.3% with 3 chargeback alerts. Inventory has one out-of-stock SKU (SKU-9033). Next action: prioritize immediate restock for SKU-9033.",
"plan": [
{
"id": "t1",
"worker": "sales_worker",
"args": {"report_date": "2026-02-26", "region": "US"},
"critical": true
},
{
"id": "t2",
"worker": "payments_worker",
"args": {"report_date": "2026-02-26", "region": "US"},
"critical": true
},
{
"id": "t3",
"worker": "inventory_worker",
"args": {"report_date": "2026-02-26", "region": "US"},
"critical": true
}
],
"aggregate": {
"report_date": "2026-02-26",
"region": "US",
"health": "yellow",
"sales": {"gross_sales_usd": 182450.0, "orders": 4820, "aov_usd": 37.85},
"payments": {"failed_payment_rate": 0.023, "chargeback_alerts": 3, "gateway_incident": "none"},
"inventory": {"low_stock_skus": ["SKU-4411", "SKU-8820"], "out_of_stock_skus": ["SKU-9033"], "restock_eta_days": 2}
},
"trace": [
{
"task_id": "t1",
"worker": "sales_worker",
"status": "done",
"attempts_used": 1,
"retried": false
},
{
"task_id": "t2",
"worker": "payments_worker",
"status": "done",
"attempts_used": 2,
"retried": true
},
{
"task_id": "t3",
"worker": "inventory_worker",
"status": "done",
"attempts_used": 1,
"retried": false
}
],
"history": [{...}]
}
This is a shortened example: `history` is shown in compact form, and the `answer` format can vary slightly between runs.
If `INVENTORY_RUNTIME_ENABLED=False` is set, `worker_denied:inventory_worker` appears in the trace - a clear example of the difference between the policy and execution allowlists.
Common stop_reason Values
- `success` - plan executed, final answer produced
- `invalid_plan:*` - the LLM plan failed policy validation
- `invalid_plan:missing_keys` - a task is missing required contract fields
- `llm_timeout` - the LLM did not respond within `OPENAI_TIMEOUT_SECONDS`
- `llm_empty` - the LLM returned an empty final answer
- `max_seconds` - the run's total time budget was exceeded
- `max_dispatches` - the subtask dispatch limit was exceeded (including retries)
- `worker_denied:<name>` - the worker is not in the execution allowlist
- `worker_missing:<name>` - the worker is missing from the registry
- `worker_bad_args:<name>` - the task contains invalid arguments
- `worker_bad_result:<name>` - the worker returned data outside the contract
- `task_timeout` - the subtask did not complete within `task_timeout_seconds`
- `critical_task_failed` - at least one critical subtask failed
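Because every stop_reason is a flat string with an optional :detail suffix, routing it into monitoring is straightforward (a sketch; the severity mapping below is an assumption, not part of the example code):

# Sketch: mapping stop_reason prefixes to alert severities (the mapping itself is hypothetical).
SEVERITY = {"success": "info", "task_timeout": "warning", "llm_timeout": "warning"}

def severity(stop_reason: str) -> str:
    prefix = stop_reason.split(":", 1)[0]  # "worker_denied:inventory_worker" -> "worker_denied"
    return SEVERITY.get(prefix, "error")

print(severity("worker_denied:inventory_worker"))  # error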
What Is NOT Shown Here
- No auth/PII handling and no tenant isolation.
- No queue / persistent job runner (RabbitMQ/SQS/Kafka).
- No circuit breaker and no complex backoff policies.
- No token/cost budgets (cost guardrails).
What to Try Next
- Set `payments_worker` to non-critical (`critical=false`) and watch the run return a partial result.
- Set `INVENTORY_RUNTIME_ENABLED=False` and check for `worker_denied:inventory_worker` in the trace.
- Reduce `task_timeout_seconds` to `0.2` and watch the number of timeouts/retries grow.
- Add a fourth worker (`fraud_worker`) and compare run times with `max_parallel=2` and `max_parallel=4` (see the sketch below).
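A possible starting point for the fourth worker (a sketch; fraud_worker and its numbers are illustrative, not part of the repo):

# Sketch: a hypothetical fourth specialist following the same worker contract.
import time
from typing import Any

def fraud_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    time.sleep(0.4)
    return {
        "status": "done",
        "worker": "fraud_worker",
        "result": {"suspicious_orders": 12, "blocked_accounts": 2},
    }

# To wire it in: add fraud_worker to WORKER_REGISTRY, ALLOWED_WORKERS_POLICY, and
# ALLOWED_WORKERS_EXECUTION in main.py, and describe it in WORKER_CATALOG in llm.py
# so the planner can select it.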