Task Decomposition Agent — Python (implémentation complète avec LLM)

Exemple exécutable d’agent Task Decomposition en Python, style production, avec planning, policy boundary, exécution séquentielle des étapes, budgets et stop reasons.
Sur cette page
  1. Essence du pattern (bref)
  2. Ce que cet exemple démontre
  3. Architecture
  4. Structure du projet
  5. Lancer le projet
  6. Tâche
  7. Solution
  8. Code
  9. tools.py — tools (source de faits)
  10. gateway.py — policy boundary (la couche la plus importante)
  11. llm.py — planning + final synthesis
  12. main.py — Plan -> Execute -> Combine
  13. requirements.txt
  14. Exemple de sortie
  15. stop_reason typiques
  16. Ce qui N’est PAS montré ici
  17. Que tester ensuite
  18. Code complet sur GitHub

Essence du pattern (bref)

Task Decomposition Agent est un pattern dans lequel l'agent commence par décomposer une tâche complexe en étapes séquentielles, puis les exécute ensuite une par une.

Le modèle est responsable du plan et de l'ordre des actions, tandis que l'exécution de chaque étape passe par un gateway contrôlé avec validation du plan, allowlist et budgets de run.


Ce que cet exemple démontre

  • étape Plan séparée avant Execute
  • policy boundary entre planning (LLM) et tools (execution layer)
  • validation stricte du plan (kind, structure des étapes, allowed keys)
  • allowlist d’outils (deny by default)
  • budgets de run séparés : max_plan_steps (plan) et max_execute_steps (exécution), plus max_tool_calls, max_seconds
  • stop_reason explicites pour le débogage et le monitoring
  • raw_plan dans la réponse si le plan est invalide

Architecture

  1. Le LLM reçoit le goal et renvoie un plan JSON (kind="plan", steps).
  2. La policy boundary valide le plan et bloque les formes invalides/dangereuses.
  3. Chaque étape est exécutée séquentiellement via ToolGateway (allowlist, budgets, loop detection).
  4. L’observation de chaque étape est ajoutée à history comme checkpoint pour une exécution transparente.
  5. Après l’exécution de toutes les étapes, le LLM fait une synthèse finale à partir de history via un appel Combine séparé sans tools.

Le LLM renvoie un intent (plan), traité comme un input non fiable : la policy boundary le valide d’abord puis (si autorisé) appelle les tools. L’allowlist est appliquée deux fois : en plan validation (invalid_plan:tool_not_allowed:*) et pendant l’exécution des tools (tool_denied:*).

Ainsi Task Decomposition reste contrôlable : l’agent planifie et l’exécution passe par une couche contrôlée.


Structure du projet

TEXT
examples/
└── agent-patterns/
    └── task-decomposition-agent/
        └── python/
            ├── main.py           # Plan -> Execute -> Combine
            ├── llm.py            # planner + final synthesis
            ├── gateway.py        # policy boundary: plan validation + tool execution control
            ├── tools.py          # deterministic tools (Anna/Max, US, USD)
            └── requirements.txt

Lancer le projet

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/task-decomposition-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ est requis.

Option via export :

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option via .env (optionnel)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

C’est la variante shell (macOS/Linux). Sous Windows, il est plus simple d’utiliser des variables set ou, si souhaité, python-dotenv pour charger .env automatiquement.


Tâche

Imagine qu'un manager demande :

"Prépare un court rapport pour avril 2026 : ventes, remboursements, revenu net et risques."

L'agent ne doit pas inventer une réponse "de tête". Il doit :

  • d'abord créer un plan
  • exécuter les étapes dans l'ordre
  • utiliser uniquement des données provenant de tools autorisés
  • donner la réponse finale seulement après toutes les étapes

Solution

Ici, l'agent fonctionne selon un flux simple :

  • le LLM crée d'abord un plan avec plusieurs étapes
  • le système vérifie que le plan est valide et autorisé
  • les tools exécutent les étapes et renvoient des faits
  • ensuite, le LLM compose le résumé final court
  • si un plan ou une étape est invalide, le run s'arrête avec une raison

Code

tools.py — tools (source de faits)

PYTHON
from __future__ import annotations

from typing import Any

MANAGERS = {
    42: {"id": 42, "name": "Anna", "region": "US", "team": "Retail East"},
    7: {"id": 7, "name": "Max", "region": "US", "team": "Retail West"},
}

