Task Decomposition Agent — Python (implementación completa con LLM)

Ejemplo ejecutable de agente Task Decomposition en Python con estilo de producción, con planificación, policy boundary, ejecución secuencial de pasos, presupuestos y stop reasons.
En esta página
  1. Esencia del patrón (breve)
  2. Qué demuestra este ejemplo
  3. Arquitectura
  4. Estructura del proyecto
  5. Cómo ejecutar
  6. Tarea
  7. Solución
  8. Código
  9. tools.py — herramientas (fuente de hechos)
  10. gateway.py — policy boundary (la capa más importante)
  11. llm.py — planning + final synthesis
  12. main.py — Plan -> Execute -> Combine
  13. requirements.txt
  14. Ejemplo de salida
  15. stop_reason típicos
  16. Qué NO se muestra aquí
  17. Qué probar después
  18. Código completo en GitHub

Esencia del patrón (breve)

Task Decomposition Agent es un patrón en el que el agente primero divide una tarea compleja en pasos secuenciales, y solo después los ejecuta uno por uno.

El modelo se encarga del plan y del orden de acciones, mientras que la ejecución de cada paso pasa por un gateway controlado con validación del plan, allowlist y presupuestos de run.


Qué demuestra este ejemplo

  • etapa Plan separada antes de Execute
  • policy boundary entre planning (LLM) y tools (execution layer)
  • validación estricta del plan (kind, estructura de pasos, allowed keys)
  • allowlist de herramientas (deny by default)
  • presupuestos de run separados: max_plan_steps (plan) y max_execute_steps (ejecución), más max_tool_calls, max_seconds
  • stop_reason explícitos para depuración y monitoreo
  • raw_plan en la respuesta si el plan es inválido

Arquitectura

  1. El LLM recibe el goal y devuelve un plan en JSON (kind="plan", steps).
  2. El policy boundary valida el plan y bloquea formas inválidas/no seguras.
  3. Cada paso se ejecuta de forma secuencial mediante ToolGateway (allowlist, budgets, loop detection).
  4. La observation de cada paso se agrega a history como checkpoint para ejecución transparente.
  5. Después de ejecutar todos los pasos, el LLM hace la síntesis final basada en history con una llamada Combine separada, sin tools.

El LLM devuelve intent (plan), tratado como entrada no confiable: el policy boundary primero lo valida y luego (si está permitido) llama tools. La allowlist se aplica dos veces: en plan validation (invalid_plan:tool_not_allowed:*) y en ejecución de tools (tool_denied:*).

Así Task Decomposition se mantiene controlable: el agente planifica y la ejecución pasa por una capa controlada.


Estructura del proyecto

TEXT
examples/
└── agent-patterns/
    └── task-decomposition-agent/
        └── python/
            ├── main.py           # Plan -> Execute -> Combine
            ├── llm.py            # planner + final synthesis
            ├── gateway.py        # policy boundary: plan validation + tool execution control
            ├── tools.py          # deterministic tools (Anna/Max, US, USD)
            └── requirements.txt

Cómo ejecutar

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/task-decomposition-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Se requiere Python 3.11+.

