Orchestrator Agent — Python (einfaches Production-Beispiel mit LLM)

Production-style runnable Beispiel eines Orchestrator-Agenten in Python mit Subtask-Plan, parallelem Dispatch, Timeout/Retry-Policy und finaler Aggregation.
Auf dieser Seite
  1. Kern des Musters (Kurz)
  2. Was dieses Beispiel zeigt
  3. Architektur
  4. Projektstruktur
  5. Ausführen
  6. Aufgabe
  7. Lösung
  8. Code
  9. workers.py — spezialisierte Ausführer
  10. gateway.py — policy boundary (wichtigste Schicht)
  11. llm.py — Planung und finale Synthese
  12. main.py — Plan -> Dispatch (parallel) -> Aggregate -> Finalize
  13. requirements.txt
  14. Ausgabebeispiel
  15. Häufige stop_reason
  16. Was hier NICHT gezeigt wird
  17. Was du als Nächstes ausprobieren kannst

Kern des Musters (Kurz)

Orchestrator Agent ist ein Muster, bei dem der Agent die Domänenarbeit nicht selbst ausführt, sondern mehrere spezialisierte Ausführer koordiniert.

Das LLM entscheidet welche Subtasks gestartet werden (was zu tun ist), während die Policy-Schicht kontrolliert wie sie sicher gestartet werden (was erlaubt ist, wie lange gewartet wird, wann erneut versucht wird, welche Limits gelten).


Was dieses Beispiel zeigt

  • separater Plan-Schritt zum Erstellen eines Subtask-Sets
  • paralleler Dispatch der Subtasks (ThreadPoolExecutor)
  • Policy Boundary zwischen orchestration decision (LLM) und worker execution
  • strikte Plan-Validierung (kind, tasks, Task-Shape)
  • getrennte Allowlists für Policy- und Execution-Layer
  • Timeout pro Subtask + retry_once für transient timeout
  • Partial-Results-Policy: Eine nicht-kritische Task kann fehlschlagen, aber der Run läuft weiter
  • explizite stop_reason-Werte für Debugging und Production-Monitoring

Architektur

  1. Das LLM erhält das Goal und liefert einen Orchestration-Plan als JSON zurück (kind="plan", tasks).
  2. Die Policy Boundary validiert den Plan und verwirft nicht erlaubte Workers/Argumente.
  3. OrchestratorGateway startet Subtasks parallel mit Timeout und Retry.
  4. Ergebnisse (erfolgreiche und failed) werden in einem einheitlichen history/trace gesammelt.
  5. Wenn eine kritische Task nicht abgeschlossen wird, stoppt der Run.
  6. Wenn minimal erforderliche Fakten vorhanden sind, erstellt das LLM einen kurzen finalen Operations-Report.

Das LLM liefert Intent (Plan), der als untrusted Input behandelt wird: Die Policy Boundary validiert zuerst und startet dann (wenn erlaubt) Workers.


Projektstruktur

TEXT
examples/
└── agent-patterns/
    └── orchestrator-agent/
        └── python/
            ├── main.py           # Plan -> Dispatch (parallel) -> Aggregate -> Finalize
            ├── llm.py            # planner + final synthesis
            ├── gateway.py        # policy boundary: validation + timeout/retry + budgets
            ├── workers.py        # deterministic specialists (sales/payments/inventory)
            └── requirements.txt

Ausführen

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/orchestrator-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ ist erforderlich.

Variante über export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Variante über .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

Das ist die Shell-Variante (macOS/Linux). Unter Windows ist es einfacher, set-Umgebungsvariablen zu verwenden oder optional python-dotenv, um .env automatisch zu laden.


Aufgabe

Stell dir einen realen Production-Fall vor:

"Erstelle einen morgendlichen Operations-Bericht für den US-Markt am 2026-02-26: Verkäufe, Zahlungsvorfälle und Bestandsrisiken."

