Orchestrator Agent — Python (простий продакшен-приклад з LLM)

Production-style runnable приклад Orchestrator агента на Python з планом підзадач, паралельним dispatch, timeout/retry політикою і фінальною агрегацією.
На цій сторінці
  1. Суть патерна (коротко)
  2. Що демонструє цей приклад
  3. Архітектура
  4. Структура проєкту
  5. Як запустити
  6. Задача
  7. Рішення
  8. Код
  9. workers.py — спеціалізовані виконавці
  10. gateway.py — policy boundary (найважливіший шар)
  11. llm.py — планування і фінальна синтеза
  12. main.py — Plan -> Dispatch (parallel) -> Aggregate -> Finalize
  13. requirements.txt
  14. Приклад виводу
  15. Типові stop_reason
  16. Що тут НЕ показано
  17. Що спробувати далі

Суть патерна (коротко)

Orchestrator Agent — це патерн, у якому агент сам не виконує доменну роботу, а координує кілька спеціалізованих виконавців.

LLM вирішує, які підзадачі запускати (що робити), а policy-шар контролює як їх запускати безпечно (що дозволено, скільки чекати, коли повторювати, які ліміти).


Що демонструє цей приклад

  • окремий етап Plan для побудови набору підзадач
  • Dispatch підзадач у паралелі (ThreadPoolExecutor)
  • policy boundary між orchestration decision (LLM) і worker execution
  • strict validation плану (kind, tasks, shape задачі)
  • окремі allowlist-и для policy і execution шарів
  • timeout на кожну підзадачу + retry_once для transient timeout
  • partial-results policy: non-critical задача може впасти, але run продовжується
  • явні stop_reason для дебагу і продакшен-моніторингу

Архітектура

  1. LLM отримує goal і повертає orchestration-план у JSON (kind="plan", tasks).
  2. Policy boundary валідовує план і відкидає недопустимі workers/аргументи.
  3. OrchestratorGateway паралельно запускає підзадачі з timeout і retry.
  4. Результати (успішні та failed) збираються в єдиний history/trace.
  5. Якщо критична задача не виконалась, run зупиняється.
  6. Якщо мінімально потрібні факти є, LLM формує короткий фінальний операційний звіт.

LLM повертає intent (план), який розглядається як недовірений input: policy boundary спершу валідовує його, а потім (якщо дозволено) запускає workers.


Структура проєкту

TEXT
examples/
└── agent-patterns/
    └── orchestrator-agent/
        └── python/
            ├── main.py           # Plan -> Dispatch (parallel) -> Aggregate -> Finalize
            ├── llm.py            # planner + final synthesis
            ├── gateway.py        # policy boundary: validation + timeout/retry + budgets
            ├── workers.py        # deterministic specialists (sales/payments/inventory)
            └── requirements.txt

Як запустити

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/orchestrator-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Потрібен Python 3.11+.

Варіант через export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Варіант через .env (опційно)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

Це shell-варіант (macOS/Linux). На Windows простіше використовувати set змінних або, за бажанням, python-dotenv, щоб підвантажувати .env автоматично.


Задача

Уяви реальний продакшен-кейс:

"Підготуй ранковий операційний звіт за 2026-02-26 для US ринку: продажі, платіжні інциденти та ризики запасів."

Агент не має робити все в одному потоці. Він має:

  • розбити задачу на незалежні підзадачі
  • запустити підзадачі паралельно
  • перезапустити timeout-задачу один раз
  • зібрати єдиний фінальний короткий бриф

Рішення

Тут Orchestrator працює як координатор:

  • LLM генерує план із задачами для спеціалізованих workers
  • policy шар перевіряє план і блокує недозволені задачі
  • gateway запускає workers у паралелі з timeout/retry
  • результати збираються в aggregate-модель
  • фінальний короткий текст генерується лише після збору фактів

Це не ReAct і не Routing:

  • не ReAct, бо акцент не на послідовному Think -> Act циклі одного агента
  • не Routing, бо тут треба кілька виконавців одночасно, а не один target

Код

workers.py — спеціалізовані виконавці

PYTHON
from __future__ import annotations

import time
from typing import Any

SALES_METRICS = {
    "2026-02-26:US": {
        "gross_sales_usd": 182_450.0,
        "orders": 4_820,
        "aov_usd": 37.85,
    }
}