SALES_DATA = {
    "2026-04": [
        {"day": "2026-04-01", "gross_usd": 5200.0, "orders": 120},
        {"day": "2026-04-02", "gross_usd": 4890.0, "orders": 113},
        {"day": "2026-04-03", "gross_usd": 6105.0, "orders": 141},
        {"day": "2026-04-04", "gross_usd": 5580.0, "orders": 127},
        {"day": "2026-04-05", "gross_usd": 6420.0, "orders": 149},
    ]
}

REFUND_DATA = {
    "2026-04": [
        {"day": "2026-04-01", "refunds_usd": 140.0},
        {"day": "2026-04-02", "refunds_usd": 260.0},
        {"day": "2026-04-03", "refunds_usd": 210.0},
        {"day": "2026-04-04", "refunds_usd": 590.0},
        {"day": "2026-04-05", "refunds_usd": 170.0},
    ]
}


def get_manager_profile(manager_id: int) -> dict[str, Any]:
    manager = MANAGERS.get(manager_id)
    if not manager:
        return {"error": f"manager {manager_id} not found"}
    return {"manager": manager}


def fetch_sales_data(month: str) -> dict[str, Any]:
    rows = SALES_DATA.get(month)
    if not rows:
        return {"error": f"sales data for {month} not found"}
    return {"month": month, "currency": "USD", "daily_sales": rows}


def fetch_refund_data(month: str) -> dict[str, Any]:
    rows = REFUND_DATA.get(month)
    if not rows:
        return {"error": f"refund data for {month} not found"}
    return {"month": month, "currency": "USD", "daily_refunds": rows}


def calculate_monthly_kpis(month: str) -> dict[str, Any]:
    sales_rows = SALES_DATA.get(month)
    refund_rows = REFUND_DATA.get(month)
    if not sales_rows or not refund_rows:
        return {"error": f"kpi inputs for {month} not found"}

    gross_sales = sum(row["gross_usd"] for row in sales_rows)
    total_refunds = sum(row["refunds_usd"] for row in refund_rows)
    total_orders = sum(row["orders"] for row in sales_rows)
    net_sales = gross_sales - total_refunds
    refund_rate = (total_refunds / gross_sales) if gross_sales else 0.0

    top_day = max(sales_rows, key=lambda row: row["gross_usd"])["day"]

    return {
        "month": month,
        "currency": "USD",
        "gross_sales_usd": round(gross_sales, 2),
        "refunds_usd": round(total_refunds, 2),
        "net_sales_usd": round(net_sales, 2),
        "orders": total_orders,
        "refund_rate": round(refund_rate, 4),
        "top_sales_day": top_day,
    }


def detect_risk_signals(month: str) -> dict[str, Any]:
    refund_rows = REFUND_DATA.get(month)
    if not refund_rows:
        return {"error": f"refund data for {month} not found"}

    high_refund_day = max(refund_rows, key=lambda row: row["refunds_usd"])
    warnings: list[str] = []

    if high_refund_day["refunds_usd"] >= 500:
        warnings.append(
            f"Refund spike detected on {high_refund_day['day']}: {high_refund_day['refunds_usd']} USD"
        )

    if not warnings:
        warnings.append("No critical risk signals detected for this month.")

    return {
        "month": month,
        "currency": "USD",
        "risk_warnings": warnings,
        "peak_refund_day": high_refund_day,
    }

Ce qui est le plus important ici (en mots simples)

  • Les tools sont déterministes et ne contiennent pas de logique LLM.
  • L’agent décide seulement quelles étapes exécuter.
  • La logique métier est exécutée par l’execution layer (tools), pas par le LLM.

gateway.py — policy boundary (la couche la plus importante)