Der Agent soll nicht alles in einem einzigen Ablauf erledigen. Er soll:

  • die Aufgabe in unabhängige Subtasks aufteilen
  • Subtasks parallel ausführen
  • eine Timeout-Task einmal erneut starten
  • ein einheitliches kurzes finales Briefing erstellen

Lösung

Hier arbeitet der Orchestrator als Koordinator:

  • LLM erzeugt einen Plan mit Tasks für spezialisierte Workers
  • die Policy-Schicht prüft den Plan und blockiert nicht erlaubte Tasks
  • Gateway führt Workers parallel mit Timeout/Retry aus
  • Ergebnisse werden in ein Aggregat-Modell gesammelt
  • der kurze finale Text wird erst nach Sammlung der Fakten erzeugt

Das ist weder ReAct noch Routing:

  • nicht ReAct, weil der Fokus nicht auf einem sequenziellen Think -> Act-Zyklus eines einzelnen Agenten liegt
  • nicht Routing, weil hier mehrere Ausführer gleichzeitig benötigt werden, nicht nur ein Target

Code

workers.py — spezialisierte Ausführer

PYTHON
from __future__ import annotations

import time
from typing import Any

SALES_METRICS = {
    "2026-02-26:US": {
        "gross_sales_usd": 182_450.0,
        "orders": 4_820,
        "aov_usd": 37.85,
    }
}

PAYMENT_ALERTS = {
    "2026-02-26:US": {
        "failed_payment_rate": 0.023,
        "chargeback_alerts": 3,
        "gateway_incident": "none",
    }
}

INVENTORY_ALERTS = {
    "2026-02-26:US": {
        "low_stock_skus": ["SKU-4411", "SKU-8820"],
        "out_of_stock_skus": ["SKU-9033"],
        "restock_eta_days": 2,
    }
}

_ATTEMPT_STATE: dict[str, int] = {}


def _key(report_date: str, region: str) -> str:
    return f"{report_date}:{region.upper()}"


def sales_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    time.sleep(0.4)
    metrics = SALES_METRICS.get(_key(report_date, region))
    if not metrics:
        return {"status": "done", "worker": "sales_worker", "result": {"warning": "sales_data_missing"}}
    return {"status": "done", "worker": "sales_worker", "result": metrics}


def payments_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    # Simulate a transient timeout on the first attempt within one request_id.
    attempt_key = f"{request_id}:{_key(report_date, region)}:payments"
    _ATTEMPT_STATE[attempt_key] = _ATTEMPT_STATE.get(attempt_key, 0) + 1
    attempt = _ATTEMPT_STATE[attempt_key]

    if attempt == 1:
        time.sleep(2.6)  # Exceeds gateway timeout on purpose.
    else:
        time.sleep(0.3)

    alerts = PAYMENT_ALERTS.get(_key(report_date, region))
    if not alerts:
        return {"status": "done", "worker": "payments_worker", "result": {"warning": "payments_data_missing"}}
    return {"status": "done", "worker": "payments_worker", "result": alerts}


def inventory_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    time.sleep(0.5)
    alerts = INVENTORY_ALERTS.get(_key(report_date, region))
    if not alerts:
        return {"status": "done", "worker": "inventory_worker", "result": {"warning": "inventory_data_missing"}}
    return {"status": "done", "worker": "inventory_worker", "result": alerts}

Was hier am wichtigsten ist (einfach erklärt)

  • Jeder Worker ist nur für seinen Domänenbereich verantwortlich.
  • payments_worker erzeugt beim 1. Versuch absichtlich einen Timeout, um die Retry-Policy zu zeigen.
  • Workers treffen keine Orchestration-Entscheidungen: Sie liefern nur Fakten.

Contract zwischen den Schichten in diesem Beispiel:

  • Das LLM kann nur einen Plan vorschlagen (kind="plan", tasks).
  • Gateway validiert den Plan und verwirft gefährliche/ungültige Tasks.
  • Unknown keys in Tasks werden bei der Normalisierung ignoriert, wenn Pflichtfelder vorhanden sind.
  • Gateway wendet Runtime-Policy an: allowlist, timeout, retry, dispatch budget, deadline.
  • Workers führen nur ihre Domänenfunktion aus und geben ein strukturiertes dict zurück.
  • Orchestrator entscheidet, ob der Run finalisiert werden kann (zum Beispiel nach failed critical task -> stop).

