Essence of the pattern (brief)
Orchestrator Agent is a pattern in which the agent does not execute the business work itself but coordinates several specialized executors.
The LLM decides which subtasks to launch (what to do), while the policy layer controls how to launch them safely (what is allowed, how long to wait, when to retry, which limits apply).
What this example demonstrates
- a separate `Plan` step that builds the set of subtasks
- parallel `Dispatch` of the subtasks (`ThreadPoolExecutor`)
- a policy boundary between the orchestration decision (LLM) and worker execution
- strict plan validation (`kind`, `tasks`, task shape)
- separate allowlists for the policy and execution layers
- a per-subtask timeout + a single retry for transient timeouts
- a partial-results policy: a non-critical task may fail while the run continues
- explicit `stop_reason` values for debugging and production monitoring
Architecture
- The LLM receives the goal and returns an orchestration plan as JSON (`kind="plan"`, `tasks`).
- The policy boundary validates the plan and rejects unauthorized workers/arguments.
- `OrchestratorGateway` launches the subtasks in parallel with timeout and retry.
- The results (successful and failed) are collected into a single `history`/`trace`.
- If a critical task does not complete, the run stops.
- If the minimal required facts are present, the LLM composes a short final operational report.
The LLM returns an intent (a plan), which is treated as untrusted input: the policy boundary validates it first and, only if allowed, launches the workers.
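The untrusted-intent idea can be reduced to a few lines. This is an illustrative sketch, not the project code (the real validation lives in `gateway.py`; `shell_worker` here is an invented hostile name):

```python
# Sketch: the LLM's plan is data, never a command. Anything outside the
# allowlist is rejected before any worker runs.
class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason

ALLOWED_WORKERS = {"sales_worker", "payments_worker", "inventory_worker"}

def check_workers(plan: dict) -> None:
    for task in plan.get("tasks", []):
        if task.get("worker") not in ALLOWED_WORKERS:
            raise StopRun(f"invalid_plan:worker_not_allowed:{task.get('worker')}")

hostile = {"kind": "plan", "tasks": [{"worker": "shell_worker"}]}
try:
    check_workers(hostile)
except StopRun as exc:
    print(exc.reason)  # invalid_plan:worker_not_allowed:shell_worker
```

The key property: rejection happens at the boundary, before any dispatch budget is consumed.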
Project structure
examples/
└── agent-patterns/
└── orchestrator-agent/
└── python/
├── main.py # Plan -> Dispatch (parallel) -> Aggregate -> Finalize
├── llm.py # planner + final synthesis
├── gateway.py # policy boundary: validation + timeout/retry + budgets
├── workers.py # deterministic specialists (sales/payments/inventory)
└── requirements.txt
Running the project
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/orchestrator-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Option via export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option via .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows, it is simpler to set environment variables with `set`, or, if preferred, to use python-dotenv to load `.env` automatically.
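If you want to avoid a shell dependency entirely, a minimal `.env` loader can be sketched in a few lines of Python. This is a naive sketch (python-dotenv handles quoting, interpolation, and edge cases far more robustly); it assumes simple `KEY=VALUE` lines:

```python
import os

def load_dotenv_minimal(path: str = ".env") -> None:
    # Naive KEY=VALUE parser: no quoting, no multi-line values.
    # Uses setdefault so variables already exported by the shell win.
    try:
        with open(path, encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, _, value = line.partition("=")
                os.environ.setdefault(key.strip(), value.strip())
    except FileNotFoundError:
        pass  # Fall back to whatever the shell exported.
```

Call `load_dotenv_minimal()` before importing `llm.py`, since `llm.py` reads the environment at import time.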
Task
Imagine a real production case:
"Prepare a morning operations report for the US market as of 2026-02-26: sales, payment incidents, and stock risks."
The agent must not do everything in a single flow. It must:
- split the task into independent subtasks
- run the subtasks in parallel
- retry a timed-out task once
- produce a single short final brief
Solution
Here, the Orchestrator works as a coordinator:
- the LLM generates a plan with tasks for specialized workers
- the policy layer checks the plan and blocks unauthorized tasks
- the gateway runs the workers in parallel with timeout/retry
- the results are collected into an aggregation model
- the short final text is generated only after the facts have been collected
This is neither ReAct nor Routing:
- not ReAct, because the focus is not on a sequential `Think -> Act` loop of a single agent
- not Routing, because several executors are needed at the same time, not a single target
Code
workers.py — specialized executors
from __future__ import annotations

import time
from typing import Any

SALES_METRICS = {
    "2026-02-26:US": {
        "gross_sales_usd": 182_450.0,
        "orders": 4_820,
        "aov_usd": 37.85,
    }
}

PAYMENT_ALERTS = {
    "2026-02-26:US": {
        "failed_payment_rate": 0.023,
        "chargeback_alerts": 3,
        "gateway_incident": "none",
    }
}

INVENTORY_ALERTS = {
    "2026-02-26:US": {
        "low_stock_skus": ["SKU-4411", "SKU-8820"],
        "out_of_stock_skus": ["SKU-9033"],
        "restock_eta_days": 2,
    }
}

_ATTEMPT_STATE: dict[str, int] = {}


def _key(report_date: str, region: str) -> str:
    return f"{report_date}:{region.upper()}"


def sales_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    time.sleep(0.4)
    metrics = SALES_METRICS.get(_key(report_date, region))
    if not metrics:
        return {"status": "done", "worker": "sales_worker", "result": {"warning": "sales_data_missing"}}
    return {"status": "done", "worker": "sales_worker", "result": metrics}


def payments_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    # Simulate a transient timeout on the first attempt within one request_id.
    attempt_key = f"{request_id}:{_key(report_date, region)}:payments"
    _ATTEMPT_STATE[attempt_key] = _ATTEMPT_STATE.get(attempt_key, 0) + 1
    attempt = _ATTEMPT_STATE[attempt_key]
    if attempt == 1:
        time.sleep(2.6)  # Exceeds gateway timeout on purpose.
    else:
        time.sleep(0.3)
    alerts = PAYMENT_ALERTS.get(_key(report_date, region))
    if not alerts:
        return {"status": "done", "worker": "payments_worker", "result": {"warning": "payments_data_missing"}}
    return {"status": "done", "worker": "payments_worker", "result": alerts}


def inventory_worker(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    time.sleep(0.5)
    alerts = INVENTORY_ALERTS.get(_key(report_date, region))
    if not alerts:
        return {"status": "done", "worker": "inventory_worker", "result": {"warning": "inventory_data_missing"}}
    return {"status": "done", "worker": "inventory_worker", "result": alerts}
What matters most here (in plain words)
- Each worker covers only its own slice of the business.
- `payments_worker` deliberately times out on the first attempt to demonstrate the retry policy.
- Workers make no orchestration decisions: they only return facts.
Contract between the layers in this example:
- The LLM can only propose a plan (`kind="plan"`, `tasks`).
- The Gateway validates the plan and rejects dangerous/invalid tasks.
- Unknown keys in tasks are ignored during normalization as long as the required fields are present.
- The Gateway enforces the runtime policy: allowlist, timeout, retry, dispatch budget, deadline.
- Workers execute only their business function and return a structured `dict`.
- The Orchestrator decides whether the run can be finalized (for example, failed critical task -> stop).
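The unknown-key rule can be shown in isolation. This is a standalone sketch of the normalization idea (the real logic is in `validate_orchestration_plan`; the `note` key below is an invented extra field):

```python
# Sketch: keep only the contract fields, drop anything else the LLM added.
CONTRACT_FIELDS = {"id", "worker", "args", "critical"}

def normalize_task(task: dict) -> dict:
    missing = CONTRACT_FIELDS - task.keys()
    if missing:
        raise ValueError(f"invalid_plan:missing_keys:{sorted(missing)}")
    return {
        "id": task["id"],
        "worker": task["worker"],
        "args": task["args"],
        "critical": task["critical"],
    }

task = {"id": "t1", "worker": "sales_worker", "args": {}, "critical": True, "note": "extra"}
print(normalize_task(task))  # the unknown "note" key is dropped
```

Tolerating extra keys while requiring the mandatory ones keeps the contract strict without making the planner brittle.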
gateway.py — policy boundary (the most important layer)
from __future__ import annotations

import hashlib
import json
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_tasks: int = 4
    max_parallel: int = 3
    max_retries_per_task: int = 1
    max_dispatches: int = 8
    task_timeout_seconds: float = 2.0
    max_seconds: int = 25


def args_hash(args: dict[str, Any]) -> str:
    stable = json.dumps(args, ensure_ascii=True, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(stable.encode("utf-8")).hexdigest()[:12]


def validate_orchestration_plan(
    raw_plan: dict[str, Any], *, allowed_workers: set[str], max_tasks: int
) -> list[dict[str, Any]]:
    if not isinstance(raw_plan, dict):
        raise StopRun("invalid_plan:non_json")
    if raw_plan.get("kind") != "plan":
        raise StopRun("invalid_plan:kind")
    tasks = raw_plan.get("tasks")
    if not isinstance(tasks, list):
        raise StopRun("invalid_plan:tasks")
    if not (1 <= len(tasks) <= max_tasks):
        raise StopRun("invalid_plan:max_tasks")
    normalized: list[dict[str, Any]] = []
    seen_ids: set[str] = set()
    required_keys = {"id", "worker", "args", "critical"}
    for task in tasks:
        if not isinstance(task, dict):
            raise StopRun("invalid_plan:task_shape")
        if not required_keys.issubset(task.keys()):
            raise StopRun("invalid_plan:missing_keys")
        # Ignore unknown keys and keep only contract fields.
        task_id = task["id"]
        worker = task["worker"]
        args = task["args"]
        critical = task["critical"]
        if not isinstance(task_id, str) or not task_id.strip():
            raise StopRun("invalid_plan:task_id")
        if task_id in seen_ids:
            raise StopRun("invalid_plan:duplicate_task_id")
        seen_ids.add(task_id)
        if not isinstance(worker, str) or not worker.strip():
            raise StopRun("invalid_plan:worker")
        if worker not in allowed_workers:
            raise StopRun(f"invalid_plan:worker_not_allowed:{worker}")
        if not isinstance(args, dict):
            raise StopRun("invalid_plan:args")
        if not isinstance(critical, bool):
            raise StopRun("invalid_plan:critical")
        normalized.append(
            {
                "id": task_id.strip(),
                "worker": worker.strip(),
                "args": dict(args),
                "critical": critical,
            }
        )
    return normalized


class OrchestratorGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ) -> None:
        self.allow = allow
        self.registry = registry
        self.budget = budget
        self.dispatches = 0
        self._lock = threading.Lock()

    def _consume_dispatch_budget(self) -> None:
        with self._lock:
            self.dispatches += 1
            if self.dispatches > self.budget.max_dispatches:
                raise StopRun("max_dispatches")

    def _call_once(
        self, worker_name: str, args: dict[str, Any], *, deadline_monotonic: float
    ) -> dict[str, Any]:
        if worker_name not in self.allow:
            raise StopRun(f"worker_denied:{worker_name}")
        fn = self.registry.get(worker_name)
        if fn is None:
            raise StopRun(f"worker_missing:{worker_name}")
        remaining = deadline_monotonic - time.monotonic()
        if remaining <= 0:
            raise StopRun("max_seconds")
        task_timeout = min(self.budget.task_timeout_seconds, max(0.01, remaining))
        with ThreadPoolExecutor(max_workers=1) as pool:
            future = pool.submit(fn, **args)
            try:
                result = future.result(timeout=task_timeout)
            except TimeoutError as exc:
                raise StopRun("task_timeout") from exc
            except TypeError as exc:
                raise StopRun(f"worker_bad_args:{worker_name}") from exc
        if not isinstance(result, dict):
            raise StopRun(f"worker_bad_result:{worker_name}")
        return result

    def _run_task_with_retry(
        self, task: dict[str, Any], request_id: str, deadline_monotonic: float
    ) -> dict[str, Any]:
        task_id = task["id"]
        worker_name = task["worker"]
        semantic_args = dict(task["args"])
        semantic_hash = args_hash(semantic_args)
        base_args = dict(semantic_args)
        base_args["request_id"] = request_id
        attempts_total = self.budget.max_retries_per_task + 1
        last_reason = "unknown"
        for attempt in range(1, attempts_total + 1):
            try:
                self._consume_dispatch_budget()
                observation = self._call_once(
                    worker_name, base_args, deadline_monotonic=deadline_monotonic
                )
                return {
                    "task_id": task_id,
                    "worker": worker_name,
                    "critical": task["critical"],
                    "status": "done",
                    "attempts_used": attempt,
                    "retried": attempt > 1,
                    "args_hash": semantic_hash,
                    "observation": observation,
                }
            except StopRun as exc:
                last_reason = exc.reason
                if exc.reason == "task_timeout" and attempt < attempts_total:
                    continue
                return {
                    "task_id": task_id,
                    "worker": worker_name,
                    "critical": task["critical"],
                    "status": "failed",
                    "attempts_used": attempt,
                    "retried": attempt > 1,
                    "args_hash": semantic_hash,
                    "stop_reason": last_reason,
                }
        return {
            "task_id": task_id,
            "worker": worker_name,
            "critical": task["critical"],
            "status": "failed",
            "attempts_used": attempts_total,
            "retried": True,
            "args_hash": semantic_hash,
            "stop_reason": last_reason,
        }

    def dispatch_parallel(
        self,
        tasks: list[dict[str, Any]],
        *,
        request_id: str,
        deadline_monotonic: float,
    ) -> list[dict[str, Any]]:
        if not tasks:
            return []
        indexed_tasks = list(enumerate(tasks))
        output: list[tuple[int, dict[str, Any]]] = []
        parallelism = min(self.budget.max_parallel, len(tasks))
        with ThreadPoolExecutor(max_workers=parallelism) as pool:
            future_to_idx = {
                pool.submit(
                    self._run_task_with_retry, task, request_id, deadline_monotonic
                ): idx
                for idx, task in indexed_tasks
            }
            remaining = deadline_monotonic - time.monotonic()
            if remaining <= 0:
                raise StopRun("max_seconds")
            try:
                for future in as_completed(future_to_idx, timeout=max(0.01, remaining)):
                    idx = future_to_idx[future]
                    output.append((idx, future.result()))
            except TimeoutError as exc:
                raise StopRun("max_seconds") from exc
        output.sort(key=lambda item: item[0])
        return [item[1] for item in output]
What matters most here (in plain words)
- The gateway is what implements the execution policy: allowlist, timeout, retry, dispatch budget.
- `validate_orchestration_plan(...)` checks the mandatory field contract and ignores extra keys.
- `dispatch_parallel(...)` preserves parallelism and also enforces the global deadline (`max_seconds`) inside the gateway.
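The per-call timeout mechanism used by `_call_once` can be isolated to a minimal standalone sketch (`slow_worker` is an invented stand-in for a real worker):

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def slow_worker():
    time.sleep(0.5)
    return {"status": "done"}

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_worker)
    try:
        # Stop waiting after 0.1s even though the worker needs 0.5s.
        result = future.result(timeout=0.1)
    except TimeoutError:
        result = {"status": "failed", "stop_reason": "task_timeout"}

print(result)  # {'status': 'failed', 'stop_reason': 'task_timeout'}
```

One caveat worth knowing: `future.result(timeout=...)` stops *waiting*, but the worker thread itself runs to completion in the background (and the `with` block waits for it on exit). A production system that needs true cancellation has to use processes or cooperative cancellation.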
llm.py — planning and final synthesis
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


PLAN_SYSTEM_PROMPT = """
You are an orchestration planner.
Return only one JSON object in this exact shape:
{
  "kind": "plan",
  "tasks": [
    {"id":"t1","worker":"sales_worker","args":{"report_date":"YYYY-MM-DD","region":"US"},"critical":true}
  ]
}
Rules:
- Create 2 to 4 independent tasks.
- Use only workers from available_workers.
- Keep args minimal and valid for the selected worker.
- Mark task as critical=true only if final brief is impossible without it.
- Do not output markdown or extra keys.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are an operations assistant.
Write a short final briefing in English for an e-commerce operations manager.
Include:
- headline health status (green/yellow/red)
- key sales metrics
- payment risk note
- inventory risk note
- one concrete next action
Use only evidence from orchestrator output.
""".strip()

WORKER_CATALOG = [
    {
        "name": "sales_worker",
        "description": "Provides sales KPIs for a date and region",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
    {
        "name": "payments_worker",
        "description": "Provides payment failure and chargeback signals",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
    {
        "name": "inventory_worker",
        "description": "Provides low-stock and out-of-stock risk signals",
        "args": {"report_date": "YYYY-MM-DD", "region": "US|EU|..."},
    },
]


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def create_plan(
    *,
    goal: str,
    report_date: str,
    region: str,
    max_tasks: int,
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "report_date": report_date,
        "region": region,
        "max_tasks": max_tasks,
        "available_workers": WORKER_CATALOG,
    }
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": PLAN_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc
    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_final_answer(*, goal: str, aggregate: dict[str, Any]) -> str:
    payload = {
        "goal": goal,
        "aggregate": aggregate,
    }
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc
    text = completion.choices[0].message.content or ""
    text = text.strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text
What matters most here (in plain words)
- The LLM returns only a plan/answer; it never launches the workers directly.
- The planner receives `max_tasks` and `available_workers`, so its choices are already constrained at the intent stage.
- On non-JSON output or a timeout, explicit signals are returned so the run can stop in a controlled way.
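The defensive parsing step can be demonstrated on its own. This standalone sketch mirrors the fallback in `create_plan`: anything that is not a JSON object becomes an explicit `kind="invalid"` marker instead of an exception in the orchestration path:

```python
import json

def parse_plan(text: str) -> dict:
    # Treat model output as untrusted text, never as guaranteed JSON.
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}
    if not isinstance(parsed, dict):
        return {"kind": "invalid", "raw": text}
    return parsed

print(parse_plan('{"kind": "plan", "tasks": []}'))  # parsed plan dict
print(parse_plan("not json"))                        # {'kind': 'invalid', 'raw': 'not json'}
```

Downstream, `validate_orchestration_plan` then turns `kind="invalid"` into `StopRun("invalid_plan:kind")`, so the failure surfaces as a named `stop_reason` rather than a stack trace.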
main.py — Plan -> Dispatch (parallel) -> Aggregate -> Finalize
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from gateway import Budget, OrchestratorGateway, StopRun, validate_orchestration_plan
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from workers import inventory_worker, payments_worker, sales_worker

REPORT_DATE = "2026-02-26"
REGION = "US"
GOAL = (
    "Prepare a morning operations report for e-commerce region US on 2026-02-26. "
    "Include sales, payment risk, and inventory risk with one concrete action."
)

BUDGET = Budget(
    max_tasks=4,
    max_parallel=3,
    max_retries_per_task=1,
    max_dispatches=8,
    task_timeout_seconds=2.0,
    max_seconds=25,
)

WORKER_REGISTRY = {
    "sales_worker": sales_worker,
    "payments_worker": payments_worker,
    "inventory_worker": inventory_worker,
}

ALLOWED_WORKERS_POLICY = {"sales_worker", "payments_worker", "inventory_worker"}

INVENTORY_RUNTIME_ENABLED = True
ALLOWED_WORKERS_EXECUTION = (
    {"sales_worker", "payments_worker", "inventory_worker"}
    if INVENTORY_RUNTIME_ENABLED
    else {"sales_worker", "payments_worker"}
)
# Set INVENTORY_RUNTIME_ENABLED=False to get worker_denied:inventory_worker in trace.


def aggregate_results(task_results: list[dict[str, Any]]) -> dict[str, Any]:
    done_by_worker: dict[str, dict[str, Any]] = {}
    failed: list[dict[str, Any]] = []
    for item in task_results:
        if item["status"] == "done":
            done_by_worker[item["worker"]] = item["observation"]
        else:
            failed.append(
                {
                    "task_id": item["task_id"],
                    "worker": item["worker"],
                    "critical": item["critical"],
                    "stop_reason": item["stop_reason"],
                }
            )
    sales = done_by_worker.get("sales_worker", {}).get("result", {})
    payments = done_by_worker.get("payments_worker", {}).get("result", {})
    inventory = done_by_worker.get("inventory_worker", {}).get("result", {})
    health = "green"
    if payments.get("failed_payment_rate", 0) >= 0.03 or inventory.get("out_of_stock_skus"):
        health = "yellow"
    if payments.get("gateway_incident") not in (None, "none"):
        health = "red"
    return {
        "report_date": REPORT_DATE,
        "region": REGION,
        "health": health,
        "sales": sales,
        "payments": payments,
        "inventory": inventory,
        "failed_tasks": failed,
    }


def run_orchestrator(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    deadline = started + BUDGET.max_seconds
    request_id = uuid.uuid4().hex[:10]
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []
    gateway = OrchestratorGateway(
        allow=ALLOWED_WORKERS_EXECUTION,
        registry=WORKER_REGISTRY,
        budget=BUDGET,
    )
    try:
        raw_plan = create_plan(
            goal=goal,
            report_date=REPORT_DATE,
            region=REGION,
            max_tasks=BUDGET.max_tasks,
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }
    try:
        tasks = validate_orchestration_plan(
            raw_plan,
            allowed_workers=ALLOWED_WORKERS_POLICY,
            max_tasks=BUDGET.max_tasks,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "plan",
            "raw_plan": raw_plan,
            "trace": trace,
            "history": history,
        }
    if time.monotonic() > deadline:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "dispatch",
            "plan": tasks,
            "trace": trace,
            "history": history,
        }
    try:
        task_results = gateway.dispatch_parallel(
            tasks,
            request_id=request_id,
            deadline_monotonic=deadline,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "dispatch",
            "plan": tasks,
            "trace": trace,
            "history": history,
        }
    for item in task_results:
        trace.append(
            {
                "task_id": item["task_id"],
                "worker": item["worker"],
                "status": item["status"],
                "attempts_used": item["attempts_used"],
                "retried": item["retried"],
                "args_hash": item["args_hash"],
                "stop_reason": item.get("stop_reason"),
                "critical": item["critical"],
            }
        )
        history.append(item)
    failed_critical = [
        item
        for item in task_results
        if item["status"] == "failed" and item["critical"]
    ]
    if failed_critical:
        return {
            "status": "stopped",
            "stop_reason": "critical_task_failed",
            "phase": "dispatch",
            "plan": tasks,
            "failed_critical": failed_critical,
            "trace": trace,
            "history": history,
        }
    aggregate = aggregate_results(task_results)
    if time.monotonic() > deadline:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }
    try:
        answer = compose_final_answer(goal=goal, aggregate=aggregate)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "finalize",
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }
    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "plan": tasks,
        "aggregate": aggregate,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_orchestrator(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()
What matters most here (in plain words)
- `run_orchestrator(...)` manages the full lifecycle of an orchestration run.
- Parallel execution happens only inside the gateway, never in LLM decisions.
- `ALLOWED_WORKERS_POLICY` and `ALLOWED_WORKERS_EXECUTION` may differ (runtime feature flag / tenant gate).
- The policy is explicit: if a critical task fails, finalization does not start.
- `trace` and `history` provide a transparent audit: what was launched, how many attempts, why it failed.
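The critical-task rule at the end of the run reduces to two lines of filtering. A standalone sketch with invented sample data, mirroring the logic in `run_orchestrator`:

```python
# Invented sample results: one critical success, one non-critical failure.
task_results = [
    {"task_id": "t1", "worker": "sales_worker", "status": "done", "critical": True},
    {"task_id": "t2", "worker": "payments_worker", "status": "failed", "critical": False},
]

# A failed non-critical task degrades the report; a failed critical task stops the run.
failed_critical = [t for t in task_results if t["status"] == "failed" and t["critical"]]
can_finalize = not failed_critical
print(can_finalize)  # True: only a non-critical task failed
```

This is what "partial results" means here: the run proceeds to finalization with whatever facts the non-failed workers produced.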
requirements.txt
openai==2.21.0
Example output
Because of the simulated timeout in `payments_worker` on the first attempt, a typical trace contains `retried=true`.
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "Morning Operations Report - US Region (2026-02-26): Health=yellow. Gross sales=$182,450, orders=4,820, AOV=$37.85. Payment failed rate=2.3% with 3 chargeback alerts. Inventory has one out-of-stock SKU (SKU-9033). Next action: prioritize immediate restock for SKU-9033.",
  "plan": [
    {
      "id": "t1",
      "worker": "sales_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    },
    {
      "id": "t2",
      "worker": "payments_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    },
    {
      "id": "t3",
      "worker": "inventory_worker",
      "args": {"report_date": "2026-02-26", "region": "US"},
      "critical": true
    }
  ],
  "aggregate": {
    "report_date": "2026-02-26",
    "region": "US",
    "health": "yellow",
    "sales": {"gross_sales_usd": 182450.0, "orders": 4820, "aov_usd": 37.85},
    "payments": {"failed_payment_rate": 0.023, "chargeback_alerts": 3, "gateway_incident": "none"},
    "inventory": {"low_stock_skus": ["SKU-4411", "SKU-8820"], "out_of_stock_skus": ["SKU-9033"], "restock_eta_days": 2}
  },
  "trace": [
    {
      "task_id": "t1",
      "worker": "sales_worker",
      "status": "done",
      "attempts_used": 1,
      "retried": false
    },
    {
      "task_id": "t2",
      "worker": "payments_worker",
      "status": "done",
      "attempts_used": 2,
      "retried": true
    },
    {
      "task_id": "t3",
      "worker": "inventory_worker",
      "status": "done",
      "attempts_used": 1,
      "retried": false
    }
  ],
  "history": [{...}]
}
This is an abridged example: `history` is shown in condensed form, and the exact wording of `answer` may vary slightly between runs.
If you set `INVENTORY_RUNTIME_ENABLED=False`, `worker_denied:inventory_worker` appears in the trace - a clear illustration of the difference between the policy allowlist and the execution allowlist.
Common `stop_reason` values
- `success` - plan executed, final answer generated
- `invalid_plan:*` - the LLM plan failed policy validation
- `invalid_plan:missing_keys` - a task is missing required contract fields
- `llm_timeout` - the LLM did not respond within `OPENAI_TIMEOUT_SECONDS`
- `llm_empty` - the LLM returned an empty final answer
- `max_seconds` - the run's global time budget was exceeded
- `max_dispatches` - the subtask dispatch limit was exceeded (retries included)
- `worker_denied:<name>` - worker not in the execution allowlist
- `worker_missing:<name>` - worker not in the registry
- `worker_bad_args:<name>` - the task contains invalid arguments
- `worker_bad_result:<name>` - the worker returned data outside the contract
- `task_timeout` - a subtask exceeded `task_timeout_seconds`
- `critical_task_failed` - at least one critical subtask failed
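For monitoring, the prefix structure of these values makes triage straightforward. This is a hypothetical severity mapping (not part of the example code; tune the buckets to your own alerting rules):

```python
def triage(stop_reason: str) -> str:
    # Hypothetical mapping from stop_reason to alert severity bucket.
    if stop_reason == "success":
        return "ok"
    if stop_reason.startswith("invalid_plan:") or stop_reason.startswith("worker_bad_"):
        return "bug"        # contract violation: fix the prompt or the worker
    if stop_reason in {"llm_timeout", "task_timeout", "max_seconds"}:
        return "transient"  # candidate for retry/backoff at a higher level
    return "policy"         # denied workers / exceeded limits: expected guardrail behavior

print(triage("invalid_plan:missing_keys"))  # bug
print(triage("task_timeout"))               # transient
print(triage("worker_denied:inventory_worker"))  # policy
```

Machine-readable reasons like these are exactly why the gateway raises `StopRun(reason)` instead of free-form exceptions.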
What is NOT shown here
- No auth/PII handling and no tenant isolation.
- No persistent queue/job runner (RabbitMQ/SQS/Kafka).
- No circuit breaker or elaborate backoff policies.
- No token/cost budgets (cost guardrails).
Things to try next
- Make `payments_worker` non-critical (`critical=false`) and watch the run return a partial result.
- Set `INVENTORY_RUNTIME_ENABLED=False` and check for `worker_denied:inventory_worker`.
- Lower `task_timeout_seconds` to `0.2` and watch timeouts/retries increase.
- Add a fourth worker (`fraud_worker`) and compare run time with `max_parallel=2` vs `max_parallel=4`.