PAYMENT_ALERTS = {
    "2026-02-26:US": {
        "failed_payment_rate": 0.023,
        "chargeback_alerts": 3,
        "gateway_incident": "none",
    }
}

INVENTORY_ALERTS = {
    "2026-02-26:US": {
        "low_stock_skus": ["SKU-4411", "SKU-8820"],
        "out_of_stock_skus": ["SKU-9033"],
        "restock_eta_days": 2,
    }
}

_ATTEMPT_STATE: dict[str, int] = {}


def _key(report_date: str, region: str) -> str:
    return f"{report_date}:{region.upper()}"


def sales_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    time.sleep(0.4)
    metrics = SALES_METRICS.get(_key(report_date, region))
    if not metrics:
        return {"status": "done", "worker": "sales_worker", "result": {"warning": "sales_data_missing"}}
    return {"status": "done", "worker": "sales_worker", "result": metrics}


def payments_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    # Simulate a transient timeout on the first attempt within one request_id.
    attempt_key = f"{request_id}:{_key(report_date, region)}:payments"
    _ATTEMPT_STATE[attempt_key] = _ATTEMPT_STATE.get(attempt_key, 0) + 1
    attempt = _ATTEMPT_STATE[attempt_key]

    if attempt == 1:
        time.sleep(2.6)  # Exceeds gateway timeout on purpose.
    else:
        time.sleep(0.3)

    alerts = PAYMENT_ALERTS.get(_key(report_date, region))
    if not alerts:
        return {"status": "done", "worker": "payments_worker", "result": {"warning": "payments_data_missing"}}
    return {"status": "done", "worker": "payments_worker", "result": alerts}


def inventory_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    time.sleep(0.5)
    alerts = INVENTORY_ALERTS.get(_key(report_date, region))
    if not alerts:
        return {"status": "done", "worker": "inventory_worker", "result": {"warning": "inventory_data_missing"}}
    return {"status": "done", "worker": "inventory_worker", "result": alerts}

Що тут найважливіше (простими словами)

  • Кожен worker відповідає лише за свою доменну частину.
  • payments_worker спеціально дає timeout на 1-й спробі, щоб показати retry-policy.
  • Workers не приймають orchestration-рішень: тільки повертають факти.

Contract між шарами в цьому прикладі:

  • LLM може лише запропонувати план (kind="plan", tasks).
  • Gateway валідовує план і відкидає небезпечні/некоректні задачі.
  • Unknown keys у задачах ігноруються під час нормалізації, якщо обов'язкові поля присутні.
  • Gateway застосовує runtime-policy: allowlist, timeout, retry, dispatch budget, deadline.
  • Workers виконують лише свою доменну функцію і повертають структурований dict.
  • Orchestrator вирішує, чи можна фіналізувати run (наприклад, після failed critical task — стоп).

gateway.py — policy boundary (найважливіший шар)

PYTHON
from __future__ import annotations

import hashlib
import json
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_tasks: int = 4
    max_parallel: int = 3
    max_retries_per_task: int = 1
    max_dispatches: int = 8
    task_timeout_seconds: float = 2.0
    max_seconds: int = 25