gateway.py — policy boundary (wichtigste Schicht)

PYTHON
from __future__ import annotations

import hashlib
import json
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_tasks: int = 4
    max_parallel: int = 3
    max_retries_per_task: int = 1
    max_dispatches: int = 8
    task_timeout_seconds: float = 2.0
    max_seconds: int = 25


def args_hash(args: dict[str, Any]) -> str:
    stable = json.dumps(args, ensure_ascii=True, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(stable.encode("utf-8")).hexdigest()[:12]


def validate_orchestration_plan(
    raw_plan: dict[str, Any], *, allowed_workers: set[str], max_tasks: int
) -> list[dict[str, Any]]:
    if not isinstance(raw_plan, dict):
        raise StopRun("invalid_plan:non_json")
    if raw_plan.get("kind") != "plan":
        raise StopRun("invalid_plan:kind")

    tasks = raw_plan.get("tasks")
    if not isinstance(tasks, list):
        raise StopRun("invalid_plan:tasks")
    if not (1 <= len(tasks) <= max_tasks):
        raise StopRun("invalid_plan:max_tasks")

    normalized: list[dict[str, Any]] = []
    seen_ids: set[str] = set()
    required_keys = {"id", "worker", "args", "critical"}

    for task in tasks:
        if not isinstance(task, dict):
            raise StopRun("invalid_plan:task_shape")
        if not required_keys.issubset(task.keys()):
            raise StopRun("invalid_plan:missing_keys")

        # Ignore unknown keys and keep only contract fields.
        task_id = task["id"]
        worker = task["worker"]
        args = task["args"]
        critical = task["critical"]

        if not isinstance(task_id, str) or not task_id.strip():
            raise StopRun("invalid_plan:task_id")
        if task_id in seen_ids:
            raise StopRun("invalid_plan:duplicate_task_id")
        seen_ids.add(task_id)

        if not isinstance(worker, str) or not worker.strip():
            raise StopRun("invalid_plan:worker")
        if worker not in allowed_workers:
            raise StopRun(f"invalid_plan:worker_not_allowed:{worker}")

        if not isinstance(args, dict):
            raise StopRun("invalid_plan:args")
        if not isinstance(critical, bool):
            raise StopRun("invalid_plan:critical")

        normalized.append(
            {
                "id": task_id.strip(),
                "worker": worker.strip(),
                "args": dict(args),
                "critical": critical,
            }
        )

    return normalized


class OrchestratorGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ) -> None:
        self.allow = allow
        self.registry = registry
        self.budget = budget
        self.dispatches = 0
        self._lock = threading.Lock()

    def _consume_dispatch_budget(self) -> None:
        with self._lock:
            self.dispatches += 1
            if self.dispatches > self.budget.max_dispatches:
                raise StopRun("max_dispatches")

    def _call_once(
        self, worker_name: str, args: dict[str, Any], *, deadline_monotonic: float
    ) -> dict[str, Any]:
        if worker_name not in self.allow:
            raise StopRun(f"worker_denied:{worker_name}")
        fn = self.registry.get(worker_name)
        if fn is None:
            raise StopRun(f"worker_missing:{worker_name}")

        remaining = deadline_monotonic - time.monotonic()
        if remaining <= 0:
            raise StopRun("max_seconds")
        task_timeout = min(self.budget.task_timeout_seconds, max(0.01, remaining))

        with ThreadPoolExecutor(max_workers=1) as pool:
            future = pool.submit(fn, **args)
            try:
                result = future.result(timeout=task_timeout)
            except TimeoutError as exc:
                raise StopRun("task_timeout") from exc
            except TypeError as exc:
                raise StopRun(f"worker_bad_args:{worker_name}") from exc

        if not isinstance(result, dict):
            raise StopRun(f"worker_bad_result:{worker_name}")
        return result

    def _run_task_with_retry(
        self, task: dict[str, Any], request_id: str, deadline_monotonic: float
    ) -> dict[str, Any]:
        task_id = task["id"]
        worker_name = task["worker"]
        semantic_args = dict(task["args"])
        semantic_hash = args_hash(semantic_args)
        base_args = dict(semantic_args)
        base_args["request_id"] = request_id

        attempts_total = self.budget.max_retries_per_task + 1
        last_reason = "unknown"

        for attempt in range(1, attempts_total + 1):
            try:
                self._consume_dispatch_budget()
                observation = self._call_once(
                    worker_name, base_args, deadline_monotonic=deadline_monotonic
                )
                return {
                    "task_id": task_id,
                    "worker": worker_name,
                    "critical": task["critical"],
                    "status": "done",
                    "attempts_used": attempt,
                    "retried": attempt > 1,
                    "args_hash": semantic_hash,
                    "observation": observation,
                }
            except StopRun as exc:
                last_reason = exc.reason
                if exc.reason == "task_timeout" and attempt < attempts_total:
                    continue
                return {
                    "task_id": task_id,
                    "worker": worker_name,
                    "critical": task["critical"],
                    "status": "failed",
                    "attempts_used": attempt,
                    "retried": attempt > 1,
                    "args_hash": semantic_hash,
                    "stop_reason": last_reason,
                }

        return {
            "task_id": task_id,
            "worker": worker_name,
            "critical": task["critical"],
            "status": "failed",
            "attempts_used": attempts_total,
            "retried": True,
            "args_hash": semantic_hash,
            "stop_reason": last_reason,
        }

    def dispatch_parallel(
        self,
        tasks: list[dict[str, Any]],
        *,
        request_id: str,
        deadline_monotonic: float,
    ) -> list[dict[str, Any]]:
        if not tasks:
            return []

        indexed_tasks = list(enumerate(tasks))
        output: list[tuple[int, dict[str, Any]]] = []
        parallelism = min(self.budget.max_parallel, len(tasks))

        with ThreadPoolExecutor(max_workers=parallelism) as pool:
            future_to_idx = {
                pool.submit(
                    self._run_task_with_retry, task, request_id, deadline_monotonic
                ): idx
                for idx, task in indexed_tasks
            }
            remaining = deadline_monotonic - time.monotonic()
            if remaining <= 0:
                raise StopRun("max_seconds")
            try:
                for future in as_completed(future_to_idx, timeout=max(0.01, remaining)):
                    idx = future_to_idx[future]
                    output.append((idx, future.result()))
            except TimeoutError as exc:
                raise StopRun("max_seconds") from exc

        output.sort(key=lambda item: item[0])
        return [item[1] for item in output]