Opción con export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Opción con .env (opcional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

Esta es la variante de shell (macOS/Linux). En Windows es más fácil usar variables con set o, si lo prefieres, python-dotenv para cargar .env automáticamente.


Tarea

Imagina que un manager pide:

"Prepara un informe corto de abril de 2026: ventas, devoluciones, ingresos netos y riesgos."

El agente no debe inventar la respuesta "de memoria". Debe:

  • primero crear un plan
  • ejecutar los pasos en orden
  • usar datos solo de tools permitidas
  • dar la respuesta final solo después de todos los pasos

Solución

Aquí el agente trabaja con un flujo simple:

  • el LLM primero crea un plan con varios pasos
  • el sistema verifica que el plan sea válido y permitido
  • las tools ejecutan pasos y devuelven hechos
  • después de eso, el LLM compone el resumen final corto
  • si un plan o paso es inválido, el run se detiene con una razón

Código

tools.py — herramientas (fuente de hechos)

PYTHON
from __future__ import annotations

from typing import Any

MANAGERS = {
    42: {"id": 42, "name": "Anna", "region": "US", "team": "Retail East"},
    7: {"id": 7, "name": "Max", "region": "US", "team": "Retail West"},
}

SALES_DATA = {
    "2026-04": [
        {"day": "2026-04-01", "gross_usd": 5200.0, "orders": 120},
        {"day": "2026-04-02", "gross_usd": 4890.0, "orders": 113},
        {"day": "2026-04-03", "gross_usd": 6105.0, "orders": 141},
        {"day": "2026-04-04", "gross_usd": 5580.0, "orders": 127},
        {"day": "2026-04-05", "gross_usd": 6420.0, "orders": 149},
    ]
}

REFUND_DATA = {
    "2026-04": [
        {"day": "2026-04-01", "refunds_usd": 140.0},
        {"day": "2026-04-02", "refunds_usd": 260.0},
        {"day": "2026-04-03", "refunds_usd": 210.0},
        {"day": "2026-04-04", "refunds_usd": 590.0},
        {"day": "2026-04-05", "refunds_usd": 170.0},
    ]
}


def get_manager_profile(manager_id: int) -> dict[str, Any]:
    manager = MANAGERS.get(manager_id)
    if not manager:
        return {"error": f"manager {manager_id} not found"}
    return {"manager": manager}


def fetch_sales_data(month: str) -> dict[str, Any]:
    rows = SALES_DATA.get(month)
    if not rows:
        return {"error": f"sales data for {month} not found"}
    return {"month": month, "currency": "USD", "daily_sales": rows}


def fetch_refund_data(month: str) -> dict[str, Any]:
    rows = REFUND_DATA.get(month)
    if not rows:
        return {"error": f"refund data for {month} not found"}
    return {"month": month, "currency": "USD", "daily_refunds": rows}


def calculate_monthly_kpis(month: str) -> dict[str, Any]:
    sales_rows = SALES_DATA.get(month)
    refund_rows = REFUND_DATA.get(month)
    if not sales_rows or not refund_rows:
        return {"error": f"kpi inputs for {month} not found"}

    gross_sales = sum(row["gross_usd"] for row in sales_rows)
    total_refunds = sum(row["refunds_usd"] for row in refund_rows)
    total_orders = sum(row["orders"] for row in sales_rows)
    net_sales = gross_sales - total_refunds
    refund_rate = (total_refunds / gross_sales) if gross_sales else 0.0

    top_day = max(sales_rows, key=lambda row: row["gross_usd"])["day"]

    return {
        "month": month,
        "currency": "USD",
        "gross_sales_usd": round(gross_sales, 2),
        "refunds_usd": round(total_refunds, 2),
        "net_sales_usd": round(net_sales, 2),
        "orders": total_orders,
        "refund_rate": round(refund_rate, 4),
        "top_sales_day": top_day,
    }


def detect_risk_signals(month: str) -> dict[str, Any]:
    refund_rows = REFUND_DATA.get(month)
    if not refund_rows:
        return {"error": f"refund data for {month} not found"}

    high_refund_day = max(refund_rows, key=lambda row: row["refunds_usd"])
    warnings: list[str] = []

    if high_refund_day["refunds_usd"] >= 500:
        warnings.append(
            f"Refund spike detected on {high_refund_day['day']}: {high_refund_day['refunds_usd']} USD"
        )

    if not warnings:
        warnings.append("No critical risk signals detected for this month.")

    return {
        "month": month,
        "currency": "USD",
        "risk_warnings": warnings,
        "peak_refund_day": high_refund_day,
    }

Qué es lo más importante aquí (en palabras simples)

  • Las tools son determinísticas y no contienen lógica de LLM.
  • El agente solo decide qué pasos ejecutar.
  • La lógica de negocio la ejecuta la execution layer (tools), no el LLM.

gateway.py — policy boundary (la capa más importante)

PYTHON
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_plan_steps: int = 6
    max_execute_steps: int = 8
    max_tool_calls: int = 8
    max_seconds: int = 60


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(item) for item in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(
                json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key])
            )
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def args_hash(args: dict[str, Any]) -> str:
    raw = _stable_json(args or {})
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def validate_plan_action(
    action: Any, *, max_plan_steps: int, allowed_tools: set[str]
) -> list[dict[str, Any]]:
    if not isinstance(action, dict):
        raise StopRun("invalid_plan:not_object")

    kind = action.get("kind")
    if kind == "invalid":
        raise StopRun("invalid_plan:non_json")
    if kind != "plan":
        raise StopRun("invalid_plan:bad_kind")

    allowed_top_keys = {"kind", "steps"}
    if set(action.keys()) - allowed_top_keys:
        raise StopRun("invalid_plan:extra_keys")

    steps = action.get("steps")
    if not isinstance(steps, list) or not steps:
        raise StopRun("invalid_plan:missing_steps")
    if len(steps) < 3:
        raise StopRun("invalid_plan:min_steps")
    if len(steps) > max_plan_steps:
        raise StopRun("invalid_plan:max_steps")

    normalized: list[dict[str, Any]] = []
    seen_ids: set[str] = set()

    for index, step in enumerate(steps, start=1):
        if not isinstance(step, dict):
            raise StopRun(f"invalid_plan:step_{index}_not_object")

        allowed_step_keys = {"id", "title", "tool", "args"}
        if set(step.keys()) - allowed_step_keys:
            raise StopRun(f"invalid_plan:step_{index}_extra_keys")

        step_id = step.get("id")
        if not isinstance(step_id, str) or not step_id.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_id")
        if step_id in seen_ids:
            raise StopRun("invalid_plan:duplicate_step_id")
        seen_ids.add(step_id)

        title = step.get("title")
        if not isinstance(title, str) or not title.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_title")

        tool = step.get("tool")
        if not isinstance(tool, str) or not tool.strip():
            raise StopRun(f"invalid_plan:step_{index}_missing_tool")
        tool = tool.strip()
        if tool not in allowed_tools:
            raise StopRun(f"invalid_plan:tool_not_allowed:{tool}")

        args = step.get("args", {})
        if args is None:
            args = {}
        if not isinstance(args, dict):
            raise StopRun(f"invalid_plan:step_{index}_bad_args")

        normalized.append(
            {
                "id": step_id.strip(),
                "title": title.strip(),
                "tool": tool,
                "args": args,
            }
        )

    return normalized