def args_hash(args: dict[str, Any]) -> str:
    stable = json.dumps(args, ensure_ascii=True, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(stable.encode("utf-8")).hexdigest()[:12]


def validate_orchestration_plan(
    raw_plan: dict[str, Any], *, allowed_workers: set[str], max_tasks: int
) -> list[dict[str, Any]]:
    if not isinstance(raw_plan, dict):
        raise StopRun("invalid_plan:non_json")
    if raw_plan.get("kind") != "plan":
        raise StopRun("invalid_plan:kind")

    tasks = raw_plan.get("tasks")
    if not isinstance(tasks, list):
        raise StopRun("invalid_plan:tasks")
    if not (1 <= len(tasks) <= max_tasks):
        raise StopRun("invalid_plan:max_tasks")

    normalized: list[dict[str, Any]] = []
    seen_ids: set[str] = set()
    required_keys = {"id", "worker", "args", "critical"}

    for task in tasks:
        if not isinstance(task, dict):
            raise StopRun("invalid_plan:task_shape")
        if not required_keys.issubset(task.keys()):
            raise StopRun("invalid_plan:missing_keys")

        # Ignore unknown keys and keep only contract fields.
        task_id = task["id"]
        worker = task["worker"]
        args = task["args"]
        critical = task["critical"]

        if not isinstance(task_id, str) or not task_id.strip():
            raise StopRun("invalid_plan:task_id")
        if task_id in seen_ids:
            raise StopRun("invalid_plan:duplicate_task_id")
        seen_ids.add(task_id)

        if not isinstance(worker, str) or not worker.strip():
            raise StopRun("invalid_plan:worker")
        if worker not in allowed_workers:
            raise StopRun(f"invalid_plan:worker_not_allowed:{worker}")

        if not isinstance(args, dict):
            raise StopRun("invalid_plan:args")
        if not isinstance(critical, bool):
            raise StopRun("invalid_plan:critical")

        normalized.append(
            {
                "id": task_id.strip(),
                "worker": worker.strip(),
                "args": dict(args),
                "critical": critical,
            }
        )

    return normalized


class OrchestratorGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ) -> None:
        self.allow = allow
        self.registry = registry
        self.budget = budget
        self.dispatches = 0
        self._lock = threading.Lock()

    def _consume_dispatch_budget(self) -> None:
        with self._lock:
            self.dispatches += 1
            if self.dispatches > self.budget.max_dispatches:
                raise StopRun("max_dispatches")

    def _call_once(
        self, worker_name: str, args: dict[str, Any], *, deadline_monotonic: float
    ) -> dict[str, Any]:
        if worker_name not in self.allow:
            raise StopRun(f"worker_denied:{worker_name}")
        fn = self.registry.get(worker_name)
        if fn is None:
            raise StopRun(f"worker_missing:{worker_name}")

        remaining = deadline_monotonic - time.monotonic()
        if remaining <= 0:
            raise StopRun("max_seconds")
        task_timeout = min(self.budget.task_timeout_seconds, max(0.01, remaining))

        with ThreadPoolExecutor(max_workers=1) as pool:
            future = pool.submit(fn, **args)
            try:
                result = future.result(timeout=task_timeout)
            except TimeoutError as exc:
                raise StopRun("task_timeout") from exc
            except TypeError as exc:
                raise StopRun(f"worker_bad_args:{worker_name}") from exc

        if not isinstance(result, dict):
            raise StopRun(f"worker_bad_result:{worker_name}")
        return result

    def _run_task_with_retry(
        self, task: dict[str, Any], request_id: str, deadline_monotonic: float
    ) -> dict[str, Any]:
        task_id = task["id"]
        worker_name = task["worker"]
        semantic_args = dict(task["args"])
        semantic_hash = args_hash(semantic_args)
        base_args = dict(semantic_args)
        base_args["request_id"] = request_id

        attempts_total = self.budget.max_retries_per_task + 1
        last_reason = "unknown"

        for attempt in range(1, attempts_total + 1):
            try:
                self._consume_dispatch_budget()
                observation = self._call_once(
                    worker_name, base_args, deadline_monotonic=deadline_monotonic
                )
                return {
                    "task_id": task_id,
                    "worker": worker_name,
                    "critical": task["critical"],
                    "status": "done",
                    "attempts_used": attempt,
                    "retried": attempt > 1,
                    "args_hash": semantic_hash,
                    "observation": observation,
                }
            except StopRun as exc:
                last_reason = exc.reason
                if exc.reason == "task_timeout" and attempt < attempts_total:
                    continue
                return {
                    "task_id": task_id,
                    "worker": worker_name,
                    "critical": task["critical"],
                    "status": "failed",
                    "attempts_used": attempt,
                    "retried": attempt > 1,
                    "args_hash": semantic_hash,
                    "stop_reason": last_reason,
                }

        return {
            "task_id": task_id,
            "worker": worker_name,
            "critical": task["critical"],
            "status": "failed",
            "attempts_used": attempts_total,
            "retried": True,
            "args_hash": semantic_hash,
            "stop_reason": last_reason,
        }

    def dispatch_parallel(
        self,
        tasks: list[dict[str, Any]],
        *,
        request_id: str,
        deadline_monotonic: float,
    ) -> list[dict[str, Any]]:
        if not tasks:
            return []

        indexed_tasks = list(enumerate(tasks))
        output: list[tuple[int, dict[str, Any]]] = []
        parallelism = min(self.budget.max_parallel, len(tasks))

        with ThreadPoolExecutor(max_workers=parallelism) as pool:
            future_to_idx = {
                pool.submit(
                    self._run_task_with_retry, task, request_id, deadline_monotonic
                ): idx
                for idx, task in indexed_tasks
            }
            remaining = deadline_monotonic - time.monotonic()
            if remaining <= 0:
                raise StopRun("max_seconds")
            try:
                for future in as_completed(future_to_idx, timeout=max(0.01, remaining)):
                    idx = future_to_idx[future]
                    output.append((idx, future.result()))
            except TimeoutError as exc:
                raise StopRun("max_seconds") from exc

        output.sort(key=lambda item: item[0])
        return [item[1] for item in output]

