Суть патерна (коротко)
Orchestrator Agent — це патерн, у якому агент сам не виконує доменну роботу, а координує кілька спеціалізованих виконавців.
LLM вирішує, які підзадачі запускати (що робити), а policy-шар контролює як їх запускати безпечно (що дозволено, скільки чекати, коли повторювати, які ліміти).
Що демонструє цей приклад
- окремий етап
Planдля побудови набору підзадач Dispatchпідзадач у паралелі (ThreadPoolExecutor)- policy boundary між orchestration decision (LLM) і worker execution
- strict validation плану (
kind,tasks, shape задачі) - окремі allowlist-и для policy і execution шарів
- timeout на кожну підзадачу +
retry_onceдля transient timeout - partial-results policy: non-critical задача може впасти, але run продовжується
- явні
stop_reasonдля дебагу і продакшен-моніторингу
Архітектура
- LLM отримує goal і повертає orchestration-план у JSON (
kind="plan",tasks). - Policy boundary валідовує план і відкидає недопустимі workers/аргументи.
OrchestratorGatewayпаралельно запускає підзадачі з timeout і retry.- Результати (успішні та failed) збираються в єдиний
history/trace. - Якщо критична задача не виконалась, run зупиняється.
- Якщо мінімально потрібні факти є, LLM формує короткий фінальний операційний звіт.
LLM повертає intent (план), який розглядається як недовірений input: policy boundary спершу валідовує його, а потім (якщо дозволено) запускає workers.
Структура проєкту
examples/
└── agent-patterns/
└── orchestrator-agent/
└── python/
├── main.py # Plan -> Dispatch (parallel) -> Aggregate -> Finalize
├── llm.py # planner + final synthesis
├── gateway.py # policy boundary: validation + timeout/retry + budgets
├── workers.py # deterministic specialists (sales/payments/inventory)
└── requirements.txt
Як запустити
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/orchestrator-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Потрібен Python 3.11+.
Варіант через export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Варіант через .env (опційно)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
Це shell-варіант (macOS/Linux). На Windows простіше використовувати set змінних або, за бажанням, python-dotenv, щоб підвантажувати .env автоматично.
Задача
Уяви реальний продакшен-кейс:
"Підготуй ранковий операційний звіт за 2026-02-26 для US ринку: продажі, платіжні інциденти та ризики запасів."
Агент не має робити все в одному потоці. Він має:
- розбити задачу на незалежні підзадачі
- запустити підзадачі паралельно
- перезапустити timeout-задачу один раз
- зібрати єдиний фінальний короткий бриф
Рішення
Тут Orchestrator працює як координатор:
- LLM генерує план із задачами для спеціалізованих workers
- policy шар перевіряє план і блокує недозволені задачі
- gateway запускає workers у паралелі з timeout/retry
- результати збираються в aggregate-модель
- фінальний короткий текст генерується лише після збору фактів
Це не ReAct і не Routing:
- не ReAct, бо акцент не на послідовному
Think -> Actциклі одного агента - не Routing, бо тут треба кілька виконавців одночасно, а не один target
Код
workers.py — спеціалізовані виконавці
from __future__ import annotations
import time
from typing import Any
SALES_METRICS = {
"2026-02-26:US": {
"gross_sales_usd": 182_450.0,
"orders": 4_820,
"aov_usd": 37.85,
}
}
PAYMENT_ALERTS = {
"2026-02-26:US": {
"failed_payment_rate": 0.023,
"chargeback_alerts": 3,
"gateway_incident": "none",
}
}
INVENTORY_ALERTS = {
"2026-02-26:US": {
"low_stock_skus": ["SKU-4411", "SKU-8820"],
"out_of_stock_skus": ["SKU-9033"],
"restock_eta_days": 2,
}
}
_ATTEMPT_STATE: dict[str, int] = {}
def _key(report_date: str, region: str) -> str:
return f"{report_date}:{region.upper()}"
def sales_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
time.sleep(0.4)
metrics = SALES_METRICS.get(_key(report_date, region))
if not metrics:
return {"status": "done", "worker": "sales_worker", "result": {"warning": "sales_data_missing"}}
return {"status": "done", "worker": "sales_worker", "result": metrics}
def payments_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
# Simulate a transient timeout on the first attempt within one request_id.
attempt_key = f"{request_id}:{_key(report_date, region)}:payments"
_ATTEMPT_STATE[attempt_key] = _ATTEMPT_STATE.get(attempt_key, 0) + 1
attempt = _ATTEMPT_STATE[attempt_key]
if attempt == 1:
time.sleep(2.6) # Exceeds gateway timeout on purpose.
else:
time.sleep(0.3)
alerts = PAYMENT_ALERTS.get(_key(report_date, region))
if not alerts:
return {"status": "done", "worker": "payments_worker", "result": {"warning": "payments_data_missing"}}
return {"status": "done", "worker": "payments_worker", "result": alerts}
def inventory_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
time.sleep(0.5)
alerts = INVENTORY_ALERTS.get(_key(report_date, region))
if not alerts:
return {"status": "done", "worker": "inventory_worker", "result": {"warning": "inventory_data_missing"}}
return {"status": "done", "worker": "inventory_worker", "result": alerts}
Що тут найважливіше (простими словами)
- Кожен worker відповідає лише за свою доменну частину.
payments_workerспеціально дає timeout на 1-й спробі, щоб показати retry-policy.- Workers не приймають orchestration-рішень: тільки повертають факти.
Contract між шарами в цьому прикладі:
- LLM може лише запропонувати план (
kind="plan",tasks). - Gateway валідовує план і відкидає небезпечні/некоректні задачі.
- Unknown keys у задачах ігноруються під час нормалізації, якщо обов'язкові поля присутні.
- Gateway застосовує runtime-policy: allowlist, timeout, retry, dispatch budget, deadline.
- Workers виконують лише свою доменну функцію і повертають структурований
dict. - Orchestrator вирішує, чи можна фіналізувати run (наприклад, після failed critical task — стоп).
gateway.py — policy boundary (найважливіший шар)
from __future__ import annotations
import hashlib
import json
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
from dataclasses import dataclass
from typing import Any, Callable
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_tasks: int = 4
max_parallel: int = 3
max_retries_per_task: int = 1
max_dispatches: int = 8
task_timeout_seconds: float = 2.0
max_seconds: int = 25
def args_hash(args: dict[str, Any]) -> str:
stable = json.dumps(args, ensure_ascii=True, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(stable.encode("utf-8")).hexdigest()[:12]
def validate_orchestration_plan(
raw_plan: dict[str, Any], *, allowed_workers: set[str], max_tasks: int
) -> list[dict[str, Any]]:
if not isinstance(raw_plan, dict):
raise StopRun("invalid_plan:non_json")
if raw_plan.get("kind") != "plan":
raise StopRun("invalid_plan:kind")
tasks = raw_plan.get("tasks")
if not isinstance(tasks, list):
raise StopRun("invalid_plan:tasks")
if not (1 <= len(tasks) <= max_tasks):
raise StopRun("invalid_plan:max_tasks")
normalized: list[dict[str, Any]] = []
seen_ids: set[str] = set()
required_keys = {"id", "worker", "args", "critical"}
for task in tasks:
if not isinstance(task, dict):
raise StopRun("invalid_plan:task_shape")
if not required_keys.issubset(task.keys()):
raise StopRun("invalid_plan:missing_keys")
# Ignore unknown keys and keep only contract fields.
task_id = task["id"]
worker = task["worker"]
args = task["args"]
critical = task["critical"]
if not isinstance(task_id, str) or not task_id.strip():
raise StopRun("invalid_plan:task_id")
if task_id in seen_ids:
raise StopRun("invalid_plan:duplicate_task_id")
seen_ids.add(task_id)
if not isinstance(worker, str) or not worker.strip():
raise StopRun("invalid_plan:worker")
if worker not in allowed_workers:
raise StopRun(f"invalid_plan:worker_not_allowed:{worker}")
if not isinstance(args, dict):
raise StopRun("invalid_plan:args")
if not isinstance(critical, bool):
raise StopRun("invalid_plan:critical")
normalized.append(
{
"id": task_id.strip(),
"worker": worker.strip(),
"args": dict(args),
"critical": critical,
}
)
return normalized
class OrchestratorGateway:
def __init__(
self,
*,
allow: set[str],
registry: dict[str, Callable[..., dict[str, Any]]],
budget: Budget,
) -> None:
self.allow = allow
self.registry = registry
self.budget = budget
self.dispatches = 0
self._lock = threading.Lock()
def _consume_dispatch_budget(self) -> None:
with self._lock:
self.dispatches += 1
if self.dispatches > self.budget.max_dispatches:
raise StopRun("max_dispatches")
def _call_once(
self, worker_name: str, args: dict[str, Any], *, deadline_monotonic: float
) -> dict[str, Any]:
if worker_name not in self.allow:
raise StopRun(f"worker_denied:{worker_name}")
fn = self.registry.get(worker_name)
if fn is None:
raise StopRun(f"worker_missing:{worker_name}")
remaining = deadline_monotonic - time.monotonic()
if remaining <= 0:
raise StopRun("max_seconds")
task_timeout = min(self.budget.task_timeout_seconds, max(0.01, remaining))
with ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(fn, **args)
try:
result = future.result(timeout=task_timeout)
except TimeoutError as exc:
raise StopRun("task_timeout") from exc
except TypeError as exc:
raise StopRun(f"worker_bad_args:{worker_name}") from exc
if not isinstance(result, dict):
raise StopRun(f"worker_bad_result:{worker_name}")
return result
def _run_task_with_retry(
self, task: dict[str, Any], request_id: str, deadline_monotonic: float
) -> dict[str, Any]:
task_id = task["id"]
worker_name = task["worker"]
semantic_args = dict(task["args"])
semantic_hash = args_hash(semantic_args)
base_args = dict(semantic_args)
base_args["request_id"] = request_id
attempts_total = self.budget.max_retries_per_task + 1
last_reason = "unknown"
for attempt in range(1, attempts_total + 1):
try:
self._consume_dispatch_budget()
observation = self._call_once(
worker_name, base_args, deadline_monotonic=deadline_monotonic
)
return {
"task_id": task_id,
"worker": worker_name,
"critical": task["critical"],
"status": "done",
"attempts_used": attempt,
"retried": attempt > 1,
"args_hash": semantic_hash,
"observation": observation,
}
except StopRun as exc:
last_reason = exc.reason
if exc.reason == "task_timeout" and attempt < attempts_total:
continue
return {
"task_id": task_id,
"worker": worker_name,
"critical": task["critical"],
"status": "failed",
"attempts_used": attempt,
"retried": attempt > 1,
"args_hash": semantic_hash,
"stop_reason": last_reason,
}
return {
"task_id": task_id,
"worker": worker_name,
"critical": task["critical"],
"status": "failed",
"attempts_used": attempts_total,
"retried": True,
"args_hash": semantic_hash,
"stop_reason": last_reason,
}
def dispatch_parallel(
self,
tasks: list[dict[str, Any]],
*,
request_id: str,
deadline_monotonic: float,
) -> list[dict[str, Any]]:
if not tasks:
return []
indexed_tasks = list(enumerate(tasks))
output: list[tuple[int, dict[str, Any]]] = []
parallelism = min(self.budget.max_parallel, len(tasks))
with ThreadPoolExecutor(max_workers=parallelism) as pool:
future_to_idx = {
pool.submit(
self._run_task_with_retry, task, request_id, deadline_monotonic
): idx
for idx, task in indexed_tasks
}
remaining = deadline_monotonic - time.monotonic()
if remaining <= 0:
raise StopRun("max_seconds")
try:
for future in as_completed(future_to_idx, timeout=max(0.01, remaining)):
idx = future_to_idx[future]
output.append((idx, future.result()))
except TimeoutError as exc:
raise StopRun("max_seconds") from exc
output.sort(key=lambda item: item[0])
return [item[1] for item in output]
Що тут найважливіше (простими словами)
- Саме gateway реалізує execution-policy: allowlist, timeout, retry, dispatch-budget.
validate_orchestration_plan(...)перевіряє обов'язковий контракт полів і ігнорує зайві ключі.dispatch_parallel(...)зберігає паралельність, але ще й контролює global deadline (max_seconds) всередині gateway.
llm.py — планування і фінальна синтеза
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
PLAN_SYSTEM_PROMPT = """
You are an orchestration planner.
Return only one JSON object in this exact shape:
{
"kind": "plan",
"tasks": [
{"id":"t1","worker":"sales_worker","args":{"report_date":"YYYY-MM-DD","region":"US"},"critical":true}
]
}
Rules:
- Create 2 to 4 independent tasks.
- Use only workers from available_workers.
- Keep args minimal and valid for the selected worker.
- Mark task as critical=true only if final brief is impossible without it.
- Do not output markdown or extra keys.
""".strip()
FINAL_SYSTEM_PROMPT = """
You are an operations assistant.
Write a short final briefing in English for an e-commerce operations manager.
Include:
- headline health status (green/yellow/red)
- key sales metrics
- payment risk note
- inventory risk note
- one concrete next action
Use only evidence from orchestrator output.
""".strip()
WORKER_CATALOG = [
{
"name": "sales_worker",
"description": "Provides sales KPIs for a date and region",
"args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
},
{
"name": "payments_worker",
"description": "Provides payment failure and chargeback signals",
"args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
},
{
"name": "inventory_worker",
"description": "Provides low-stock and out-of-stock risk signals",
"args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
},
]
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def create_plan(
*,
goal: str,
report_date: str,
region: str,
max_tasks: int,
) -> dict[str, Any]:
payload = {
"goal": goal,
"report_date": report_date,
"region": region,
"max_tasks": max_tasks,
"available_workers": WORKER_CATALOG,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": PLAN_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"kind": "invalid", "raw": text}
def compose_final_answer(*, goal: str, aggregate: dict[str, Any]) -> str:
payload = {
"goal": goal,
"aggregate": aggregate,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or ""
text = text.strip()
if not text:
raise LLMEmpty("llm_empty")
return text
Що тут найважливіше (простими словами)
- LLM повертає тільки plan/answer, але не запускає workers напряму.
- Planner отримує
max_tasksіavailable_workers, тому вибір обмежений ще на етапі intent. - При non-JSON або timeout повертаються явні сигнали для контрольованої зупинки.
main.py — Plan -> Dispatch (parallel) -> Aggregate -> Finalize
from __future__ import annotations
import json
import time
import uuid
from typing import Any
from gateway import Budget, OrchestratorGateway, StopRun, validate_orchestration_plan
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from workers import inventory_worker, payments_worker, sales_worker
REPORT_DATE = "2026-02-26"
REGION = "US"
GOAL = (
"Prepare a morning operations report for e-commerce region US on 2026-02-26. "
"Include sales, payment risk, and inventory risk with one concrete action."
)
BUDGET = Budget(
max_tasks=4,
max_parallel=3,
max_retries_per_task=1,
max_dispatches=8,
task_timeout_seconds=2.0,
max_seconds=25,
)
WORKER_REGISTRY = {
"sales_worker": sales_worker,
"payments_worker": payments_worker,
"inventory_worker": inventory_worker,
}
ALLOWED_WORKERS_POLICY = {"sales_worker", "payments_worker", "inventory_worker"}
INVENTORY_RUNTIME_ENABLED = True
ALLOWED_WORKERS_EXECUTION = (
{"sales_worker", "payments_worker", "inventory_worker"}
if INVENTORY_RUNTIME_ENABLED
else {"sales_worker", "payments_worker"}
)
# Set INVENTORY_RUNTIME_ENABLED=False to get worker_denied:inventory_worker in trace.
def aggregate_results(task_results: list[dict[str, Any]]) -> dict[str, Any]:
done_by_worker: dict[str, dict[str, Any]] = {}
failed: list[dict[str, Any]] = []
for item in task_results:
if item["status"] == "done":
done_by_worker[item["worker"]] = item["observation"]
else:
failed.append(
{
"task_id": item["task_id"],
"worker": item["worker"],
"critical": item["critical"],
"stop_reason": item["stop_reason"],
}
)
sales = done_by_worker.get("sales_worker", {}).get("result", {})
payments = done_by_worker.get("payments_worker", {}).get("result", {})
inventory = done_by_worker.get("inventory_worker", {}).get("result", {})
health = "green"
if payments.get("failed_payment_rate", 0) >= 0.03 or inventory.get("out_of_stock_skus"):
health = "yellow"
if payments.get("gateway_incident") not in (None, "none"):
health = "red"
return {
"report_date": REPORT_DATE,
"region": REGION,
"health": health,
"sales": sales,
"payments": payments,
"inventory": inventory,
"failed_tasks": failed,
}
def run_orchestrator(goal: str) -> dict[str, Any]:
started = time.monotonic()
deadline = started + BUDGET.max_seconds
request_id = uuid.uuid4().hex[:10]
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
gateway = OrchestratorGateway(
allow=ALLOWED_WORKERS_EXECUTION,
registry=WORKER_REGISTRY,
budget=BUDGET,
)
try:
raw_plan = create_plan(
goal=goal,
report_date=REPORT_DATE,
region=REGION,
max_tasks=BUDGET.max_tasks,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "plan",
"trace": trace,
"history": history,
}
try:
tasks = validate_orchestration_plan(
raw_plan,
allowed_workers=ALLOWED_WORKERS_POLICY,
max_tasks=BUDGET.max_tasks,
)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": "plan",
"raw_plan": raw_plan,
"trace": trace,
"history": history,
}
if time.monotonic() > deadline:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"phase": "dispatch",
"plan": tasks,
"trace": trace,
"history": history,
}
try:
task_results = gateway.dispatch_parallel(
tasks,
request_id=request_id,
deadline_monotonic=deadline,
)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": "dispatch",
"plan": tasks,
"trace": trace,
"history": history,
}
for item in task_results:
trace.append(
{
"task_id": item["task_id"],
"worker": item["worker"],
"status": item["status"],
"attempts_used": item["attempts_used"],
"retried": item["retried"],
"args_hash": item["args_hash"],
"stop_reason": item.get("stop_reason"),
"critical": item["critical"],
}
)
history.append(item)
failed_critical = [
item
for item in task_results
if item["status"] == "failed" and item["critical"]
]
if failed_critical:
return {
"status": "stopped",
"stop_reason": "critical_task_failed",
"phase": "dispatch",
"plan": tasks,
"failed_critical": failed_critical,
"trace": trace,
"history": history,
}
aggregate = aggregate_results(task_results)
if time.monotonic() > deadline:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"phase": "finalize",
"aggregate": aggregate,
"trace": trace,
"history": history,
}
try:
answer = compose_final_answer(goal=goal, aggregate=aggregate)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "finalize",
"aggregate": aggregate,
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"phase": "finalize",
"aggregate": aggregate,
"trace": trace,
"history": history,
}
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"plan": tasks,
"aggregate": aggregate,
"trace": trace,
"history": history,
}
def main() -> None:
result = run_orchestrator(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
Що тут найважливіше (простими словами)
run_orchestrator(...)керує повним життєвим циклом orchestration run.- Паралельний запуск відбувається тільки в gateway, а не всередині LLM-рішень.
ALLOWED_WORKERS_POLICYіALLOWED_WORKERS_EXECUTIONможуть відрізнятись (runtime feature-flag/tenant gate).- Є чітка політика: якщо впала критична задача, фіналізація не запускається.
traceіhistoryдають прозорий аудит: хто запускався, скільки спроб, чому впало.
requirements.txt
openai==2.21.0
Приклад виводу
Через симуляцію timeout у payments_worker на першій спробі, типовий trace містить retried=true.
{
"status": "ok",
"stop_reason": "success",
"answer": "Morning Operations Report - US Region (2026-02-26): Health=yellow. Gross sales=$182,450, orders=4,820, AOV=$37.85. Payment failed rate=2.3% with 3 chargeback alerts. Inventory has one out-of-stock SKU (SKU-9033). Next action: prioritize immediate restock for SKU-9033.",
"plan": [
{
"id": "t1",
"worker": "sales_worker",
"args": {"report_date": "2026-02-26", "region": "US"},
"critical": true
},
{
"id": "t2",
"worker": "payments_worker",
"args": {"report_date": "2026-02-26", "region": "US"},
"critical": true
},
{
"id": "t3",
"worker": "inventory_worker",
"args": {"report_date": "2026-02-26", "region": "US"},
"critical": true
}
],
"aggregate": {
"report_date": "2026-02-26",
"region": "US",
"health": "yellow",
"sales": {"gross_sales_usd": 182450.0, "orders": 4820, "aov_usd": 37.85},
"payments": {"failed_payment_rate": 0.023, "chargeback_alerts": 3, "gateway_incident": "none"},
"inventory": {"low_stock_skus": ["SKU-4411", "SKU-8820"], "out_of_stock_skus": ["SKU-9033"], "restock_eta_days": 2}
},
"trace": [
{
"task_id": "t1",
"worker": "sales_worker",
"status": "done",
"attempts_used": 1,
"retried": false
},
{
"task_id": "t2",
"worker": "payments_worker",
"status": "done",
"attempts_used": 2,
"retried": true
},
{
"task_id": "t3",
"worker": "inventory_worker",
"status": "done",
"attempts_used": 1,
"retried": false
}
],
"history": [{...}]
}
Це скорочений приклад: history показано стисло, а формат answer може трохи відрізнятись між запусками.
Якщо встановити INVENTORY_RUNTIME_ENABLED=False, у trace зʼявиться worker_denied:inventory_worker — це наочний приклад різниці між policy і execution allowlist.
Типові stop_reason
success— план виконано, фінальна відповідь згенерованаinvalid_plan:*— план від LLM не пройшов policy validationinvalid_plan:missing_keys— у задачі бракує обовʼязкових полів контрактуllm_timeout— LLM не відповів у межахOPENAI_TIMEOUT_SECONDSllm_empty— LLM повернув порожню фінальну відповідьmax_seconds— перевищено загальний time budget runmax_dispatches— перевищено ліміт запусків підзадач (включно з retry)worker_denied:<name>— worker не в execution allowlistworker_missing:<name>— worker відсутній у registryworker_bad_args:<name>— задача містить некоректні аргументиworker_bad_result:<name>— worker повернув дані поза контрактомtask_timeout— підзадача не вклалась уtask_timeout_secondscritical_task_failed— провалилась хоча б одна критична підзадача
Що тут НЕ показано
- Немає auth/PII та tenant isolation.
- Немає черги/персистентного job-runner (RabbitMQ/SQS/Kafka).
- Немає circuit breaker і складних backoff-політик.
- Немає бюджетів по токенах/вартості (cost guardrails).
Що спробувати далі
- Зроби
payments_workerнекритичним (critical=false) і подивись, як run повертає partial-result. - Встанови
INVENTORY_RUNTIME_ENABLED=Falseі перевірworker_denied:inventory_worker. - Зменши
task_timeout_secondsдо0.2і перевір, як зростає кількість timeout/retry. - Додай четвертого worker-а (
fraud_worker) і порівняй час run зmax_parallel=2таmax_parallel=4.