class ToolGateway:
    def __init__(
        self,
        *,
        allow: set[str],
        registry: dict[str, Callable[..., dict[str, Any]]],
        budget: Budget,
    ):
        self.allow = set(allow)
        self.registry = registry
        self.budget = budget
        self.tool_calls = 0
        self.seen_calls: set[str] = set()

    def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
        self.tool_calls += 1
        if self.tool_calls > self.budget.max_tool_calls:
            raise StopRun("max_tool_calls")

        if name not in self.allow:
            raise StopRun(f"tool_denied:{name}")

        tool = self.registry.get(name)
        if tool is None:
            raise StopRun(f"tool_missing:{name}")

        signature = f"{name}:{args_hash(args)}"
        if signature in self.seen_calls:
            raise StopRun("loop_detected")
        self.seen_calls.add(signature)

        try:
            return tool(**args)
        except TypeError as exc:
            raise StopRun(f"tool_bad_args:{name}") from exc
        except Exception as exc:
            raise StopRun(f"tool_error:{name}") from exc

Qué es lo más importante aquí (en palabras simples)

  • validate_plan_action(...) es la governance/control layer para el plan del LLM.
  • El plan se trata como input no confiable y pasa por validación estricta.
  • ToolGateway.call(...) es el límite agent ≠ executor: el agente planifica y el gateway ejecuta de forma segura.
  • loop_detected detecta exact-repeat (tool + args_hash).

llm.py — planning + final synthesis

El LLM ve solo el catálogo de tools disponibles; si un tool no está en allowlist, el gateway detiene el run.

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


PLAN_SYSTEM_PROMPT = """
You are a task decomposition planner.
Return only one JSON object in this exact shape:
{
  "kind": "plan",
  "steps": [
    {"id": "step_1", "title": "...", "tool": "...", "args": {...}}
  ]
}

Rules:
- Create 3 to 6 steps.
- Use only tools from available_tools.
- Keep args minimal and valid.
- Do not add extra keys.
- Do not output markdown.
""".strip()

FINAL_SYSTEM_PROMPT = """
You are a reporting assistant.
Write a short final summary in English for a US business audience.
Include: manager name, month, gross sales (USD), refunds (USD), net sales (USD), refund rate (%), and key risk note.
""".strip()