PYTHON
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_plan_steps: int = 6
    max_execute_steps: int = 8
    max_tool_calls: int = 8
    max_seconds: int = 60


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(item) for item in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(
                json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key])
            )
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def args_hash(args: dict[str, Any]) -> str:
    raw = _stable_json(args or {})
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def validate_plan_action(
    action: Any, *, max_plan_steps: int, allowed_tools: set[str]
) -> list[dict[str, Any]]:
    if not isinstance(action, dict):
        raise StopRun("invalid_plan:not_object")

    kind = action.get("kind")
    if kind == "invalid":
        raise StopRun("invalid_plan:non_json")
    if kind != "plan":
        raise StopRun("invalid_plan:bad_kind")

    allowed_top_keys = {"kind", "steps"}
    if set(action.keys()) - allowed_top_keys:
        raise StopRun("invalid_plan:extra_keys")

    steps = action.get("steps")
    if not isinstance(steps, list) or not steps:
        raise StopRun("invalid_plan:missing_steps")
    if len(steps) < 3:
        raise StopRun("invalid_plan:min_steps")
    if len(steps) > max_plan_steps:
        raise StopRun("invalid_plan:max_steps")

    normalized: list[dict[str, Any]] = []
    seen_ids: set[str] = set()

    for index, step in enumerate(steps, start=1):
        if not isinstance(step, dict):
            raise StopRun(f"invalid_plan:step_{index}_not_object")

        allowed_step_keys = {"id", "title", "tool", "args"}
        if set(step.keys()) - allowed_step_keys:
            raise StopRun(f"invalid_plan:step_{index}_extra_keys")

        step_id = step.get("id")
        if not isinstance(step_id, str) or not step_id.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_id")
        if step_id in seen_ids:
            raise StopRun("invalid_plan:duplicate_step_id")
        seen_ids.add(step_id)

        title = step.get("title")
        if not isinstance(title, str) or not title.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_title")

        tool = step.get("tool")
        if not isinstance(tool, str) or not tool.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_tool")
        tool = tool.strip()
        if tool not in allowed_tools:
            raise StopRun(f"invalid_plan:tool_not_allowed:{tool}")

        args = step.get("args", {})
        if args is None:
            args = {}
        if not isinstance(args, dict):
            raise StopRun(f"invalid_plan:step_{index}_bad_args")

        normalized.append(
            {
                "id": step_id.strip(),
                "title": title.strip(),
                "tool": tool,
                "args": args,
            }
        )

    return normalized


class ToolGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ):
        self.allow = set(allow)
        self.registry = registry
        self.budget = budget
        self.tool_calls = 0
        self.seen_calls: set[str] = set()

    def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
        self.tool_calls += 1
        if self.tool_calls > self.budget.max_tool_calls:
            raise StopRun("max_tool_calls")

        if name not in self.allow:
            raise StopRun(f"tool_denied:{name}")

        tool = self.registry.get(name)
        if tool is None:
            raise StopRun(f"tool_missing:{name}")

        signature = f"{name}:{args_hash(args)}"
        if signature in self.seen_calls:
            raise StopRun("loop_detected")
        self.seen_calls.add(signature)

        try:
            return tool(**args)
        except TypeError as exc:
            raise StopRun(f"tool_bad_args:{name}") from exc
        except Exception as exc:
            raise StopRun(f"tool_error:{name}") from exc

Ce qui est le plus important ici (en mots simples)

  • validate_plan_action(...) est la governance/control layer pour le plan du LLM.
  • Le plan est traité comme un input non fiable et passe une validation stricte.
  • ToolGateway.call(...) est la frontière agent ≠ executor : l’agent planifie, le gateway exécute en sécurité.
  • loop_detected détecte les exact-repeat (tool + args_hash).

llm.py — planning + final synthesis

Le LLM ne voit que le catalogue des tools disponibles ; si un tool n’est pas dans l’allowlist, le gateway arrête le run.

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


PLAN_SYSTEM_PROMPT = """
You are a task decomposition planner.
Return only one JSON object in this exact shape:
{
  "kind": "plan",
  "steps": [
    {"id": "step_1", "title": "...", "tool": "...", "args": {...}}
  ]
}

Rules:
- Create 3 to 6 steps.
- Use only tools from available_tools.
- Keep args minimal and valid.
- Do not add extra keys.
- Do not output markdown.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are a reporting assistant.
Write a short final summary in English for a US business audience.
Include: manager name, month, gross sales (USD), refunds (USD), net sales (USD), refund rate (%), and key risk note.
""".strip()

