Essence du pattern (bref)
Task Decomposition Agent est un pattern dans lequel l'agent commence par décomposer une tâche complexe en étapes séquentielles, puis les exécute ensuite une par une.
Le modèle est responsable du plan et de l'ordre des actions, tandis que l'exécution de chaque étape passe par un gateway contrôlé avec validation du plan, allowlist et budgets de run.
Ce que cet exemple démontre
- étape
Planséparée avantExecute - policy boundary entre planning (LLM) et tools (execution layer)
- validation stricte du plan (
kind, structure des étapes, allowed keys) - allowlist d’outils (deny by default)
- budgets de run séparés :
max_plan_steps(plan) etmax_execute_steps(exécution), plusmax_tool_calls,max_seconds stop_reasonexplicites pour le débogage et le monitoringraw_plandans la réponse si le plan est invalide
Architecture
- Le LLM reçoit le goal et renvoie un plan JSON (
kind="plan",steps). - La policy boundary valide le plan et bloque les formes invalides/dangereuses.
- Chaque étape est exécutée séquentiellement via
ToolGateway(allowlist, budgets, loop detection). - L’observation de chaque étape est ajoutée à
historycomme checkpoint pour une exécution transparente. - Après l’exécution de toutes les étapes, le LLM fait une synthèse finale à partir de
historyvia un appelCombineséparé sans tools.
Le LLM renvoie un intent (plan), traité comme un input non fiable : la policy boundary le valide d’abord puis (si autorisé) appelle les tools.
L’allowlist est appliquée deux fois : en plan validation (invalid_plan:tool_not_allowed:*) et pendant l’exécution des tools (tool_denied:*).
Ainsi Task Decomposition reste contrôlable : l’agent planifie et l’exécution passe par une couche contrôlée.
Structure du projet
examples/
└── agent-patterns/
└── task-decomposition-agent/
└── python/
├── main.py # Plan -> Execute -> Combine
├── llm.py # planner + final synthesis
├── gateway.py # policy boundary: plan validation + tool execution control
├── tools.py # deterministic tools (Anna/Max, US, USD)
└── requirements.txt
Lancer le projet
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/task-decomposition-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ est requis.
Option via export :
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option via .env (optionnel)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
C’est la variante shell (macOS/Linux). Sous Windows, il est plus simple d’utiliser des variables set ou, si souhaité, python-dotenv pour charger .env automatiquement.
Tâche
Imagine qu'un manager demande :
"Prépare un court rapport pour avril 2026 : ventes, remboursements, revenu net et risques."
L'agent ne doit pas inventer une réponse "de tête". Il doit :
- d'abord créer un plan
- exécuter les étapes dans l'ordre
- utiliser uniquement des données provenant de tools autorisés
- donner la réponse finale seulement après toutes les étapes
Solution
Ici, l'agent fonctionne selon un flux simple :
- le LLM crée d'abord un plan avec plusieurs étapes
- le système vérifie que le plan est valide et autorisé
- les tools exécutent les étapes et renvoient des faits
- ensuite, le LLM compose le résumé final court
- si un plan ou une étape est invalide, le run s'arrête avec une raison
Code
tools.py — tools (source de faits)
from __future__ import annotations
from typing import Any
MANAGERS = {
42: {"id": 42, "name": "Anna", "region": "US", "team": "Retail East"},
7: {"id": 7, "name": "Max", "region": "US", "team": "Retail West"},
}
SALES_DATA = {
"2026-04": [
{"day": "2026-04-01", "gross_usd": 5200.0, "orders": 120},
{"day": "2026-04-02", "gross_usd": 4890.0, "orders": 113},
{"day": "2026-04-03", "gross_usd": 6105.0, "orders": 141},
{"day": "2026-04-04", "gross_usd": 5580.0, "orders": 127},
{"day": "2026-04-05", "gross_usd": 6420.0, "orders": 149},
]
}
REFUND_DATA = {
"2026-04": [
{"day": "2026-04-01", "refunds_usd": 140.0},
{"day": "2026-04-02", "refunds_usd": 260.0},
{"day": "2026-04-03", "refunds_usd": 210.0},
{"day": "2026-04-04", "refunds_usd": 590.0},
{"day": "2026-04-05", "refunds_usd": 170.0},
]
}
def get_manager_profile(manager_id: int) -> dict[str, Any]:
manager = MANAGERS.get(manager_id)
if not manager:
return {"error": f"manager {manager_id} not found"}
return {"manager": manager}
def fetch_sales_data(month: str) -> dict[str, Any]:
rows = SALES_DATA.get(month)
if not rows:
return {"error": f"sales data for {month} not found"}
return {"month": month, "currency": "USD", "daily_sales": rows}
def fetch_refund_data(month: str) -> dict[str, Any]:
rows = REFUND_DATA.get(month)
if not rows:
return {"error": f"refund data for {month} not found"}
return {"month": month, "currency": "USD", "daily_refunds": rows}
def calculate_monthly_kpis(month: str) -> dict[str, Any]:
sales_rows = SALES_DATA.get(month)
refund_rows = REFUND_DATA.get(month)
if not sales_rows or not refund_rows:
return {"error": f"kpi inputs for {month} not found"}
gross_sales = sum(row["gross_usd"] for row in sales_rows)
total_refunds = sum(row["refunds_usd"] for row in refund_rows)
total_orders = sum(row["orders"] for row in sales_rows)
net_sales = gross_sales - total_refunds
refund_rate = (total_refunds / gross_sales) if gross_sales else 0.0
top_day = max(sales_rows, key=lambda row: row["gross_usd"])["day"]
return {
"month": month,
"currency": "USD",
"gross_sales_usd": round(gross_sales, 2),
"refunds_usd": round(total_refunds, 2),
"net_sales_usd": round(net_sales, 2),
"orders": total_orders,
"refund_rate": round(refund_rate, 4),
"top_sales_day": top_day,
}
def detect_risk_signals(month: str) -> dict[str, Any]:
refund_rows = REFUND_DATA.get(month)
if not refund_rows:
return {"error": f"refund data for {month} not found"}
high_refund_day = max(refund_rows, key=lambda row: row["refunds_usd"])
warnings: list[str] = []
if high_refund_day["refunds_usd"] >= 500:
warnings.append(
f"Refund spike detected on {high_refund_day['day']}: {high_refund_day['refunds_usd']} USD"
)
if not warnings:
warnings.append("No critical risk signals detected for this month.")
return {
"month": month,
"currency": "USD",
"risk_warnings": warnings,
"peak_refund_day": high_refund_day,
}
Ce qui est le plus important ici (en mots simples)
- Les tools sont déterministes et ne contiennent pas de logique LLM.
- L’agent décide seulement quelles étapes exécuter.
- La logique métier est exécutée par l’execution layer (tools), pas par le LLM.
gateway.py — policy boundary (la couche la plus importante)
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_plan_steps: int = 6
max_execute_steps: int = 8
max_tool_calls: int = 8
max_seconds: int = 60
def _stable_json(value: Any) -> str:
if value is None or isinstance(value, (bool, int, float, str)):
return json.dumps(value, ensure_ascii=True, sort_keys=True)
if isinstance(value, list):
return "[" + ",".join(_stable_json(item) for item in value) + "]"
if isinstance(value, dict):
parts = []
for key in sorted(value):
parts.append(
json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key])
)
return "{" + ",".join(parts) + "}"
return json.dumps(str(value), ensure_ascii=True)
def args_hash(args: dict[str, Any]) -> str:
raw = _stable_json(args or {})
return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]
def validate_plan_action(
action: Any, *, max_plan_steps: int, allowed_tools: set[str]
) -> list[dict[str, Any]]:
if not isinstance(action, dict):
raise StopRun("invalid_plan:not_object")
kind = action.get("kind")
if kind == "invalid":
raise StopRun("invalid_plan:non_json")
if kind != "plan":
raise StopRun("invalid_plan:bad_kind")
allowed_top_keys = {"kind", "steps"}
if set(action.keys()) - allowed_top_keys:
raise StopRun("invalid_plan:extra_keys")
steps = action.get("steps")
if not isinstance(steps, list) or not steps:
raise StopRun("invalid_plan:missing_steps")
if len(steps) < 3:
raise StopRun("invalid_plan:min_steps")
if len(steps) > max_plan_steps:
raise StopRun("invalid_plan:max_steps")
normalized: list[dict[str, Any]] = []
seen_ids: set[str] = set()
for index, step in enumerate(steps, start=1):
if not isinstance(step, dict):
raise StopRun(f"invalid_plan:step_{index}_not_object")
allowed_step_keys = {"id", "title", "tool", "args"}
if set(step.keys()) - allowed_step_keys:
raise StopRun(f"invalid_plan:step_{index}_extra_keys")
step_id = step.get("id")
if not isinstance(step_id, str) or not step_id.strip():
raise StopRun(f"invalid_plan:step_{index}_missing_id")
if step_id in seen_ids:
raise StopRun("invalid_plan:duplicate_step_id")
seen_ids.add(step_id)
title = step.get("title")
if not isinstance(title, str) or not title.strip():
raise StopRun(f"invalid_plan:step_{index}_missing_title")
tool = step.get("tool")
if not isinstance(tool, str) or not tool.strip():
raise StopRun(f"invalid_plan:step_{index}_missing_tool")
tool = tool.strip()
if tool not in allowed_tools:
raise StopRun(f"invalid_plan:tool_not_allowed:{tool}")
args = step.get("args", {})
if args is None:
args = {}
if not isinstance(args, dict):
raise StopRun(f"invalid_plan:step_{index}_bad_args")
normalized.append(
{
"id": step_id.strip(),
"title": title.strip(),
"tool": tool,
"args": args,
}
)
return normalized
class ToolGateway:
def __init__(
self,
*,
allow: set[str],
registry: dict[str, Callable[..., dict[str, Any]]],
budget: Budget,
):
self.allow = set(allow)
self.registry = registry
self.budget = budget
self.tool_calls = 0
self.seen_calls: set[str] = set()
def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
self.tool_calls += 1
if self.tool_calls > self.budget.max_tool_calls:
raise StopRun("max_tool_calls")
if name not in self.allow:
raise StopRun(f"tool_denied:{name}")
tool = self.registry.get(name)
if tool is None:
raise StopRun(f"tool_missing:{name}")
signature = f"{name}:{args_hash(args)}"
if signature in self.seen_calls:
raise StopRun("loop_detected")
self.seen_calls.add(signature)
try:
return tool(**args)
except TypeError as exc:
raise StopRun(f"tool_bad_args:{name}") from exc
except Exception as exc:
raise StopRun(f"tool_error:{name}") from exc
Ce qui est le plus important ici (en mots simples)
validate_plan_action(...)est la governance/control layer pour le plan du LLM.- Le plan est traité comme un input non fiable et passe une validation stricte.
ToolGateway.call(...)est la frontièreagent ≠ executor: l’agent planifie, le gateway exécute en sécurité.loop_detecteddétecte les exact-repeat (tool + args_hash).
llm.py — planning + final synthesis
Le LLM ne voit que le catalogue des tools disponibles ; si un tool n’est pas dans l’allowlist, le gateway arrête le run.
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
PLAN_SYSTEM_PROMPT = """
You are a task decomposition planner.
Return only one JSON object in this exact shape:
{
"kind": "plan",
"steps": [
{"id": "step_1", "title": "...", "tool": "...", "args": {...}}
]
}
Rules:
- Create 3 to 6 steps.
- Use only tools from available_tools.
- Keep args minimal and valid.
- Do not add extra keys.
- Do not output markdown.
""".strip()
FINAL_SYSTEM_PROMPT = """
You are a reporting assistant.
Write a short final summary in English for a US business audience.
Include: manager name, month, gross sales (USD), refunds (USD), net sales (USD), refund rate (%), and key risk note.
""".strip()
TOOL_CATALOG = [
{
"name": "get_manager_profile",
"description": "Get manager profile by manager_id",
"args": {"manager_id": "integer"},
},
{
"name": "fetch_sales_data",
"description": "Get daily gross sales for a month",
"args": {"month": "string in YYYY-MM"},
},
{
"name": "fetch_refund_data",
"description": "Get daily refund values for a month",
"args": {"month": "string in YYYY-MM"},
},
{
"name": "calculate_monthly_kpis",
"description": "Calculate gross/refunds/net/order KPIs for a month",
"args": {"month": "string in YYYY-MM"},
},
{
"name": "detect_risk_signals",
"description": "Detect risk warnings for a month",
"args": {"month": "string in YYYY-MM"},
},
]
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def create_plan(goal: str, max_plan_steps: int) -> dict[str, Any]:
payload = {
"goal": goal,
"max_plan_steps": max_plan_steps,
"available_tools": TOOL_CATALOG,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": PLAN_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"kind": "invalid", "raw": text}
def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
payload = {
"goal": goal,
"history": history,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or ""
text = text.strip()
if not text:
raise LLMEmpty("llm_empty")
return text
Ce qui est le plus important ici (en mots simples)
create_plan(...)est la decision-stage pour la decomposition.timeout=LLM_TIMEOUT_SECONDS+LLMTimeoutfournissent un arrêt contrôlé en cas de problèmes réseau/modèle.- Une réponse finale vide n’est pas masquée par un texte fallback :
llm_emptyexplicite est renvoyé. - Si le JSON est cassé,
{"kind":"invalid"...}est renvoyé, et la policy layer donne unstop_reasonlisible.
main.py — Plan -> Execute -> Combine
from __future__ import annotations
import json
import time
from typing import Any
from gateway import Budget, StopRun, ToolGateway, args_hash, validate_plan_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from tools import (
calculate_monthly_kpis,
detect_risk_signals,
fetch_refund_data,
fetch_sales_data,
get_manager_profile,
)
GOAL = (
"Prepare an April 2026 monthly sales summary for manager_id=42 in USD. "
"Use step-by-step decomposition. Include gross sales, refunds, net sales, refund rate, and one risk note."
)
# max_execute_steps here limits plan length (number of planned steps), not runtime loop iterations.
BUDGET = Budget(max_plan_steps=6, max_execute_steps=8, max_tool_calls=8, max_seconds=60)
TOOL_REGISTRY = {
"get_manager_profile": get_manager_profile,
"fetch_sales_data": fetch_sales_data,
"fetch_refund_data": fetch_refund_data,
"calculate_monthly_kpis": calculate_monthly_kpis,
"detect_risk_signals": detect_risk_signals,
}
ALLOWED_TOOLS = {
"get_manager_profile",
"fetch_sales_data",
"fetch_refund_data",
"calculate_monthly_kpis",
"detect_risk_signals",
}
def run_task_decomposition(goal: str) -> dict[str, Any]:
started = time.monotonic()
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)
try:
raw_plan = create_plan(goal=goal, max_plan_steps=BUDGET.max_plan_steps)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"llm_phase": "plan",
"trace": trace,
"history": history,
}
try:
steps = validate_plan_action(
raw_plan,
max_plan_steps=BUDGET.max_plan_steps,
allowed_tools=ALLOWED_TOOLS,
)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"raw_plan": raw_plan,
"trace": trace,
"history": history,
}
if len(steps) > BUDGET.max_execute_steps:
return {
"status": "stopped",
"stop_reason": "max_execute_steps",
"plan": steps,
"trace": trace,
"history": history,
}
for step_no, step in enumerate(steps, start=1):
elapsed = time.monotonic() - started
if elapsed > BUDGET.max_seconds:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"plan": steps,
"trace": trace,
"history": history,
}
tool_name = step["tool"]
tool_args = step["args"]
try:
observation = gateway.call(tool_name, tool_args)
trace.append(
{
"step_no": step_no,
"step_id": step["id"],
"tool": tool_name,
"args_hash": args_hash(tool_args),
"ok": True,
}
)
except StopRun as exc:
trace.append(
{
"step_no": step_no,
"step_id": step["id"],
"tool": tool_name,
"args_hash": args_hash(tool_args),
"ok": False,
"stop_reason": exc.reason,
}
)
return {
"status": "stopped",
"stop_reason": exc.reason,
"plan": steps,
"trace": trace,
"history": history,
}
history.append(
{
"step_no": step_no,
"plan_step": step,
"observation": observation,
}
)
try:
answer = compose_final_answer(goal=goal, history=history)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"llm_phase": "finalize",
"plan": steps,
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"llm_phase": "finalize",
"plan": steps,
"trace": trace,
"history": history,
}
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"plan": steps,
"trace": trace,
"history": history,
}
def main() -> None:
result = run_task_decomposition(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
Ce qui est le plus important ici (en mots simples)
run_task_decomposition(...)pilotePlan -> Execute -> Combine; les actions métier passent uniquement parToolGateway.- En cas de plan invalide,
raw_planest renvoyé pour le débogage. - Dans cette version,
max_execute_stepsvérifie la longueur du plan avant l’execution ; ensuite, les limites runtime sontmax_tool_callsetmax_seconds. historyest un log transparent des étapes : ce qui était dans le plan et quelle observation chaque tool a renvoyée.
requirements.txt
openai==2.21.0
Exemple de sortie
L’ordre des étapes du plan peut varier légèrement entre les runs, mais les policy-gates et stop reasons restent stables. Le planner peut changer l’ordre des étapes ; l’important est que policy + allowlist fonctionnent de la même manière quel que soit l’ordre.
{
"status": "ok",
"stop_reason": "success",
"answer": "In April 2026, under manager Anna's leadership, gross sales were $28,195, refunds were $1,370, net sales were $26,825, and refund rate was 4.86%.",
"plan": [
{"id": "step_1", "tool": "fetch_sales_data", "args": {"month": "2026-04"}},
{"id": "step_2", "tool": "fetch_refund_data", "args": {"month": "2026-04"}},
{"id": "step_3", "tool": "calculate_monthly_kpis", "args": {"month": "2026-04"}},
{"id": "step_4", "tool": "detect_risk_signals", "args": {"month": "2026-04"}},
{"id": "step_5", "tool": "get_manager_profile", "args": {"manager_id": 42}}
],
"trace": [
{"step_no": 1, "step_id": "step_1", "tool": "fetch_sales_data", "args_hash": "...", "ok": true},
{"step_no": 2, "step_id": "step_2", "tool": "fetch_refund_data", "args_hash": "...", "ok": true},
{"step_no": 3, "step_id": "step_3", "tool": "calculate_monthly_kpis", "args_hash": "...", "ok": true},
{"step_no": 4, "step_id": "step_4", "tool": "detect_risk_signals", "args_hash": "...", "ok": true},
{"step_no": 5, "step_id": "step_5", "tool": "get_manager_profile", "args_hash": "...", "ok": true}
],
"history": [{...}]
}
Ceci est un exemple abrégé : dans un run réel, plan et trace peuvent contenir plus d’étapes.
history est le journal d’exécution : pour chaque step_no, il y a plan_step et observation.
args_hash hash uniquement les arguments ; il peut donc coïncider entre différents tools si les args sont identiques ; la loop detection tient aussi compte du nom du tool.
stop_reason typiques
success— le plan est exécuté et la réponse finale est généréeinvalid_plan:*— le plan du LLM n’a pas passé la policy validationinvalid_plan:non_json— le LLM n’a pas renvoyé de plan JSON valideinvalid_plan:min_steps— le plan contient moins de 3 étapes de decompositioninvalid_plan:tool_not_allowed:<name>— le plan contient un tool hors allowlistmax_execute_steps— le plan est plus long que l’execution budget autorisémax_tool_calls— limite d’appels de tools atteintemax_seconds— time budget du run dépasséllm_timeout— le LLM n’a pas répondu dansOPENAI_TIMEOUT_SECONDSllm_empty— le LLM a renvoyé une réponse finale vide à l’étapefinalizetool_denied:<name>— le tool n’est pas dans l’allowlisttool_missing:<name>— le tool est absent du registrytool_bad_args:<name>— l’étape contient des arguments invalidesloop_detected— exact repeat (tool + args_hash)
Ce qui N’est PAS montré ici
- Pas d’auth/PII ni de contrôles d’accès production aux données personnelles.
- Pas de politiques retry/backoff pour le LLM et la couche tools.
- Pas de budgets token/coût (cost guardrails).
- Les tools ici sont des mocks déterministes d’apprentissage, pas de vraies API externes.
Que tester ensuite
- Retire
detect_risk_signalsdeALLOWED_TOOLSet vérifietool_denied:*. - Ajoute au plan un tool inexistant et vérifie
tool_missing:*. - Réduis
max_plan_stepsà3et observe à quelle fréquence tu obtiensinvalid_plan:max_steps. - Change
GOALenmanager_id=7 (Max)et compare la synthèse finale. - Ajoute des guardrails cost/token dans
Budgetet dans le résultat JSON final.
Code complet sur GitHub
Le dépôt contient la version runnable complète de cet exemple : planning, policy boundary, exécution séquentielle des étapes et stop reasons.
Voir le code complet sur GitHub ↗