TOOL_CATALOG = [
    {
        "name": "get_manager_profile",
        "description": "Get manager profile by manager_id",
        "args": {"manager_id": "integer"},
    },
    {
        "name": "fetch_sales_data",
        "description": "Get daily gross sales for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "fetch_refund_data",
        "description": "Get daily refund values for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "calculate_monthly_kpis",
        "description": "Calculate gross/refunds/net/order KPIs for a month",
        "args": {"month": "string in YYYY-MM"},
    },
    {
        "name": "detect_risk_signals",
        "description": "Detect risk warnings for a month",
        "args": {"month": "string in YYYY-MM"},
    },
]


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def create_plan(goal: str, max_plan_steps: int) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "max_plan_steps": max_plan_steps,
        "available_tools": TOOL_CATALOG,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": PLAN_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
    payload = {
        "goal": goal,
        "history": history,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or ""
    text = text.strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text

Qué es lo más importante aquí (en palabras simples)

  • create_plan(...) es la decision stage para decomposition.
  • timeout=LLM_TIMEOUT_SECONDS + LLMTimeout dan una parada controlada ante problemas de red/modelo.
  • Una respuesta final vacía no se enmascara con texto fallback: se devuelve llm_empty explícito.
  • Si el JSON está roto, se devuelve {"kind":"invalid"...} y la policy layer devuelve un stop_reason legible.

main.py — Plan -> Execute -> Combine

PYTHON
from __future__ import annotations

import json
import time
from typing import Any

from gateway import Budget, StopRun, ToolGateway, args_hash, validate_plan_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from tools import (
    calculate_monthly_kpis,
    detect_risk_signals,
    fetch_refund_data,
    fetch_sales_data,
    get_manager_profile,
)

GOAL = (
    "Prepare an April 2026 monthly sales summary for manager_id=42 in USD. "
    "Use step-by-step decomposition. Include gross sales, refunds, net sales, refund rate, and one risk note."
)

# max_execute_steps here limits plan length (number of planned steps), not runtime loop iterations.
BUDGET = Budget(max_plan_steps=6, max_execute_steps=8, max_tool_calls=8, max_seconds=60)

TOOL_REGISTRY = {
    "get_manager_profile": get_manager_profile,
    "fetch_sales_data": fetch_sales_data,
    "fetch_refund_data": fetch_refund_data,
    "calculate_monthly_kpis": calculate_monthly_kpis,
    "detect_risk_signals": detect_risk_signals,
}

ALLOWED_TOOLS = {
    "get_manager_profile",
    "fetch_sales_data",
    "fetch_refund_data",
    "calculate_monthly_kpis",
    "detect_risk_signals",
}


def run_task_decomposition(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)

    try:
        raw_plan = create_plan(goal=goal, max_plan_steps=BUDGET.max_plan_steps)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "llm_phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        steps = validate_plan_action(
            raw_plan,
            max_plan_steps=BUDGET.max_plan_steps,
            allowed_tools=ALLOWED_TOOLS,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "raw_plan": raw_plan,
            "trace": trace,
            "history": history,
        }

    if len(steps) > BUDGET.max_execute_steps:
        return {
            "status": "stopped",
            "stop_reason": "max_execute_steps",
            "plan": steps,
            "trace": trace,
            "history": history,
        }

    for step_no, step in enumerate(steps, start=1):
        elapsed = time.monotonic() - started
        if elapsed > BUDGET.max_seconds:
            return {
                "status": "stopped",
                "stop_reason": "max_seconds",
                "plan": steps,
                "trace": trace,
                "history": history,
            }

        tool_name = step["tool"]
        tool_args = step["args"]

        try:
            observation = gateway.call(tool_name, tool_args)
            trace.append(
                {
                    "step_no": step_no,
                    "step_id": step["id"],
                    "tool": tool_name,
                    "args_hash": args_hash(tool_args),
                    "ok": True,
                }
            )
        except StopRun as exc:
            trace.append(
                {
                    "step_no": step_no,
                    "step_id": step["id"],
                    "tool": tool_name,
                    "args_hash": args_hash(tool_args),
                    "ok": False,
                    "stop_reason": exc.reason,
                }
            )
            return {
                "status": "stopped",
                "stop_reason": exc.reason,
                "plan": steps,
                "trace": trace,
                "history": history,
            }

        history.append(
            {
                "step_no": step_no,
                "plan_step": step,
                "observation": observation,
            }
        )

    try:
        answer = compose_final_answer(goal=goal, history=history)
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "llm_phase": "finalize",
            "plan": steps,
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "llm_phase": "finalize",
            "plan": steps,
            "trace": trace,
            "history": history,
        }

    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "plan": steps,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_task_decomposition(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Qué es lo más importante aquí (en palabras simples)

  • run_task_decomposition(...) controla Plan -> Execute -> Combine; las acciones de negocio se ejecutan solo a través de ToolGateway.
  • Si el plan es inválido, se devuelve raw_plan para depuración.
  • En esta versión, max_execute_steps verifica la longitud del plan antes de execution; luego los límites runtime los dan max_tool_calls y max_seconds.
  • history es un log transparente de pasos: qué había en el plan y qué observation devolvió cada tool.

requirements.txt

TEXT
openai==2.21.0

Ejemplo de salida

El orden de pasos del plan puede variar un poco entre ejecuciones, pero los policy gates y stop reasons se mantienen estables. El planner puede cambiar el orden de pasos; lo importante es que policy + allowlist funcionan igual sin importar el orden.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "In April 2026, under manager Anna's leadership, gross sales were $28,195, refunds were $1,370, net sales were $26,825, and refund rate was 4.86%.",
  "plan": [
    {"id": "step_1", "tool": "fetch_sales_data", "args": {"month": "2026-04"}},
    {"id": "step_2", "tool": "fetch_refund_data", "args": {"month": "2026-04"}},
    {"id": "step_3", "tool": "calculate_monthly_kpis", "args": {"month": "2026-04"}},
    {"id": "step_4", "tool": "detect_risk_signals", "args": {"month": "2026-04"}},
    {"id": "step_5", "tool": "get_manager_profile", "args": {"manager_id": 42}}
  ],
  "trace": [
    {"step_no": 1, "step_id": "step_1", "tool": "fetch_sales_data", "args_hash": "...", "ok": true},
    {"step_no": 2, "step_id": "step_2", "tool": "fetch_refund_data", "args_hash": "...", "ok": true},
    {"step_no": 3, "step_id": "step_3", "tool": "calculate_monthly_kpis", "args_hash": "...", "ok": true},
    {"step_no": 4, "step_id": "step_4", "tool": "detect_risk_signals", "args_hash": "...", "ok": true},
    {"step_no": 5, "step_id": "step_5", "tool": "get_manager_profile", "args_hash": "...", "ok": true}
  ],
  "history": [{...}]
}