Was hier am wichtigsten ist (einfach erklärt)

  • Genau im Gateway wird die Execution-Policy umgesetzt: allowlist, timeout, retry, dispatch budget.
  • validate_orchestration_plan(...) prüft den Pflichtfeld-Vertrag und ignoriert zusätzliche Keys.
  • dispatch_parallel(...) erhält die Parallelität und kontrolliert zusätzlich die globale Deadline (max_seconds) innerhalb des Gateways.

llm.py — Planung und finale Synthese

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


PLAN_SYSTEM_PROMPT = """
You are an orchestration planner.
Return only one JSON object in this exact shape:
{
  "kind": "plan",
  "tasks": [
    {"id":"t1","worker":"sales_worker","args":{"report_date":"YYYY-MM-DD","region":"US"},"critical":true}
  ]
}

Rules:
- Create 2 to 4 independent tasks.
- Use only workers from available_workers.
- Keep args minimal and valid for the selected worker.
- Mark task as critical=true only if final brief is impossible without it.
- Do not output markdown or extra keys.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are an operations assistant.
Write a short final briefing in English for an e-commerce operations manager.
Include:
- headline health status (green/yellow/red)
- key sales metrics
- payment risk note
- inventory risk note
- one concrete next action
Use only evidence from orchestrator output.
""".strip()