Що тут найважливіше (простими словами)

  • Саме gateway реалізує execution-policy: allowlist, timeout, retry, dispatch-budget.
  • validate_orchestration_plan(...) перевіряє обов'язковий контракт полів і ігнорує зайві ключі.
  • dispatch_parallel(...) зберігає паралельність, але ще й контролює global deadline (max_seconds) всередині gateway.

llm.py — планування і фінальна синтеза

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


PLAN_SYSTEM_PROMPT = """
You are an orchestration planner.
Return only one JSON object in this exact shape:
{
  "kind": "plan",
  "tasks": [
    {"id":"t1","worker":"sales_worker","args":{"report_date":"YYYY-MM-DD","region":"US"},"critical":true}
  ]
}

Rules:
- Create 2 to 4 independent tasks.
- Use only workers from available_workers.
- Keep args minimal and valid for the selected worker.
- Mark task as critical=true only if final brief is impossible without it.
- Do not output markdown or extra keys.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are an operations assistant.
Write a short final briefing in English for an e-commerce operations manager.
Include:
- headline health status (green/yellow/red)
- key sales metrics
- payment risk note
- inventory risk note
- one concrete next action
Use only evidence from orchestrator output.
""".strip()

WORKER_CATALOG = [
    {
        "name": "sales_worker",
        "description": "Provides sales KPIs for a date and region",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
    {
        "name": "payments_worker",
        "description": "Provides payment failure and chargeback signals",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
    {
        "name": "inventory_worker",
        "description": "Provides low-stock and out-of-stock risk signals",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
]


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def create_plan(
    *,
    goal: str,
    report_date: str,
    region: str,
    max_tasks: int,
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "report_date": report_date,
        "region": region,
        "max_tasks": max_tasks,
        "available_workers": WORKER_CATALOG,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": PLAN_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_final_answer(*, goal: str, aggregate: dict[str, Any]) -> str:
    payload = {
        "goal": goal,
        "aggregate": aggregate,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or ""
    text = text.strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text

Що тут найважливіше (простими словами)

  • LLM повертає тільки plan/answer, але не запускає workers напряму.
  • Planner отримує max_tasks і available_workers, тому вибір обмежений ще на етапі intent.
  • При non-JSON або timeout повертаються явні сигнали для контрольованої зупинки.

main.py — Plan -> Dispatch (parallel) -> Aggregate -> Finalize

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from gateway import Budget, OrchestratorGateway, StopRun, validate_orchestration_plan
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from workers import inventory_worker, payments_worker, sales_worker

REPORT_DATE = "2026-02-26"
REGION = "US"
GOAL = (
    "Prepare a morning operations report for e-commerce region US on 2026-02-26. "
    "Include sales, payment risk, and inventory risk with one concrete action."
)

BUDGET = Budget(
    max_tasks=4,
    max_parallel=3,
    max_retries_per_task=1,
    max_dispatches=8,
    task_timeout_seconds=2.0,
    max_seconds=25,
)

WORKER_REGISTRY = {
    "sales_worker": sales_worker,
    "payments_worker": payments_worker,
    "inventory_worker": inventory_worker,
}

ALLOWED_WORKERS_POLICY = {"sales_worker", "payments_worker", "inventory_worker"}
INVENTORY_RUNTIME_ENABLED = True
ALLOWED_WORKERS_EXECUTION = (
    {"sales_worker", "payments_worker", "inventory_worker"}
    if INVENTORY_RUNTIME_ENABLED
    else {"sales_worker", "payments_worker"}
)
# Set INVENTORY_RUNTIME_ENABLED=False to get worker_denied:inventory_worker in trace.


def aggregate_results(task_results: list[dict[str, Any]]) -> dict[str, Any]:
    done_by_worker: dict[str, dict[str, Any]] = {}
    failed: list[dict[str, Any]] = []

    for item in task_results:
        if item["status"] == "done":
            done_by_worker[item["worker"]] = item["observation"]
        else:
            failed.append(
                {
                    "task_id": item["task_id"],
                    "worker": item["worker"],
                    "critical": item["critical"],
                    "stop_reason": item["stop_reason"],
                }
            )

    sales = done_by_worker.get("sales_worker", {}).get("result", {})
    payments = done_by_worker.get("payments_worker", {}).get("result", {})
    inventory = done_by_worker.get("inventory_worker", {}).get("result", {})

    health = "green"
    if payments.get("failed_payment_rate", 0) >= 0.03 or inventory.get("out_of_stock_skus"):
        health = "yellow"
    if payments.get("gateway_incident") not in (None, "none"):
        health = "red"

    return {
        "report_date": REPORT_DATE,
        "region": REGION,
        "health": health,
        "sales": sales,
        "payments": payments,
        "inventory": inventory,
        "failed_tasks": failed,
    }


def run_orchestrator(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    deadline = started + BUDGET.max_seconds
    request_id = uuid.uuid4().hex[:10]

    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = OrchestratorGateway(
        allow=ALLOWED_WORKERS_EXECUTION,
        registry=WORKER_REGISTRY,
        budget=BUDGET,
    )

    try:
        raw_plan = create_plan(
            goal=goal,
            report_date=REPORT_DATE,
            region=REGION,
            max_tasks=BUDGET.max_tasks,
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        tasks = validate_orchestration_plan(
            raw_plan,
            allowed_workers=ALLOWED_WORKERS_POLICY,
            max_tasks=BUDGET.max_tasks,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "plan",
            "raw_plan": raw_plan,
            "trace": trace,
            "history": history,
        }

    if time.monotonic() > deadline:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "dispatch",
            "plan": tasks,
            "trace": trace,
            "history": history,
        }

    try:
        task_results = gateway.dispatch_parallel(
            tasks,
            request_id=request_id,
            deadline_monotonic=deadline,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "dispatch",
            "plan": tasks,
            "trace": trace,
            "history": history,
        }

    for item in task_results:
        trace.append(
            {
                "task_id": item["task_id"],
                "worker": item["worker"],
                "status": item["status"],
                "attempts_used": item["attempts_used"],
                "retried": item["retried"],
                "args_hash": item["args_hash"],
                "stop_reason": item.get("stop_reason"),
                "critical": item["critical"],
            }
        )
        history.append(item)

    failed_critical = [
        item
        for item in task_results
        if item["status"] == "failed" and item["critical"]
    ]
    if failed_critical:
        return {
            "status": "stopped",
            "stop_reason": "critical_task_failed",
            "phase": "dispatch",
            "plan": tasks,
            "failed_critical": failed_critical,
            "trace": trace,
            "history": history,
        }

    aggregate = aggregate_results(task_results)

    if time.monotonic() > deadline:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }

    try:
        answer = compose_final_answer(goal=goal, aggregate=aggregate)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }

    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "plan": tasks,
        "aggregate": aggregate,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_orchestrator(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Що тут найважливіше (простими словами)

  • run_orchestrator(...) керує повним життєвим циклом orchestration run.
  • Паралельний запуск відбувається тільки в gateway, а не всередині LLM-рішень.
  • ALLOWED_WORKERS_POLICY і ALLOWED_WORKERS_EXECUTION можуть відрізнятись (runtime feature-flag/tenant gate).
  • Є чітка політика: якщо впала критична задача, фіналізація не запускається.
  • trace і history дають прозорий аудит: хто запускався, скільки спроб, чому впало.

requirements.txt

TEXT
openai==2.21.0

Приклад виводу

Через симуляцію timeout у payments_worker на першій спробі, типовий trace містить retried=true.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "Morning Operations Report - US Region (2026-02-26): Health=yellow. Gross sales=$182,450, orders=4,820, AOV=$37.85. Payment failed rate=2.3% with 3 chargeback alerts. Inventory has one out-of-stock SKU (SKU-9033). Next action: prioritize immediate restock for SKU-9033.",
  "plan": [
    {
      "id": "t1",
      "worker": "sales_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    },
    {
      "id": "t2",
      "worker": "payments_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    },
    {
      "id": "t3",
      "worker": "inventory_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    }
  ],
  "aggregate": {
    "report_date": "2026-02-26",
    "region": "US",
    "health": "yellow",
    "sales": {"gross_sales_usd": 182450.0, "orders": 4820, "aov_usd": 37.85},
    "payments": {"failed_payment_rate": 0.023, "chargeback_alerts": 3, "gateway_incident": "none"},
    "inventory": {"low_stock_skus": ["SKU-4411", "SKU-8820"], "out_of_stock_skus": ["SKU-9033"], "restock_eta_days": 2}
  },
  "trace": [
    {
      "task_id": "t1",
      "worker": "sales_worker",
      "status": "done",
      "attempts_used": 1,
      "retried": false
    },
    {
      "task_id": "t2",
      "worker": "payments_worker",
      "status": "done",
      "attempts_used": 2,
      "retried": true
    },
    {
      "task_id": "t3",
      "worker": "inventory_worker",
      "status": "done",
      "attempts_used": 1,
      "retried": false
    }
  ],
  "history": [{...}]
}

Це скорочений приклад: history показано стисло, а формат answer може трохи відрізнятись між запусками.

Якщо встановити INVENTORY_RUNTIME_ENABLED=False, у trace зʼявиться worker_denied:inventory_worker — це наочний приклад різниці між policy і execution allowlist.


Типові stop_reason

  • success — план виконано, фінальна відповідь згенерована
  • invalid_plan:* — план від LLM не пройшов policy validation
  • invalid_plan:missing_keys — у задачі бракує обовʼязкових полів контракту
  • llm_timeout — LLM не відповів у межах OPENAI_TIMEOUT_SECONDS
  • llm_empty — LLM повернув порожню фінальну відповідь
  • max_seconds — перевищено загальний time budget run
  • max_dispatches — перевищено ліміт запусків підзадач (включно з retry)
  • worker_denied:<name> — worker не в execution allowlist
  • worker_missing:<name> — worker відсутній у registry
  • worker_bad_args:<name> — задача містить некоректні аргументи
  • worker_bad_result:<name> — worker повернув дані поза контрактом
  • task_timeout — підзадача не вклалась у task_timeout_seconds
  • critical_task_failed — провалилась хоча б одна критична підзадача

Що тут НЕ показано

  • Немає auth/PII та tenant isolation.
  • Немає черги/персистентного job-runner (RabbitMQ/SQS/Kafka).
  • Немає circuit breaker і складних backoff-політик.
  • Немає бюджетів по токенах/вартості (cost guardrails).

Що спробувати далі

  • Зроби payments_worker некритичним (critical=false) і подивись, як run повертає partial-result.
  • Встанови INVENTORY_RUNTIME_ENABLED=False і перевір worker_denied:inventory_worker.
  • Зменши task_timeout_seconds до 0.2 і перевір, як зростає кількість timeout/retry.
  • Додай четвертого worker-а (fraud_worker) і порівняй час run з max_parallel=2 та max_parallel=4.
⏱️ 15 хв читанняОновлено Бер, 2026Складність: ★★☆
Інтегровано: продакшен-контрольOnceOnly
Додай guardrails до агентів з tool-calling
Зашип цей патерн з governance:
  • Бюджетами (кроки / ліміти витрат)
  • Дозволами на інструменти (allowlist / blocklist)
  • Kill switch та аварійна зупинка
  • Ідемпотентність і dedupe
  • Audit logs та трасування
Інтегрована згадка: OnceOnly — контрольний шар для продакшен агент-систем.
Автор

Цю документацію курують і підтримують інженери, які запускають AI-агентів у продакшені.

Контент створено з допомогою AI, із людською редакторською відповідальністю за точність, ясність і продакшн-релевантність.

Патерни та рекомендації базуються на постмортемах, режимах відмов і операційних інцидентах у розгорнутих системах, зокрема під час розробки та експлуатації governance-інфраструктури для агентів у OnceOnly.