TOOL_CATALOG = [
    {
        "name": "get_manager_profile",
        "description": "Get manager profile by manager_id",
        "args": {"manager_id": "integer"},
    },
    {
        "name": "fetch_sales_data",
        "description": "Get daily gross sales for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "fetch_refund_data",
        "description": "Get daily refund values for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "calculate_monthly_kpis",
        "description": "Calculate gross/refunds/net/order KPIs for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "detect_risk_signals",
        "description": "Detect risk warnings for a month",
        "args": {"month": "string in YYYY-MM"},
    },
]


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def create_plan(goal: str, max_plan_steps: int) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "max_plan_steps": max_plan_steps,
        "available_tools": TOOL_CATALOG,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": PLAN_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
    payload = {
        "goal": goal,
        "history": history,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or ""
    text = text.strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text

Ce qui est le plus important ici (en mots simples)

  • create_plan(...) est la decision-stage pour la decomposition.
  • timeout=LLM_TIMEOUT_SECONDS + LLMTimeout fournissent un arrêt contrôlé en cas de problèmes réseau/modèle.
  • Une réponse finale vide n’est pas masquée par un texte fallback : llm_empty explicite est renvoyé.
  • Si le JSON est cassé, {"kind":"invalid"...} est renvoyé, et la policy layer donne un stop_reason lisible.

main.py — Plan -> Execute -> Combine

PYTHON
from __future__ import annotations

import json
import time
from typing import Any

from gateway import Budget, StopRun, ToolGateway, args_hash, validate_plan_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from tools import (
    calculate_monthly_kpis,
    detect_risk_signals,
    fetch_refund_data,
    fetch_sales_data,
    get_manager_profile,
)

GOAL = (
    "Prepare an April 2026 monthly sales summary for manager_id=42 in USD. "
    "Use step-by-step decomposition. Include gross sales, refunds, net sales, refund rate, and one risk note."
)

# max_execute_steps here limits plan length (number of planned steps), not runtime loop iterations.
BUDGET = Budget(max_plan_steps=6, max_execute_steps=8, max_tool_calls=8, max_seconds=60)

TOOL_REGISTRY = {
    "get_manager_profile": get_manager_profile,
    "fetch_sales_data": fetch_sales_data,
    "fetch_refund_data": fetch_refund_data,
    "calculate_monthly_kpis": calculate_monthly_kpis,
    "detect_risk_signals": detect_risk_signals,
}

ALLOWED_TOOLS = {
    "get_manager_profile",
    "fetch_sales_data",
    "fetch_refund_data",
    "calculate_monthly_kpis",
    "detect_risk_signals",
}


def run_task_decomposition(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)

    try:
        raw_plan = create_plan(goal=goal, max_plan_steps=BUDGET.max_plan_steps)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "llm_phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        steps = validate_plan_action(
            raw_plan,
            max_plan_steps=BUDGET.max_plan_steps,
            allowed_tools=ALLOWED_TOOLS,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "raw_plan": raw_plan,
            "trace": trace,
            "history": history,
        }

    if len(steps) > BUDGET.max_execute_steps:
        return {
            "status": "stopped",
            "stop_reason": "max_execute_steps",
            "plan": steps,
            "trace": trace,
            "history": history,
        }

    for step_no, step in enumerate(steps, start=1):
        elapsed = time.monotonic() - started
        if elapsed > BUDGET.max_seconds:
            return {
                "status": "stopped",
                "stop_reason": "max_seconds",
                "plan": steps,
                "trace": trace,
                "history": history,
            }

        tool_name = step["tool"]
        tool_args = step["args"]

        try:
            observation = gateway.call(tool_name, tool_args)
            trace.append(
                {
                    "step_no": step_no,
                    "step_id": step["id"],
                    "tool": tool_name,
                    "args_hash": args_hash(tool_args),
                    "ok": True,
                }
            )
        except StopRun as exc:
            trace.append(
                {
                    "step_no": step_no,
                    "step_id": step["id"],
                    "tool": tool_name,
                    "args_hash": args_hash(tool_args),
                    "ok": False,
                    "stop_reason": exc.reason,
                }
            )
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "plan": steps,
                "trace": trace,
                "history": history,
            }

        history.append(
            {
                "step_no": step_no,
                "plan_step": step,
                "observation": observation,
            }
        )

    try:
        answer = compose_final_answer(goal=goal, history=history)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "llm_phase": "finalize",
            "plan": steps,
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "llm_phase": "finalize",
            "plan": steps,
            "trace": trace,
            "history": history,
        }

    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "plan": steps,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_task_decomposition(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Ce qui est le plus important ici (en mots simples)

  • run_task_decomposition(...) pilote Plan -> Execute -> Combine ; les actions métier passent uniquement par ToolGateway.
  • En cas de plan invalide, raw_plan est renvoyé pour le débogage.
  • Dans cette version, max_execute_steps vérifie la longueur du plan avant l’execution ; ensuite, les limites runtime sont max_tool_calls et max_seconds.
  • history est un log transparent des étapes : ce qui était dans le plan et quelle observation chaque tool a renvoyée.

requirements.txt

TEXT
openai==2.21.0

Exemple de sortie

L’ordre des étapes du plan peut varier légèrement entre les runs, mais les policy-gates et stop reasons restent stables. Le planner peut changer l’ordre des étapes ; l’important est que policy + allowlist fonctionnent de la même manière quel que soit l’ordre.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "In April 2026, under manager Anna's leadership, gross sales were $28,195, refunds were $1,370, net sales were $26,825, and refund rate was 4.86%.",
  "plan": [
    {"id": "step_1", "tool": "fetch_sales_data", "args": {"month": "2026-04"}},
    {"id": "step_2", "tool": "fetch_refund_data", "args": {"month": "2026-04"}},
    {"id": "step_3", "tool": "calculate_monthly_kpis", "args": {"month": "2026-04"}},
    {"id": "step_4", "tool": "detect_risk_signals", "args": {"month": "2026-04"}},
    {"id": "step_5", "tool": "get_manager_profile", "args": {"manager_id": 42}}
  ],
  "trace": [
    {"step_no": 1, "step_id": "step_1", "tool": "fetch_sales_data", "args_hash": "...", "ok": true},
    {"step_no": 2, "step_id": "step_2", "tool": "fetch_refund_data", "args_hash": "...", "ok": true},
    {"step_no": 3, "step_id": "step_3", "tool": "calculate_monthly_kpis", "args_hash": "...", "ok": true},
    {"step_no": 4, "step_id": "step_4", "tool": "detect_risk_signals", "args_hash": "...", "ok": true},
    {"step_no": 5, "step_id": "step_5", "tool": "get_manager_profile", "args_hash": "...", "ok": true}
  ],
  "history": [{...}]
}