WORKER_CATALOG = [
    {
        "name": "sales_worker",
        "description": "Provides sales KPIs for a date and region",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
    {
        "name": "payments_worker",
        "description": "Provides payment failure and chargeback signals",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
    {
        "name": "inventory_worker",
        "description": "Provides low-stock and out-of-stock risk signals",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
]


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def create_plan(
    *,
    goal: str,
    report_date: str,
    region: str,
    max_tasks: int,
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "report_date": report_date,
        "region": region,
        "max_tasks": max_tasks,
        "available_workers": WORKER_CATALOG,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": PLAN_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_final_answer(*, goal: str, aggregate: dict[str, Any]) -> str:
    payload = {
        "goal": goal,
        "aggregate": aggregate,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or ""
    text = text.strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text

Was hier am wichtigsten ist (einfach erklärt)

  • LLM liefert nur plan/answer zurück, startet Workers aber nicht direkt.
  • Planner erhält max_tasks und available_workers, daher ist die Auswahl schon in der Intent-Phase begrenzt.
  • Bei non-JSON oder Timeout werden explizite Signale für einen kontrollierten Stopp zurückgegeben.

main.py — Plan -> Dispatch (parallel) -> Aggregate -> Finalize

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from gateway import Budget, OrchestratorGateway, StopRun, validate_orchestration_plan
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from workers import inventory_worker, payments_worker, sales_worker

REPORT_DATE = "2026-02-26"
REGION = "US"
GOAL = (
    "Prepare a morning operations report for e-commerce region US on 2026-02-26. "
    "Include sales, payment risk, and inventory risk with one concrete action."
)

BUDGET = Budget(
    max_tasks=4,
    max_parallel=3,
    max_retries_per_task=1,
    max_dispatches=8,
    task_timeout_seconds=2.0,
    max_seconds=25,
)

WORKER_REGISTRY = {
    "sales_worker": sales_worker,
    "payments_worker": payments_worker,
    "inventory_worker": inventory_worker,
}

ALLOWED_WORKERS_POLICY = {"sales_worker", "payments_worker", "inventory_worker"}
INVENTORY_RUNTIME_ENABLED = True
ALLOWED_WORKERS_EXECUTION = (
    {"sales_worker", "payments_worker", "inventory_worker"}
    if INVENTORY_RUNTIME_ENABLED
    else {"sales_worker", "payments_worker"}
)
# Set INVENTORY_RUNTIME_ENABLED=False to get worker_denied:inventory_worker in trace.


def aggregate_results(task_results: list[dict[str, Any]]) -> dict[str, Any]:
    done_by_worker: dict[str, dict[str, Any]] = {}
    failed: list[dict[str, Any]] = []

    for item in task_results:
        if item["status"] == "done":
            done_by_worker[item["worker"]] = item["observation"]
        else:
            failed.append(
                {
                    "task_id": item["task_id"],
                    "worker": item["worker"],
                    "critical": item["critical"],
                    "stop_reason": item["stop_reason"],
                }
            )

    sales = done_by_worker.get("sales_worker", {}).get("result", {})
    payments = done_by_worker.get("payments_worker", {}).get("result", {})
    inventory = done_by_worker.get("inventory_worker", {}).get("result", {})

    health = "green"
    if payments.get("failed_payment_rate", 0) >= 0.03 or inventory.get("out_of_stock_skus"):
        health = "yellow"
    if payments.get("gateway_incident") not in (None, "none"):
        health = "red"

    return {
        "report_date": REPORT_DATE,
        "region": REGION,
        "health": health,
        "sales": sales,
        "payments": payments,
        "inventory": inventory,
        "failed_tasks": failed,
    }


def run_orchestrator(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    deadline = started + BUDGET.max_seconds
    request_id = uuid.uuid4().hex[:10]

    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = OrchestratorGateway(
        allow=ALLOWED_WORKERS_EXECUTION,
        registry=WORKER_REGISTRY,
        budget=BUDGET,
    )

    try:
        raw_plan = create_plan(
            goal=goal,
            report_date=REPORT_DATE,
            region=REGION,
            max_tasks=BUDGET.max_tasks,
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        tasks = validate_orchestration_plan(
            raw_plan,
            allowed_workers=ALLOWED_WORKERS_POLICY,
            max_tasks=BUDGET.max_tasks,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "plan",
            "raw_plan": raw_plan,
            "trace": trace,
            "history": history,
        }

    if time.monotonic() > deadline:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "dispatch",
            "plan": tasks,
            "trace": trace,
            "history": history,
        }

    try:
        task_results = gateway.dispatch_parallel(
            tasks,
            request_id=request_id,
            deadline_monotonic=deadline,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "dispatch",
            "plan": tasks,
            "trace": trace,
            "history": history,
        }

    for item in task_results:
        trace.append(
            {
                "task_id": item["task_id"],
                "worker": item["worker"],
                "status": item["status"],
                "attempts_used": item["attempts_used"],
                "retried": item["retried"],
                "args_hash": item["args_hash"],
                "stop_reason": item.get("stop_reason"),
                "critical": item["critical"],
            }
        )
        history.append(item)

    failed_critical = [
        item
        for item in task_results
        if item["status"] == "failed" and item["critical"]
    ]
    if failed_critical:
        return {
            "status": "stopped",
            "stop_reason": "critical_task_failed",
            "phase": "dispatch",
            "plan": tasks,
            "failed_critical": failed_critical,
            "trace": trace,
            "history": history,
        }

    aggregate = aggregate_results(task_results)

    if time.monotonic() > deadline:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }

    try:
        answer = compose_final_answer(goal=goal, aggregate=aggregate)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }

    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "plan": tasks,
        "aggregate": aggregate,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_orchestrator(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Was hier am wichtigsten ist (einfach erklärt)

  • run_orchestrator(...) steuert den vollständigen Lebenszyklus eines Orchestration-Runs.
  • Die parallele Ausführung passiert nur im Gateway, nicht innerhalb von LLM-Entscheidungen.
  • ALLOWED_WORKERS_POLICY und ALLOWED_WORKERS_EXECUTION können unterschiedlich sein (runtime feature-flag/tenant gate).
  • Es gibt eine klare Policy: Wenn eine kritische Task fehlschlägt, startet die Finalisierung nicht.
  • trace und history liefern ein transparentes Audit: was lief, wie viele Versuche, warum es scheiterte.

requirements.txt

TEXT
openai==2.21.0

Ausgabebeispiel

Wegen der Timeout-Simulation in payments_worker beim ersten Versuch enthält ein typischer Trace retried=true.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "Morning Operations Report - US Region (2026-02-26): Health=yellow. Gross sales=$182,450, orders=4,820, AOV=$37.85. Payment failed rate=2.3% with 3 chargeback alerts. Inventory has one out-of-stock SKU (SKU-9033). Next action: prioritize immediate restock for SKU-9033.",
  "plan": [
    {
      "id": "t1",
      "worker": "sales_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    },
    {
      "id": "t2",
      "worker": "payments_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    },
    {
      "id": "t3",
      "worker": "inventory_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    }
  ],
  "aggregate": {
    "report_date": "2026-02-26",
    "region": "US",
    "health": "yellow",
    "sales": {"gross_sales_usd": 182450.0, "orders": 4820, "aov_usd": 37.85},
    "payments": {"failed_payment_rate": 0.023, "chargeback_alerts": 3, "gateway_incident": "none"},
    "inventory": {"low_stock_skus": ["SKU-4411", "SKU-8820"], "out_of_stock_skus": ["SKU-9033"], "restock_eta_days": 2}
  },
  "trace": [
    {
      "task_id": "t1",
      "worker": "sales_worker",
      "status": "done",
      "attempts_used": 1,
      "retried": false
    },
    {
      "task_id": "t2",
      "worker": "payments_worker",
      "status": "done",
      "attempts_used": 2,
      "retried": true
    },
    {
      "task_id": "t3",
      "worker": "inventory_worker",
      "status": "done",
      "attempts_used": 1,
      "retried": false
    }
  ],
  "history": [{...}]
}

Das ist ein gekürztes Beispiel: history ist kompakt dargestellt, und das answer-Format kann zwischen Runs leicht variieren.

Wenn INVENTORY_RUNTIME_ENABLED=False gesetzt ist, erscheint worker_denied:inventory_worker im trace - ein klares Beispiel für den Unterschied zwischen Policy- und Execution-Allowlist.


Häufige stop_reason

  • success - Plan ausgeführt, finale Antwort erzeugt
  • invalid_plan:* - LLM-Plan hat die Policy-Validierung nicht bestanden
  • invalid_plan:missing_keys - einer Task fehlen verpflichtende Vertragsfelder
  • llm_timeout - LLM hat nicht innerhalb von OPENAI_TIMEOUT_SECONDS geantwortet
  • llm_empty - LLM hat eine leere finale Antwort zurückgegeben
  • max_seconds - gesamtes Zeitbudget des Runs überschritten
  • max_dispatches - Dispatch-Limit für Subtasks überschritten (inklusive Retries)
  • worker_denied:<name> - Worker ist nicht in der Execution-Allowlist
  • worker_missing:<name> - Worker fehlt im Registry
  • worker_bad_args:<name> - Task enthält ungültige Argumente
  • worker_bad_result:<name> - Worker lieferte Daten außerhalb des Vertrags
  • task_timeout - Subtask wurde nicht innerhalb von task_timeout_seconds abgeschlossen
  • critical_task_failed - mindestens eine kritische Subtask ist fehlgeschlagen

Was hier NICHT gezeigt wird

  • Keine Auth/PII und keine Tenant-Isolation.
  • Keine Queue/kein persistenter Job-Runner (RabbitMQ/SQS/Kafka).
  • Kein Circuit Breaker und keine komplexen Backoff-Policies.
  • Keine Token-/Kostenbudgets (cost guardrails).

Was du als Nächstes ausprobieren kannst

  • Setze payments_worker auf nicht-kritisch (critical=false) und sieh, wie der Run ein Partial-Result zurückgibt.
  • Setze INVENTORY_RUNTIME_ENABLED=False und prüfe worker_denied:inventory_worker.
  • Reduziere task_timeout_seconds auf 0.2 und prüfe, wie die Anzahl von Timeout/Retry steigt.
  • Füge einen vierten Worker (fraud_worker) hinzu und vergleiche die Run-Zeit mit max_parallel=2 und max_parallel=4.
⏱️ 15 Min. LesezeitAktualisiert Mär, 2026Schwierigkeit: ★★☆
Integriert: Production ControlOnceOnly
Guardrails für Tool-Calling-Agents
Shippe dieses Pattern mit Governance:
  • Budgets (Steps / Spend Caps)
  • Tool-Permissions (Allowlist / Blocklist)
  • Kill switch & Incident Stop
  • Idempotenz & Dedupe
  • Audit logs & Nachvollziehbarkeit
Integrierter Hinweis: OnceOnly ist eine Control-Layer für Production-Agent-Systeme.
Autor

Diese Dokumentation wird von Engineers kuratiert und gepflegt, die AI-Agenten in der Produktion betreiben.

Die Inhalte sind KI-gestützt, mit menschlicher redaktioneller Verantwortung für Genauigkeit, Klarheit und Produktionsrelevanz.

Patterns und Empfehlungen basieren auf Post-Mortems, Failure-Modes und operativen Incidents in produktiven Systemen, auch bei der Entwicklung und dem Betrieb von Governance-Infrastruktur für Agenten bei OnceOnly.