Este es un ejemplo resumido: en una ejecución real, plan y trace pueden contener más pasos.

history es el registro de ejecución: para cada step_no hay plan_step y observation.

args_hash hashea solo argumentos, por eso puede coincidir entre distintos tools si los args son iguales; loop detection además considera el nombre del tool.


stop_reason típicos

  • success — el plan se ejecutó y se generó la respuesta final
  • invalid_plan:* — el plan del LLM no pasó la policy validation
  • invalid_plan:non_json — el LLM no devolvió un plan JSON válido
  • invalid_plan:min_steps — el plan tiene menos de 3 pasos de decomposition
  • invalid_plan:tool_not_allowed:<name> — el plan contiene un tool fuera de allowlist
  • max_execute_steps — el plan es más largo que el execution budget permitido
  • max_tool_calls — se agotó el límite de llamadas a tools
  • max_seconds — se superó el time budget del run
  • llm_timeout — el LLM no respondió dentro de OPENAI_TIMEOUT_SECONDS
  • llm_empty — el LLM devolvió una respuesta final vacía en la fase finalize
  • tool_denied:<name> — el tool no está en allowlist
  • tool_missing:<name> — el tool no existe en registry
  • tool_bad_args:<name> — el paso contiene argumentos inválidos
  • loop_detected — exact repeat (tool + args_hash)

Qué NO se muestra aquí

  • No hay auth/PII ni controles de acceso de producción para datos personales.
  • No hay políticas de retry/backoff para LLM y la capa de herramientas.
  • No hay presupuestos de tokens/costo (cost guardrails).
  • Las herramientas aquí son mocks determinísticos para aprendizaje, no APIs externas reales.

Qué probar después

  • Quita detect_risk_signals de ALLOWED_TOOLS y verifica tool_denied:*.
  • Agrega al plan un tool inexistente y verifica tool_missing:*.
  • Reduce max_plan_steps a 3 y observa con qué frecuencia obtienes invalid_plan:max_steps.
  • Cambia GOAL a manager_id=7 (Max) y compara la síntesis final.
  • Agrega cost/token guardrails en Budget y en el resultado JSON final.

Código completo en GitHub

En el repositorio está la versión runnable completa de este ejemplo: planning, policy boundary, ejecución secuencial de pasos y stop reasons.

Ver código completo en GitHub ↗
⏱️ 15 min de lecturaActualizado Mar, 2026Dificultad: ★★☆
Integrado: control en producciónOnceOnly
Guardrails para agentes con tool-calling
Lleva este patrón a producción con gobernanza:
  • Presupuestos (pasos / topes de gasto)
  • Permisos de herramientas (allowlist / blocklist)
  • Kill switch y parada por incidente
  • Idempotencia y dedupe
  • Audit logs y trazabilidad
Mención integrada: OnceOnly es una capa de control para sistemas de agentes en producción.
Autor

Esta documentación está curada y mantenida por ingenieros que despliegan agentes de IA en producción.

El contenido es asistido por IA, con responsabilidad editorial humana sobre la exactitud, la claridad y la relevancia en producción.

Los patrones y las recomendaciones se basan en post-mortems, modos de fallo e incidentes operativos en sistemas desplegados, incluido durante el desarrollo y la operación de infraestructura de gobernanza para agentes en OnceOnly.