Pattern Essence (Brief)
Orchestrator Agent is a pattern where the agent does not perform domain work itself but coordinates multiple specialized executors.
The LLM decides which subtasks to launch (what to do), while the policy layer controls how they are launched safely (what is allowed, how long to wait, when to retry, which limits apply).
What this example demonstrates
- a separate Plan stage to build the subtask set
- parallel subtask Dispatch (`ThreadPoolExecutor`)
- a policy boundary between the orchestration decision (LLM) and worker execution
- strict plan validation (`kind`, `tasks`, task shape)
- separate allowlists for the policy and execution layers
- a timeout per subtask + one retry on a transient timeout
- a partial-results policy: a non-critical task may fail, but the run continues
- explicit `stop_reason` values for debugging and production monitoring
Architecture
- The LLM receives the goal and returns an orchestration plan in JSON (`kind="plan"`, `tasks`).
- The policy boundary validates the plan and rejects disallowed workers/arguments.
- `OrchestratorGateway` dispatches subtasks in parallel with timeout and retry.
- Results (successful and failed) are collected into a unified `history`/`trace`.
- If a critical task does not complete, the run stops.
- If the minimally required facts are present, the LLM composes a short final operations report.

The LLM returns intent (a plan) that is treated as untrusted input: the policy boundary validates it first and only then (if allowed) launches workers.
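The lifecycle above can be condensed into a few lines. This is a hedged sketch, not the example's real API: `validate` and `dispatch` are simplified stand-ins for `validate_orchestration_plan` and the gateway, and dispatch runs serially here.

```python
# Condensed lifecycle sketch: plan (untrusted) -> validate (policy) -> dispatch -> results.
ALLOWED = {"sales_worker"}  # stand-in for the policy allowlist

def validate(plan: dict) -> list[dict]:
    # The LLM plan is untrusted input: shape and allowlist are checked before any dispatch.
    if plan.get("kind") != "plan" or not isinstance(plan.get("tasks"), list):
        raise ValueError("invalid_plan")
    for task in plan["tasks"]:
        if task["worker"] not in ALLOWED:
            raise ValueError(f"worker_not_allowed:{task['worker']}")
    return plan["tasks"]

def dispatch(tasks: list[dict]) -> list[dict]:
    # The real gateway runs tasks in parallel with timeout/retry; serial here for brevity.
    return [{"task_id": t["id"], "status": "done"} for t in tasks]

plan = {"kind": "plan", "tasks": [{"id": "t1", "worker": "sales_worker", "args": {}}]}
print(dispatch(validate(plan)))  # [{'task_id': 't1', 'status': 'done'}]
```

The key ordering is the point: validation happens before any worker runs, so a malformed or disallowed plan never reaches execution.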
Project structure
examples/
└── agent-patterns/
    └── orchestrator-agent/
        └── python/
            ├── main.py           # Plan -> Dispatch (parallel) -> Aggregate -> Finalize
            ├── llm.py            # planner + final synthesis
            ├── gateway.py        # policy boundary: validation + timeout/retry + budgets
            ├── workers.py        # deterministic specialists (sales/payments/inventory)
            └── requirements.txt
How to run
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/orchestrator-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Option via export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option via .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows it is easier to set environment variables with `set` (cmd) or `$env:` (PowerShell), or, if desired, to use python-dotenv to load `.env` automatically.
Task
Imagine a real production case:
"Prepare a morning operations report for 2026-02-26 for the US market: sales, payment incidents, and inventory risks."
The agent should not do everything in one flow. It should:
- break the task into independent subtasks
- run subtasks in parallel
- retry a timed-out subtask once
- produce one final short brief
Solution
Here the Orchestrator works as a coordinator:
- LLM generates a plan with tasks for specialized workers
- the policy layer checks the plan and blocks disallowed tasks
- gateway runs workers in parallel with timeout/retry
- results are collected into an aggregate model
- the final short text is generated only after facts are collected
This is not ReAct and not Routing:
- not ReAct, because the focus is not a sequential `Think -> Act` loop of one agent
- not Routing, because multiple executors are needed simultaneously, not one target
Code
workers.py – specialized executors
from __future__ import annotations
import time
from typing import Any
SALES_METRICS = {
"2026-02-26:US": {
"gross_sales_usd": 182_450.0,
"orders": 4_820,
"aov_usd": 37.85,
}
}
PAYMENT_ALERTS = {
"2026-02-26:US": {
"failed_payment_rate": 0.023,
"chargeback_alerts": 3,
"gateway_incident": "none",
}
}
INVENTORY_ALERTS = {
"2026-02-26:US": {
"low_stock_skus": ["SKU-4411", "SKU-8820"],
"out_of_stock_skus": ["SKU-9033"],
"restock_eta_days": 2,
}
}
_ATTEMPT_STATE: dict[str, int] = {}
def _key(report_date: str, region: str) -> str:
return f"{report_date}:{region.upper()}"
def sales_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
time.sleep(0.4)
metrics = SALES_METRICS.get(_key(report_date, region))
if not metrics:
return {"status": "done", "worker": "sales_worker", "result": {"warning": "sales_data_missing"}}
return {"status": "done", "worker": "sales_worker", "result": metrics}
def payments_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
# Simulate a transient timeout on the first attempt within one request_id.
attempt_key = f"{request_id}:{_key(report_date, region)}:payments"
_ATTEMPT_STATE[attempt_key] = _ATTEMPT_STATE.get(attempt_key, 0) + 1
attempt = _ATTEMPT_STATE[attempt_key]
if attempt == 1:
time.sleep(2.6) # Exceeds gateway timeout on purpose.
else:
time.sleep(0.3)
alerts = PAYMENT_ALERTS.get(_key(report_date, region))
if not alerts:
return {"status": "done", "worker": "payments_worker", "result": {"warning": "payments_data_missing"}}
return {"status": "done", "worker": "payments_worker", "result": alerts}
def inventory_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
time.sleep(0.5)
alerts = INVENTORY_ALERTS.get(_key(report_date, region))
if not alerts:
return {"status": "done", "worker": "inventory_worker", "result": {"warning": "inventory_data_missing"}}
return {"status": "done", "worker": "inventory_worker", "result": alerts}
What matters most here (plain words)
- Each worker is responsible only for its own domain part.
- `payments_worker` intentionally times out on the first attempt to demonstrate the retry policy.
- Workers do not make orchestration decisions: they only return facts.
Contract between layers in this example:
- The LLM can only propose a plan (`kind="plan"`, `tasks`).
- The gateway validates the plan and rejects unsafe/invalid tasks.
- Unknown keys in tasks are ignored during normalization if the required fields are present.
- The gateway applies runtime policy: allowlist, timeout, retry, dispatch budget, deadline.
- Workers perform only their domain function and return a structured `dict`.
- The orchestrator decides whether the run can be finalized (for example, stop after a failed critical task).
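The transient-failure simulation in `payments_worker` works because attempt state is keyed by `request_id`. A stripped-down sketch (the `flaky_worker` below is hypothetical, not part of the example code) shows why a retry within the same request succeeds while a fresh request fails again on its first attempt:

```python
_ATTEMPT_STATE: dict[str, int] = {}  # attempt counter keyed by request_id, as in workers.py

def flaky_worker(request_id: str) -> dict:
    key = f"{request_id}:flaky"
    _ATTEMPT_STATE[key] = _ATTEMPT_STATE.get(key, 0) + 1
    if _ATTEMPT_STATE[key] == 1:
        return {"status": "timeout_simulated"}  # first attempt per request "times out"
    return {"status": "done"}

assert flaky_worker("req-1")["status"] == "timeout_simulated"
assert flaky_worker("req-1")["status"] == "done"               # retry in the same request
assert flaky_worker("req-2")["status"] == "timeout_simulated"  # fresh request starts over
```

This is why the gateway passes `request_id` into every worker call: it makes retries deterministic and observable in a demo without any external fault injection.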
gateway.py – policy boundary (most important layer)
from __future__ import annotations
import hashlib
import json
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
from dataclasses import dataclass
from typing import Any, Callable
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_tasks: int = 4
max_parallel: int = 3
max_retries_per_task: int = 1
max_dispatches: int = 8
task_timeout_seconds: float = 2.0
max_seconds: int = 25
def args_hash(args: dict[str, Any]) -> str:
stable = json.dumps(args, ensure_ascii=True, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(stable.encode("utf-8")).hexdigest()[:12]
def validate_orchestration_plan(
raw_plan: dict[str, Any], *, allowed_workers: set[str], max_tasks: int
) -> list[dict[str, Any]]:
if not isinstance(raw_plan, dict):
raise StopRun("invalid_plan:non_json")
if raw_plan.get("kind") != "plan":
raise StopRun("invalid_plan:kind")
tasks = raw_plan.get("tasks")
if not isinstance(tasks, list):
raise StopRun("invalid_plan:tasks")
if not (1 <= len(tasks) <= max_tasks):
raise StopRun("invalid_plan:max_tasks")
normalized: list[dict[str, Any]] = []
seen_ids: set[str] = set()
required_keys = {"id", "worker", "args", "critical"}
for task in tasks:
if not isinstance(task, dict):
raise StopRun("invalid_plan:task_shape")
if not required_keys.issubset(task.keys()):
raise StopRun("invalid_plan:missing_keys")
# Ignore unknown keys and keep only contract fields.
task_id = task["id"]
worker = task["worker"]
args = task["args"]
critical = task["critical"]
if not isinstance(task_id, str) or not task_id.strip():
raise StopRun("invalid_plan:task_id")
if task_id in seen_ids:
raise StopRun("invalid_plan:duplicate_task_id")
seen_ids.add(task_id)
if not isinstance(worker, str) or not worker.strip():
raise StopRun("invalid_plan:worker")
if worker not in allowed_workers:
raise StopRun(f"invalid_plan:worker_not_allowed:{worker}")
if not isinstance(args, dict):
raise StopRun("invalid_plan:args")
if not isinstance(critical, bool):
raise StopRun("invalid_plan:critical")
normalized.append(
{
"id": task_id.strip(),
"worker": worker.strip(),
"args": dict(args),
"critical": critical,
}
)
return normalized
class OrchestratorGateway:
def __init__(
self,
*,
allow: set[str],
registry: dict[str, Callable[..., dict[str, Any]]],
budget: Budget,
) -> None:
self.allow = allow
self.registry = registry
self.budget = budget
self.dispatches = 0
self._lock = threading.Lock()
def _consume_dispatch_budget(self) -> None:
with self._lock:
self.dispatches += 1
if self.dispatches > self.budget.max_dispatches:
raise StopRun("max_dispatches")
def _call_once(
self, worker_name: str, args: dict[str, Any], *, deadline_monotonic: float
) -> dict[str, Any]:
if worker_name not in self.allow:
raise StopRun(f"worker_denied:{worker_name}")
fn = self.registry.get(worker_name)
if fn is None:
raise StopRun(f"worker_missing:{worker_name}")
remaining = deadline_monotonic - time.monotonic()
if remaining <= 0:
raise StopRun("max_seconds")
task_timeout = min(self.budget.task_timeout_seconds, max(0.01, remaining))
with ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(fn, **args)
try:
result = future.result(timeout=task_timeout)
except TimeoutError as exc:
raise StopRun("task_timeout") from exc
except TypeError as exc:
raise StopRun(f"worker_bad_args:{worker_name}") from exc
if not isinstance(result, dict):
raise StopRun(f"worker_bad_result:{worker_name}")
return result
def _run_task_with_retry(
self, task: dict[str, Any], request_id: str, deadline_monotonic: float
) -> dict[str, Any]:
task_id = task["id"]
worker_name = task["worker"]
semantic_args = dict(task["args"])
semantic_hash = args_hash(semantic_args)
base_args = dict(semantic_args)
base_args["request_id"] = request_id
attempts_total = self.budget.max_retries_per_task + 1
last_reason = "unknown"
for attempt in range(1, attempts_total + 1):
try:
self._consume_dispatch_budget()
observation = self._call_once(
worker_name, base_args, deadline_monotonic=deadline_monotonic
)
return {
"task_id": task_id,
"worker": worker_name,
"critical": task["critical"],
"status": "done",
"attempts_used": attempt,
"retried": attempt > 1,
"args_hash": semantic_hash,
"observation": observation,
}
except StopRun as exc:
last_reason = exc.reason
if exc.reason == "task_timeout" and attempt < attempts_total:
continue
return {
"task_id": task_id,
"worker": worker_name,
"critical": task["critical"],
"status": "failed",
"attempts_used": attempt,
"retried": attempt > 1,
"args_hash": semantic_hash,
"stop_reason": last_reason,
}
return {
"task_id": task_id,
"worker": worker_name,
"critical": task["critical"],
"status": "failed",
"attempts_used": attempts_total,
"retried": True,
"args_hash": semantic_hash,
"stop_reason": last_reason,
}
def dispatch_parallel(
self,
tasks: list[dict[str, Any]],
*,
request_id: str,
deadline_monotonic: float,
) -> list[dict[str, Any]]:
if not tasks:
return []
indexed_tasks = list(enumerate(tasks))
output: list[tuple[int, dict[str, Any]]] = []
parallelism = min(self.budget.max_parallel, len(tasks))
with ThreadPoolExecutor(max_workers=parallelism) as pool:
future_to_idx = {
pool.submit(
self._run_task_with_retry, task, request_id, deadline_monotonic
): idx
for idx, task in indexed_tasks
}
remaining = deadline_monotonic - time.monotonic()
if remaining <= 0:
raise StopRun("max_seconds")
try:
for future in as_completed(future_to_idx, timeout=max(0.01, remaining)):
idx = future_to_idx[future]
output.append((idx, future.result()))
except TimeoutError as exc:
raise StopRun("max_seconds") from exc
output.sort(key=lambda item: item[0])
return [item[1] for item in output]
What matters most here (plain words)
- The gateway is exactly where execution policy is implemented: allowlist, timeout, retry, dispatch budget.
- `validate_orchestration_plan(...)` checks the required field contract and ignores extra keys.
- `dispatch_parallel(...)` preserves parallelism and also enforces the global deadline (`max_seconds`) inside the gateway.
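`args_hash(...)` gives every task a short, order-independent fingerprint for the trace. It can be exercised standalone (same implementation as in gateway.py):

```python
import hashlib
import json

def args_hash(args: dict) -> str:
    # sort_keys + compact separators make the JSON canonical before hashing.
    stable = json.dumps(args, ensure_ascii=True, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(stable.encode("utf-8")).hexdigest()[:12]

a = args_hash({"report_date": "2026-02-26", "region": "US"})
b = args_hash({"region": "US", "report_date": "2026-02-26"})
assert a == b        # key order does not change the fingerprint
assert len(a) == 12  # short enough to read in a trace
```

Canonicalizing before hashing is what makes the hash useful for deduplication and audit: two semantically identical task argument sets always map to the same trace entry.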
llm.py – planning and final synthesis
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
PLAN_SYSTEM_PROMPT = """
You are an orchestration planner.
Return only one JSON object in this exact shape:
{
"kind": "plan",
"tasks": [
{"id":"t1","worker":"sales_worker","args":{"report_date":"YYYY-MM-DD","region":"US"},"critical":true}
]
}
Rules:
- Create 2 to 4 independent tasks.
- Use only workers from available_workers.
- Keep args minimal and valid for the selected worker.
- Mark task as critical=true only if final brief is impossible without it.
- Do not output markdown or extra keys.
""".strip()
FINAL_SYSTEM_PROMPT = """
You are an operations assistant.
Write a short final briefing in English for an e-commerce operations manager.
Include:
- headline health status (green/yellow/red)
- key sales metrics
- payment risk note
- inventory risk note
- one concrete next action
Use only evidence from orchestrator output.
""".strip()
WORKER_CATALOG = [
{
"name": "sales_worker",
"description": "Provides sales KPIs for a date and region",
"args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
},
{
"name": "payments_worker",
"description": "Provides payment failure and chargeback signals",
"args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
},
{
"name": "inventory_worker",
"description": "Provides low-stock and out-of-stock risk signals",
"args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
},
]
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def create_plan(
*,
goal: str,
report_date: str,
region: str,
max_tasks: int,
) -> dict[str, Any]:
payload = {
"goal": goal,
"report_date": report_date,
"region": region,
"max_tasks": max_tasks,
"available_workers": WORKER_CATALOG,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": PLAN_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"kind": "invalid", "raw": text}
def compose_final_answer(*, goal: str, aggregate: dict[str, Any]) -> str:
payload = {
"goal": goal,
"aggregate": aggregate,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or ""
text = text.strip()
if not text:
raise LLMEmpty("llm_empty")
return text
What matters most here (plain words)
- The LLM returns only a plan or an answer; it does not launch workers directly.
- The planner receives `max_tasks` and `available_workers`, so selection is constrained already at the intent stage.
- On non-JSON output or a timeout, explicit signals are returned for controlled stopping.
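The non-JSON fallback in `create_plan(...)` can be checked in isolation. This sketch mirrors its `json.loads` fallback, so downstream validation can stop cleanly with `invalid_plan:kind` instead of crashing on malformed model output:

```python
import json

def parse_plan(text: str) -> dict:
    # Mirrors create_plan(): non-JSON output becomes an explicit invalid marker.
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}

assert parse_plan('{"kind": "plan", "tasks": []}')["kind"] == "plan"
bad = parse_plan("Sure! Here is your plan: ...")
assert bad["kind"] == "invalid"  # validator then raises StopRun("invalid_plan:kind")
```

Keeping the raw text in the invalid marker is deliberate: it surfaces in `raw_plan` on a stopped run, which makes prompt regressions easy to diagnose.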
main.py – Plan -> Dispatch (parallel) -> Aggregate -> Finalize
from __future__ import annotations
import json
import time
import uuid
from typing import Any
from gateway import Budget, OrchestratorGateway, StopRun, validate_orchestration_plan
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from workers import inventory_worker, payments_worker, sales_worker
REPORT_DATE = "2026-02-26"
REGION = "US"
GOAL = (
"Prepare a morning operations report for e-commerce region US on 2026-02-26. "
"Include sales, payment risk, and inventory risk with one concrete action."
)
BUDGET = Budget(
max_tasks=4,
max_parallel=3,
max_retries_per_task=1,
max_dispatches=8,
task_timeout_seconds=2.0,
max_seconds=25,
)
WORKER_REGISTRY = {
"sales_worker": sales_worker,
"payments_worker": payments_worker,
"inventory_worker": inventory_worker,
}
ALLOWED_WORKERS_POLICY = {"sales_worker", "payments_worker", "inventory_worker"}
INVENTORY_RUNTIME_ENABLED = True
ALLOWED_WORKERS_EXECUTION = (
{"sales_worker", "payments_worker", "inventory_worker"}
if INVENTORY_RUNTIME_ENABLED
else {"sales_worker", "payments_worker"}
)
# Set INVENTORY_RUNTIME_ENABLED=False to get worker_denied:inventory_worker in trace.
def aggregate_results(task_results: list[dict[str, Any]]) -> dict[str, Any]:
done_by_worker: dict[str, dict[str, Any]] = {}
failed: list[dict[str, Any]] = []
for item in task_results:
if item["status"] == "done":
done_by_worker[item["worker"]] = item["observation"]
else:
failed.append(
{
"task_id": item["task_id"],
"worker": item["worker"],
"critical": item["critical"],
"stop_reason": item["stop_reason"],
}
)
sales = done_by_worker.get("sales_worker", {}).get("result", {})
payments = done_by_worker.get("payments_worker", {}).get("result", {})
inventory = done_by_worker.get("inventory_worker", {}).get("result", {})
health = "green"
if payments.get("failed_payment_rate", 0) >= 0.03 or inventory.get("out_of_stock_skus"):
health = "yellow"
if payments.get("gateway_incident") not in (None, "none"):
health = "red"
return {
"report_date": REPORT_DATE,
"region": REGION,
"health": health,
"sales": sales,
"payments": payments,
"inventory": inventory,
"failed_tasks": failed,
}
def run_orchestrator(goal: str) -> dict[str, Any]:
started = time.monotonic()
deadline = started + BUDGET.max_seconds
request_id = uuid.uuid4().hex[:10]
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
gateway = OrchestratorGateway(
allow=ALLOWED_WORKERS_EXECUTION,
registry=WORKER_REGISTRY,
budget=BUDGET,
)
try:
raw_plan = create_plan(
goal=goal,
report_date=REPORT_DATE,
region=REGION,
max_tasks=BUDGET.max_tasks,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "plan",
"trace": trace,
"history": history,
}
try:
tasks = validate_orchestration_plan(
raw_plan,
allowed_workers=ALLOWED_WORKERS_POLICY,
max_tasks=BUDGET.max_tasks,
)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": "plan",
"raw_plan": raw_plan,
"trace": trace,
"history": history,
}
if time.monotonic() > deadline:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"phase": "dispatch",
"plan": tasks,
"trace": trace,
"history": history,
}
try:
task_results = gateway.dispatch_parallel(
tasks,
request_id=request_id,
deadline_monotonic=deadline,
)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": "dispatch",
"plan": tasks,
"trace": trace,
"history": history,
}
for item in task_results:
trace.append(
{
"task_id": item["task_id"],
"worker": item["worker"],
"status": item["status"],
"attempts_used": item["attempts_used"],
"retried": item["retried"],
"args_hash": item["args_hash"],
"stop_reason": item.get("stop_reason"),
"critical": item["critical"],
}
)
history.append(item)
failed_critical = [
item
for item in task_results
if item["status"] == "failed" and item["critical"]
]
if failed_critical:
return {
"status": "stopped",
"stop_reason": "critical_task_failed",
"phase": "dispatch",
"plan": tasks,
"failed_critical": failed_critical,
"trace": trace,
"history": history,
}
aggregate = aggregate_results(task_results)
if time.monotonic() > deadline:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"phase": "finalize",
"aggregate": aggregate,
"trace": trace,
"history": history,
}
try:
answer = compose_final_answer(goal=goal, aggregate=aggregate)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "finalize",
"aggregate": aggregate,
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"phase": "finalize",
"aggregate": aggregate,
"trace": trace,
"history": history,
}
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"plan": tasks,
"aggregate": aggregate,
"trace": trace,
"history": history,
}
def main() -> None:
result = run_orchestrator(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
What matters most here (plain words)
- `run_orchestrator(...)` manages the full lifecycle of an orchestration run.
- Parallel execution happens only in the gateway, not inside LLM decisions.
- `ALLOWED_WORKERS_POLICY` and `ALLOWED_WORKERS_EXECUTION` may differ (runtime feature flag / tenant gate).
- There is a clear policy: if a critical task fails, finalization does not run.
- `trace` and `history` provide a transparent audit: what ran, how many attempts, why it failed.
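The partial-results policy comes down to one filter over task results: only tasks that are both failed and critical block finalization. A minimal sketch of the check performed in `run_orchestrator(...)`:

```python
def failed_critical(task_results: list[dict]) -> list[dict]:
    # Same filter as in run_orchestrator(): only failed AND critical tasks stop the run.
    return [t for t in task_results if t["status"] == "failed" and t["critical"]]

results = [
    {"task_id": "t1", "status": "done", "critical": True},
    {"task_id": "t2", "status": "failed", "critical": False},  # non-critical failure
]
assert failed_critical(results) == []  # run continues with a partial result

results[1]["critical"] = True
assert [t["task_id"] for t in failed_critical(results)] == ["t2"]  # -> critical_task_failed
```

Because criticality is declared per task in the plan, the same worker can be blocking in one run and best-effort in another without any gateway changes.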
requirements.txt
openai==2.21.0
Output example
Because of the simulated timeout in `payments_worker` on the first attempt, a typical trace contains `retried=true`.
{
"status": "ok",
"stop_reason": "success",
"answer": "Morning Operations Report - US Region (2026-02-26): Health=yellow. Gross sales=$182,450, orders=4,820, AOV=$37.85. Payment failed rate=2.3% with 3 chargeback alerts. Inventory has one out-of-stock SKU (SKU-9033). Next action: prioritize immediate restock for SKU-9033.",
"plan": [
{
"id": "t1",
"worker": "sales_worker",
"args": {"report_date": "2026-02-26", "region": "US"},
"critical": true
},
{
"id": "t2",
"worker": "payments_worker",
"args": {"report_date": "2026-02-26", "region": "US"},
"critical": true
},
{
"id": "t3",
"worker": "inventory_worker",
"args": {"report_date": "2026-02-26", "region": "US"},
"critical": true
}
],
"aggregate": {
"report_date": "2026-02-26",
"region": "US",
"health": "yellow",
"sales": {"gross_sales_usd": 182450.0, "orders": 4820, "aov_usd": 37.85},
"payments": {"failed_payment_rate": 0.023, "chargeback_alerts": 3, "gateway_incident": "none"},
"inventory": {"low_stock_skus": ["SKU-4411", "SKU-8820"], "out_of_stock_skus": ["SKU-9033"], "restock_eta_days": 2}
},
"trace": [
{
"task_id": "t1",
"worker": "sales_worker",
"status": "done",
"attempts_used": 1,
"retried": false
},
{
"task_id": "t2",
"worker": "payments_worker",
"status": "done",
"attempts_used": 2,
"retried": true
},
{
"task_id": "t3",
"worker": "inventory_worker",
"status": "done",
"attempts_used": 1,
"retried": false
}
],
"history": [{...}]
}
This is a shortened example: history is shown compactly, and answer format may vary slightly across runs.
If you set `INVENTORY_RUNTIME_ENABLED = False`, `worker_denied:inventory_worker` appears in the trace - a clear example of the difference between the policy and execution allowlists.
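The two-allowlist split can be verified directly: a worker can pass plan validation against the policy allowlist yet be refused at dispatch time by the execution allowlist (constants as defined in main.py):

```python
ALLOWED_WORKERS_POLICY = {"sales_worker", "payments_worker", "inventory_worker"}
INVENTORY_RUNTIME_ENABLED = False  # runtime feature flag turned off
ALLOWED_WORKERS_EXECUTION = (
    {"sales_worker", "payments_worker", "inventory_worker"}
    if INVENTORY_RUNTIME_ENABLED
    else {"sales_worker", "payments_worker"}
)

worker = "inventory_worker"
assert worker in ALLOWED_WORKERS_POLICY         # plan validation accepts the task
assert worker not in ALLOWED_WORKERS_EXECUTION  # gateway stops with worker_denied:inventory_worker
```

This separation lets a plan stay valid while a tenant- or feature-level gate decides at runtime which workers may actually execute.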
Common `stop_reason` values
- `success` - plan executed, final answer generated
- `invalid_plan:*` - the LLM plan failed policy validation
- `invalid_plan:missing_keys` - a task is missing required contract fields
- `llm_timeout` - the LLM did not respond within `OPENAI_TIMEOUT_SECONDS`
- `llm_empty` - the LLM returned an empty final answer
- `max_seconds` - the total run time budget was exceeded
- `max_dispatches` - the subtask dispatch limit was exceeded (including retries)
- `worker_denied:<name>` - the worker is not in the execution allowlist
- `worker_missing:<name>` - the worker is missing from the registry
- `worker_bad_args:<name>` - the task contains invalid arguments
- `worker_bad_result:<name>` - the worker returned data outside the contract
- `task_timeout` - the subtask did not finish within `task_timeout_seconds`
- `critical_task_failed` - at least one critical subtask failed
What is NOT shown here
- No auth/PII handling or tenant isolation.
- No queue/persistent job runner (RabbitMQ/SQS/Kafka).
- No circuit breaker or advanced backoff policies.
- No token/cost budgets (cost guardrails).
What to try next
- Make `payments_worker` non-critical (`critical=false`) and see how the run returns a partial result.
- Set `INVENTORY_RUNTIME_ENABLED = False` and check for `worker_denied:inventory_worker`.
- Reduce `task_timeout_seconds` to `0.2` and observe the timeout/retry counts grow.
- Add a fourth worker (`fraud_worker`) and compare run times with `max_parallel=2` and `max_parallel=4`.