Esencia del patrón (breve)
Task Decomposition Agent es un patrón en el que el agente primero divide una tarea compleja en pasos secuenciales, y solo después los ejecuta uno por uno.
El modelo se encarga del plan y del orden de acciones, mientras que la ejecución de cada paso pasa por un gateway controlado con validación del plan, allowlist y presupuestos de run.
Qué demuestra este ejemplo
- etapa
Planseparada antes deExecute - policy boundary entre planning (LLM) y tools (execution layer)
- validación estricta del plan (
kind, estructura de pasos, allowed keys) - allowlist de herramientas (deny by default)
- presupuestos de run separados:
max_plan_steps(plan) ymax_execute_steps(ejecución), másmax_tool_calls,max_seconds stop_reasonexplícitos para depuración y monitoreoraw_planen la respuesta si el plan es inválido
Arquitectura
- El LLM recibe el goal y devuelve un plan en JSON (
kind="plan",steps). - El policy boundary valida el plan y bloquea formas inválidas/no seguras.
- Cada paso se ejecuta de forma secuencial mediante
ToolGateway(allowlist, budgets, loop detection). - La observation de cada paso se agrega a
historycomo checkpoint para ejecución transparente. - Después de ejecutar todos los pasos, el LLM hace la síntesis final basada en
historycon una llamadaCombineseparada, sin tools.
El LLM devuelve intent (plan), tratado como entrada no confiable: el policy boundary primero lo valida y luego (si está permitido) llama tools.
La allowlist se aplica dos veces: en plan validation (invalid_plan:tool_not_allowed:*) y en ejecución de tools (tool_denied:*).
Así Task Decomposition se mantiene controlable: el agente planifica y la ejecución pasa por una capa controlada.
Estructura del proyecto
examples/
└── agent-patterns/
└── task-decomposition-agent/
└── python/
├── main.py # Plan -> Execute -> Combine
├── llm.py # planner + final synthesis
├── gateway.py # policy boundary: plan validation + tool execution control
├── tools.py # deterministic tools (Anna/Max, US, USD)
└── requirements.txt
Cómo ejecutar
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/task-decomposition-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Se requiere Python 3.11+.
Opción con export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Opción con .env (opcional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
Esta es la variante de shell (macOS/Linux). En Windows es más fácil usar variables con set o, si lo prefieres, python-dotenv para cargar .env automáticamente.
Tarea
Imagina que un manager pide:
"Prepara un informe corto de abril de 2026: ventas, devoluciones, ingresos netos y riesgos."
El agente no debe inventar la respuesta "de memoria". Debe:
- primero crear un plan
- ejecutar los pasos en orden
- usar datos solo de tools permitidas
- dar la respuesta final solo después de todos los pasos
Solución
Aquí el agente trabaja con un flujo simple:
- el LLM primero crea un plan con varios pasos
- el sistema verifica que el plan sea válido y permitido
- las tools ejecutan pasos y devuelven hechos
- después de eso, el LLM compone el resumen final corto
- si un plan o paso es inválido, el run se detiene con una razón
Código
tools.py — herramientas (fuente de hechos)
from __future__ import annotations
from typing import Any
MANAGERS = {
42: {"id": 42, "name": "Anna", "region": "US", "team": "Retail East"},
7: {"id": 7, "name": "Max", "region": "US", "team": "Retail West"},
}
SALES_DATA = {
"2026-04": [
{"day": "2026-04-01", "gross_usd": 5200.0, "orders": 120},
{"day": "2026-04-02", "gross_usd": 4890.0, "orders": 113},
{"day": "2026-04-03", "gross_usd": 6105.0, "orders": 141},
{"day": "2026-04-04", "gross_usd": 5580.0, "orders": 127},
{"day": "2026-04-05", "gross_usd": 6420.0, "orders": 149},
]
}
REFUND_DATA = {
"2026-04": [
{"day": "2026-04-01", "refunds_usd": 140.0},
{"day": "2026-04-02", "refunds_usd": 260.0},
{"day": "2026-04-03", "refunds_usd": 210.0},
{"day": "2026-04-04", "refunds_usd": 590.0},
{"day": "2026-04-05", "refunds_usd": 170.0},
]
}
def get_manager_profile(manager_id: int) -> dict[str, Any]:
manager = MANAGERS.get(manager_id)
if not manager:
return {"error": f"manager {manager_id} not found"}
return {"manager": manager}
def fetch_sales_data(month: str) -> dict[str, Any]:
rows = SALES_DATA.get(month)
if not rows:
return {"error": f"sales data for {month} not found"}
return {"month": month, "currency": "USD", "daily_sales": rows}
def fetch_refund_data(month: str) -> dict[str, Any]:
rows = REFUND_DATA.get(month)
if not rows:
return {"error": f"refund data for {month} not found"}
return {"month": month, "currency": "USD", "daily_refunds": rows}
def calculate_monthly_kpis(month: str) -> dict[str, Any]:
sales_rows = SALES_DATA.get(month)
refund_rows = REFUND_DATA.get(month)
if not sales_rows or not refund_rows:
return {"error": f"kpi inputs for {month} not found"}
gross_sales = sum(row["gross_usd"] for row in sales_rows)
total_refunds = sum(row["refunds_usd"] for row in refund_rows)
total_orders = sum(row["orders"] for row in sales_rows)
net_sales = gross_sales - total_refunds
refund_rate = (total_refunds / gross_sales) if gross_sales else 0.0
top_day = max(sales_rows, key=lambda row: row["gross_usd"])["day"]
return {
"month": month,
"currency": "USD",
"gross_sales_usd": round(gross_sales, 2),
"refunds_usd": round(total_refunds, 2),
"net_sales_usd": round(net_sales, 2),
"orders": total_orders,
"refund_rate": round(refund_rate, 4),
"top_sales_day": top_day,
}
def detect_risk_signals(month: str) -> dict[str, Any]:
refund_rows = REFUND_DATA.get(month)
if not refund_rows:
return {"error": f"refund data for {month} not found"}
high_refund_day = max(refund_rows, key=lambda row: row["refunds_usd"])
warnings: list[str] = []
if high_refund_day["refunds_usd"] >= 500:
warnings.append(
f"Refund spike detected on {high_refund_day['day']}: {high_refund_day['refunds_usd']} USD"
)
if not warnings:
warnings.append("No critical risk signals detected for this month.")
return {
"month": month,
"currency": "USD",
"risk_warnings": warnings,
"peak_refund_day": high_refund_day,
}
Qué es lo más importante aquí (en palabras simples)
- Las tools son determinísticas y no contienen lógica de LLM.
- El agente solo decide qué pasos ejecutar.
- La lógica de negocio la ejecuta la execution layer (tools), no el LLM.
gateway.py — policy boundary (la capa más importante)
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_plan_steps: int = 6
max_execute_steps: int = 8
max_tool_calls: int = 8
max_seconds: int = 60
def _stable_json(value: Any) -> str:
if value is None or isinstance(value, (bool, int, float, str)):
return json.dumps(value, ensure_ascii=True, sort_keys=True)
if isinstance(value, list):
return "[" + ",".join(_stable_json(item) for item in value) + "]"
if isinstance(value, dict):
parts = []
for key in sorted(value):
parts.append(
json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key])
)
return "{" + ",".join(parts) + "}"
return json.dumps(str(value), ensure_ascii=True)
def args_hash(args: dict[str, Any]) -> str:
raw = _stable_json(args or {})
return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]
def validate_plan_action(
action: Any, *, max_plan_steps: int, allowed_tools: set[str]
) -> list[dict[str, Any]]:
if not isinstance(action, dict):
raise StopRun("invalid_plan:not_object")
kind = action.get("kind")
if kind == "invalid":
raise StopRun("invalid_plan:non_json")
if kind != "plan":
raise StopRun("invalid_plan:bad_kind")
allowed_top_keys = {"kind", "steps"}
if set(action.keys()) - allowed_top_keys:
raise StopRun("invalid_plan:extra_keys")
steps = action.get("steps")
if not isinstance(steps, list) or not steps:
raise StopRun("invalid_plan:missing_steps")
if len(steps) < 3:
raise StopRun("invalid_plan:min_steps")
if len(steps) > max_plan_steps:
raise StopRun("invalid_plan:max_steps")
normalized: list[dict[str, Any]] = []
seen_ids: set[str] = set()
for index, step in enumerate(steps, start=1):
if not isinstance(step, dict):
raise StopRun(f"invalid_plan:step_{index}_not_object")
allowed_step_keys = {"id", "title", "tool", "args"}
if set(step.keys()) - allowed_step_keys:
raise StopRun(f"invalid_plan:step_{index}_extra_keys")
step_id = step.get("id")
if not isinstance(step_id, str) or not step_id.strip():
raise StopRun(f"invalid_plan:step_{index}_missing_id")
if step_id in seen_ids:
raise StopRun("invalid_plan:duplicate_step_id")
seen_ids.add(step_id)
title = step.get("title")
if not isinstance(title, str) or not title.strip():
raise StopRun(f"invalid_plan:step_{index}_missing_title")
tool = step.get("tool")
if not isinstance(tool, str) or not tool.strip():
raise StopRun(f"invalid_plan:step_{index}_missing_tool")
tool = tool.strip()
if tool not in allowed_tools:
raise StopRun(f"invalid_plan:tool_not_allowed:{tool}")
args = step.get("args", {})
if args is None:
args = {}
if not isinstance(args, dict):
raise StopRun(f"invalid_plan:step_{index}_bad_args")
normalized.append(
{
"id": step_id.strip(),
"title": title.strip(),
"tool": tool,
"args": args,
}
)
return normalized
class ToolGateway:
def __init__(
self,
*,
allow: set[str],
registry: dict[str, Callable[..., dict[str, Any]]],
budget: Budget,
):
self.allow = set(allow)
self.registry = registry
self.budget = budget
self.tool_calls = 0
self.seen_calls: set[str] = set()
def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
self.tool_calls += 1
if self.tool_calls > self.budget.max_tool_calls:
raise StopRun("max_tool_calls")
if name not in self.allow:
raise StopRun(f"tool_denied:{name}")
tool = self.registry.get(name)
if tool is None:
raise StopRun(f"tool_missing:{name}")
signature = f"{name}:{args_hash(args)}"
if signature in self.seen_calls:
raise StopRun("loop_detected")
self.seen_calls.add(signature)
try:
return tool(**args)
except TypeError as exc:
raise StopRun(f"tool_bad_args:{name}") from exc
except Exception as exc:
raise StopRun(f"tool_error:{name}") from exc
Qué es lo más importante aquí (en palabras simples)
validate_plan_action(...)es la governance/control layer para el plan del LLM.- El plan se trata como input no confiable y pasa por validación estricta.
ToolGateway.call(...)es el límiteagent ≠ executor: el agente planifica y el gateway ejecuta de forma segura.loop_detecteddetecta exact-repeat (tool + args_hash).
llm.py — planning + final synthesis
El LLM ve solo el catálogo de tools disponibles; si un tool no está en allowlist, el gateway detiene el run.
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
PLAN_SYSTEM_PROMPT = """
You are a task decomposition planner.
Return only one JSON object in this exact shape:
{
"kind": "plan",
"steps": [
{"id": "step_1", "title": "...", "tool": "...", "args": {...}}
]
}
Rules:
- Create 3 to 6 steps.
- Use only tools from available_tools.
- Keep args minimal and valid.
- Do not add extra keys.
- Do not output markdown.
""".strip()
FINAL_SYSTEM_PROMPT = """
You are a reporting assistant.
Write a short final summary in English for a US business audience.
Include: manager name, month, gross sales (USD), refunds (USD), net sales (USD), refund rate (%), and key risk note.
""".strip()
TOOL_CATALOG = [
{
"name": "get_manager_profile",
"description": "Get manager profile by manager_id",
"args": {"manager_id": "integer"},
},
{
"name": "fetch_sales_data",
"description": "Get daily gross sales for a month",
"args": {"month": "string in YYYY-MM"},
},
{
"name": "fetch_refund_data",
"description": "Get daily refund values for a month",
"args": {"month": "string in YYYY-MM"},
},
{
"name": "calculate_monthly_kpis",
"description": "Calculate gross/refunds/net/order KPIs for a month",
"args": {"month": "string in YYYY-MM"},
},
{
"name": "detect_risk_signals",
"description": "Detect risk warnings for a month",
"args": {"month": "string in YYYY-MM"},
},
]
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def create_plan(goal: str, max_plan_steps: int) -> dict[str, Any]:
payload = {
"goal": goal,
"max_plan_steps": max_plan_steps,
"available_tools": TOOL_CATALOG,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": PLAN_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"kind": "invalid", "raw": text}
def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
payload = {
"goal": goal,
"history": history,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or ""
text = text.strip()
if not text:
raise LLMEmpty("llm_empty")
return text
Qué es lo más importante aquí (en palabras simples)
create_plan(...)es la decision stage para decomposition.timeout=LLM_TIMEOUT_SECONDS+LLMTimeoutdan una parada controlada ante problemas de red/modelo.- Una respuesta final vacía no se enmascara con texto fallback: se devuelve
llm_emptyexplícito. - Si el JSON está roto, se devuelve
{"kind":"invalid"...}y la policy layer devuelve unstop_reasonlegible.
main.py — Plan -> Execute -> Combine
from __future__ import annotations
import json
import time
from typing import Any
from gateway import Budget, StopRun, ToolGateway, args_hash, validate_plan_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from tools import (
calculate_monthly_kpis,
detect_risk_signals,
fetch_refund_data,
fetch_sales_data,
get_manager_profile,
)
GOAL = (
"Prepare an April 2026 monthly sales summary for manager_id=42 in USD. "
"Use step-by-step decomposition. Include gross sales, refunds, net sales, refund rate, and one risk note."
)
# max_execute_steps here limits plan length (number of planned steps), not runtime loop iterations.
BUDGET = Budget(max_plan_steps=6, max_execute_steps=8, max_tool_calls=8, max_seconds=60)
TOOL_REGISTRY = {
"get_manager_profile": get_manager_profile,
"fetch_sales_data": fetch_sales_data,
"fetch_refund_data": fetch_refund_data,
"calculate_monthly_kpis": calculate_monthly_kpis,
"detect_risk_signals": detect_risk_signals,
}
ALLOWED_TOOLS = {
"get_manager_profile",
"fetch_sales_data",
"fetch_refund_data",
"calculate_monthly_kpis",
"detect_risk_signals",
}
def run_task_decomposition(goal: str) -> dict[str, Any]:
started = time.monotonic()
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)
try:
raw_plan = create_plan(goal=goal, max_plan_steps=BUDGET.max_plan_steps)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"llm_phase": "plan",
"trace": trace,
"history": history,
}
try:
steps = validate_plan_action(
raw_plan,
max_plan_steps=BUDGET.max_plan_steps,
allowed_tools=ALLOWED_TOOLS,
)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"raw_plan": raw_plan,
"trace": trace,
"history": history,
}
if len(steps) > BUDGET.max_execute_steps:
return {
"status": "stopped",
"stop_reason": "max_execute_steps",
"plan": steps,
"trace": trace,
"history": history,
}
for step_no, step in enumerate(steps, start=1):
elapsed = time.monotonic() - started
if elapsed > BUDGET.max_seconds:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"plan": steps,
"trace": trace,
"history": history,
}
tool_name = step["tool"]
tool_args = step["args"]
try:
observation = gateway.call(tool_name, tool_args)
trace.append(
{
"step_no": step_no,
"step_id": step["id"],
"tool": tool_name,
"args_hash": args_hash(tool_args),
"ok": True,
}
)
except StopRun as exc:
trace.append(
{
"step_no": step_no,
"step_id": step["id"],
"tool": tool_name,
"args_hash": args_hash(tool_args),
"ok": False,
"stop_reason": exc.reason,
}
)
return {
"status": "stopped",
"stop_reason": exc.reason,
"plan": steps,
"trace": trace,
"history": history,
}
history.append(
{
"step_no": step_no,
"plan_step": step,
"observation": observation,
}
)
try:
answer = compose_final_answer(goal=goal, history=history)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"llm_phase": "finalize",
"plan": steps,
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"llm_phase": "finalize",
"plan": steps,
"trace": trace,
"history": history,
}
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"plan": steps,
"trace": trace,
"history": history,
}
def main() -> None:
result = run_task_decomposition(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
Qué es lo más importante aquí (en palabras simples)
run_task_decomposition(...)controlaPlan -> Execute -> Combine; las acciones de negocio se ejecutan solo a través deToolGateway.- Si el plan es inválido, se devuelve
raw_planpara depuración. - En esta versión,
max_execute_stepsverifica la longitud del plan antes de execution; luego los límites runtime los danmax_tool_callsymax_seconds. historyes un log transparente de pasos: qué había en el plan y qué observation devolvió cada tool.
requirements.txt
openai==2.21.0
Ejemplo de salida
El orden de pasos del plan puede variar un poco entre ejecuciones, pero los policy gates y stop reasons se mantienen estables. El planner puede cambiar el orden de pasos; lo importante es que policy + allowlist funcionan igual sin importar el orden.
{
"status": "ok",
"stop_reason": "success",
"answer": "In April 2026, under manager Anna's leadership, gross sales were $28,195, refunds were $1,370, net sales were $26,825, and refund rate was 4.86%.",
"plan": [
{"id": "step_1", "tool": "fetch_sales_data", "args": {"month": "2026-04"}},
{"id": "step_2", "tool": "fetch_refund_data", "args": {"month": "2026-04"}},
{"id": "step_3", "tool": "calculate_monthly_kpis", "args": {"month": "2026-04"}},
{"id": "step_4", "tool": "detect_risk_signals", "args": {"month": "2026-04"}},
{"id": "step_5", "tool": "get_manager_profile", "args": {"manager_id": 42}}
],
"trace": [
{"step_no": 1, "step_id": "step_1", "tool": "fetch_sales_data", "args_hash": "...", "ok": true},
{"step_no": 2, "step_id": "step_2", "tool": "fetch_refund_data", "args_hash": "...", "ok": true},
{"step_no": 3, "step_id": "step_3", "tool": "calculate_monthly_kpis", "args_hash": "...", "ok": true},
{"step_no": 4, "step_id": "step_4", "tool": "detect_risk_signals", "args_hash": "...", "ok": true},
{"step_no": 5, "step_id": "step_5", "tool": "get_manager_profile", "args_hash": "...", "ok": true}
],
"history": [{...}]
}
Este es un ejemplo resumido: en una ejecución real, plan y trace pueden contener más pasos.
history es el registro de ejecución: para cada step_no hay plan_step y observation.
args_hash hashea solo argumentos, por eso puede coincidir entre distintos tools si los args son iguales; loop detection además considera el nombre del tool.
stop_reason típicos
success— el plan se ejecutó y se generó la respuesta finalinvalid_plan:*— el plan del LLM no pasó la policy validationinvalid_plan:non_json— el LLM no devolvió un plan JSON válidoinvalid_plan:min_steps— el plan tiene menos de 3 pasos de decompositioninvalid_plan:tool_not_allowed:<name>— el plan contiene un tool fuera de allowlistmax_execute_steps— el plan es más largo que el execution budget permitidomax_tool_calls— se agotó el límite de llamadas a toolsmax_seconds— se superó el time budget del runllm_timeout— el LLM no respondió dentro deOPENAI_TIMEOUT_SECONDSllm_empty— el LLM devolvió una respuesta final vacía en la fasefinalizetool_denied:<name>— el tool no está en allowlisttool_missing:<name>— el tool no existe en registrytool_bad_args:<name>— el paso contiene argumentos inválidosloop_detected— exact repeat (tool + args_hash)
Qué NO se muestra aquí
- No hay auth/PII ni controles de acceso de producción para datos personales.
- No hay políticas de retry/backoff para LLM y la capa de herramientas.
- No hay presupuestos de tokens/costo (cost guardrails).
- Las herramientas aquí son mocks determinísticos para aprendizaje, no APIs externas reales.
Qué probar después
- Quita
detect_risk_signalsdeALLOWED_TOOLSy verificatool_denied:*. - Agrega al plan un tool inexistente y verifica
tool_missing:*. - Reduce
max_plan_stepsa3y observa con qué frecuencia obtienesinvalid_plan:max_steps. - Cambia
GOALamanager_id=7 (Max)y compara la síntesis final. - Agrega cost/token guardrails en
Budgety en el resultado JSON final.
Código completo en GitHub
En el repositorio está la versión runnable completa de este ejemplo: planning, policy boundary, ejecución secuencial de pasos y stop reasons.
Ver código completo en GitHub ↗