Ceci est un exemple abrégé : dans un run réel, plan et trace peuvent contenir plus d’étapes.

history est le journal d’exécution : pour chaque step_no, il y a plan_step et observation.

args_hash hash uniquement les arguments ; il peut donc coïncider entre différents tools si les args sont identiques ; la loop detection tient aussi compte du nom du tool.


stop_reason typiques

  • success — le plan est exécuté et la réponse finale est générée
  • invalid_plan:* — le plan du LLM n’a pas passé la policy validation
  • invalid_plan:non_json — le LLM n’a pas renvoyé de plan JSON valide
  • invalid_plan:min_steps — le plan contient moins de 3 étapes de decomposition
  • invalid_plan:tool_not_allowed:<name> — le plan contient un tool hors allowlist
  • max_execute_steps — le plan est plus long que l’execution budget autorisé
  • max_tool_calls — limite d’appels de tools atteinte
  • max_seconds — time budget du run dépassé
  • llm_timeout — le LLM n’a pas répondu dans OPENAI_TIMEOUT_SECONDS
  • llm_empty — le LLM a renvoyé une réponse finale vide à l’étape finalize
  • tool_denied:<name> — le tool n’est pas dans l’allowlist
  • tool_missing:<name> — le tool est absent du registry
  • tool_bad_args:<name> — l’étape contient des arguments invalides
  • loop_detected — exact repeat (tool + args_hash)

Ce qui N’est PAS montré ici

  • Pas d’auth/PII ni de contrôles d’accès production aux données personnelles.
  • Pas de politiques retry/backoff pour le LLM et la couche tools.
  • Pas de budgets token/coût (cost guardrails).
  • Les tools ici sont des mocks déterministes d’apprentissage, pas de vraies API externes.

Que tester ensuite

  • Retire detect_risk_signals de ALLOWED_TOOLS et vérifie tool_denied:*.
  • Ajoute au plan un tool inexistant et vérifie tool_missing:*.
  • Réduis max_plan_steps à 3 et observe à quelle fréquence tu obtiens invalid_plan:max_steps.
  • Change GOAL en manager_id=7 (Max) et compare la synthèse finale.
  • Ajoute des guardrails cost/token dans Budget et dans le résultat JSON final.

Code complet sur GitHub

Le dépôt contient la version runnable complète de cet exemple : planning, policy boundary, exécution séquentielle des étapes et stop reasons.

Voir le code complet sur GitHub ↗
⏱️ 15 min de lectureMis à jour Mars, 2026Difficulté: ★★☆
Intégré : contrôle en productionOnceOnly
Ajoutez des garde-fous aux agents tool-calling
Livrez ce pattern avec de la gouvernance :
  • Budgets (steps / plafonds de coût)
  • Permissions outils (allowlist / blocklist)
  • Kill switch & arrêt incident
  • Idempotence & déduplication
  • Audit logs & traçabilité
Mention intégrée : OnceOnly est une couche de contrôle pour des systèmes d’agents en prod.
Auteur

Cette documentation est organisée et maintenue par des ingénieurs qui déploient des agents IA en production.

Le contenu est assisté par l’IA, avec une responsabilité éditoriale humaine quant à l’exactitude, la clarté et la pertinence en production.

Les patterns et recommandations s’appuient sur des post-mortems, des modes de défaillance et des incidents opérationnels dans des systèmes déployés, notamment lors du développement et de l’exploitation d’une infrastructure de gouvernance pour les agents chez OnceOnly.