Guarded-Policy Agent en Python: Ejemplo completo

Ejemplo runnable con estilo de produccion de agente Guarded-Policy en Python con policy-gate allow/deny/rewrite/escalate y trace/history transparentes.
En esta página
  1. Esencia del Patron (breve)
  2. Que Demuestra Este Ejemplo
  3. Arquitectura
  4. Estructura del Proyecto
  5. Cómo ejecutar
  6. Tarea
  7. Codigo
  8. context.py — request envelope
  9. tools.py - herramientas ejecutables
  10. agent.py — proposed actions + final answer
  11. gateway.py — policy boundary + execution boundary
  12. main.py — orchestrate policy-gated execution
  13. Ejemplo de Salida
  14. stop_reason Tipicos
  15. Lo Que NO Se Muestra Aqui
  16. Que Probar Despues

Esencia del Patron (breve)

Guarded-Policy Agent significa: el agente puede proponer cualquier accion, pero la ejecucion pasa solo por el policy boundary.

Antes de cada tool-call, el policy-gate devuelve una de las decisiones:

  • allow
  • deny
  • rewrite
  • escalate

Que Demuestra Este Ejemplo

  • la capa LLM/agent propone un plan de acciones (actions)
  • la execution layer valida el contrato de accion
  • el policy boundary aplica allow/deny/rewrite/escalate
  • runtime execution allowlist existe como guard-path separado (no se activa en este run; se valida en un experimento aparte)
  • escalate: la policy crea una accion segura y human approval confirma la ejecucion
  • la respuesta final se construye solo con observations obtenidas de forma segura

Arquitectura

  1. agent.py construye un plan de cuatro acciones.
  2. gateway.py valida cada accion y toma la policy decision.
  3. a2 se bloquea como deny (pii_export_blocked) y no se ejecuta.
  4. a3 va a escalate: la policy crea una variante segura, human approval confirma el scope y luego se ejecuta como human_approved.
  5. a4 demuestra rewrite (template/recipient cap) y se ejecuta como policy_rewrite.
  6. main.py recopila trace/history y devuelve el brief final.

Estructura del Proyecto

TEXT
agent-patterns/
└── guarded-policy-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── agent.py
        ├── context.py
        ├── README.md
        └── requirements.txt

Cómo ejecutar

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/guarded-policy-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Se requiere Python 3.11+.

Opción con export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Opción con .env (opcional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

Esta es la variante de shell (macOS/Linux). En Windows es más fácil usar variables con set o, si quieres, python-dotenv.


Tarea

Caso de produccion:

"Prepara una actualizacion customer-safe sobre un incidente de pago P1 para US. Bloquea PII export y limita la comunicacion masiva a nivel de policy."


Codigo

context.py — request envelope

PYTHON
from __future__ import annotations

from typing import Any


def build_request(*, report_date: str, region: str, incident_id: str) -> dict[str, Any]:
    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
            "incident_id": incident_id,
        },
        "policy_hints": {
            "status_update_template": "incident_p1_v2",
            "max_recipients_per_send": 50000,
        },
    }

tools.py - herramientas ejecutables

PYTHON
from __future__ import annotations

import time
from typing import Any


def fetch_incident_snapshot(report_date: str, region: str, incident_id: str) -> dict[str, Any]:
    time.sleep(0.08)
    return {
        "status": "ok",
        "data": {
            "incident_id": incident_id,
            "report_date": report_date,
            "region": region.upper(),
            "severity": "P1",
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "affected_checkout_share": 0.27,
            "eta_minutes": 45,
        },
    }


def send_status_update(
    channel: str,
    template_id: str,
    audience_segment: str,
    max_recipients: int,
) -> dict[str, Any]:
    time.sleep(0.05)
    return {
        "status": "ok",
        "data": {
            "channel": channel,
            "template_id": template_id,
            "audience_segment": audience_segment,
            "queued_recipients": int(max_recipients),
            "delivery_id": "upd_20260306_001",
        },
    }


def export_customer_data(fields: list[str], destination: str) -> dict[str, Any]:
    del fields, destination
    time.sleep(0.03)
    return {
        "status": "ok",
        "data": {
            "export_id": "exp_20260306_001",
            "rows": 18240,
        },
    }


def create_manual_review_ticket(reason: str, payload: dict[str, Any]) -> dict[str, Any]:
    time.sleep(0.03)
    return {
        "status": "ok",
        "data": {
            "ticket_id": "pol_20260306_001",
            "reason": reason,
            "payload_keys": sorted(payload.keys()),
        },
    }

agent.py — proposed actions + final answer

PYTHON
from __future__ import annotations

from typing import Any


def propose_action_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    req = request["request"]
    del goal
    return {
        "actions": [
            {
                "id": "a1",
                "tool": "fetch_incident_snapshot",
                "args": {
                    "report_date": req["report_date"],
                    "region": req["region"],
                    "incident_id": req["incident_id"],
                },
            },
            {
                "id": "a2",
                "tool": "export_customer_data",
                "args": {
                    "fields": ["email", "country", "payment_last4"],
                    "destination": "external_s3",
                },
            },
            {
                "id": "a3",
                "tool": "send_status_update",
                "args": {
                    "channel": "external_email",
                    "template_id": "free_text_v0",
                    "audience_segment": "all_customers",
                    "max_recipients": 120000,
                    "free_text": "We are fully recovered.",
                },
            },
            {
                "id": "a4",
                "tool": "send_status_update",
                "args": {
                    "channel": "status_page",
                    "template_id": "free_text_v0",
                    "audience_segment": "enterprise_active",
                    "max_recipients": 120000,
                },
            },
        ]
    }


def compose_final_answer(
    *,
    request: dict[str, Any],
    state: dict[str, Any],
    policy_summary: dict[str, Any],
) -> str:
    req = request["request"]
    snap = state.get("snapshot") or {}
    delivery = state.get("delivery") or {}

    blocked = ", ".join(policy_summary.get("denied_tools") or []) or "none"
    sent = ""
    if delivery:
        sent = (
            f" Status update queued via {delivery.get('channel')} for {delivery.get('audience_segment')} "
            f"using template {delivery.get('template_id')} to {delivery.get('queued_recipients')} recipients."
        )

    failed_rate = snap.get("failed_payment_rate")
    failed_rate_str = f"{float(failed_rate) * 100:.1f}%" if isinstance(failed_rate, (int, float)) else "?"
    share = snap.get("affected_checkout_share")
    share_str = f"{float(share) * 100:.0f}%" if isinstance(share, (int, float)) else "?"

    return (
        f"Operations brief ({req['region']}, {req['report_date']}): incident {req['incident_id']} remains "
        f"{snap.get('severity', '?')} with failed payments at {failed_rate_str} and "
        f"{snap.get('chargeback_alerts', '?')} chargeback alerts. Affected checkout share is "
        f"{share_str} and ETA is ~{snap.get('eta_minutes', '?')} minutes "
        f"(estimate, subject to change).{sent} Blocked by policy: {blocked}."
    )

gateway.py — policy boundary + execution boundary

PYTHON
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 25
    max_actions: int = 8
    action_timeout_seconds: float = 1.2
    max_recipients_per_send: int = 50000


@dataclass(frozen=True)
class Decision:
    kind: str
    reason: str
    enforced_action: dict[str, Any] | None = None


def _normalize_action(raw: Any) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_action:not_object")

    action_id = raw.get("id")
    tool = raw.get("tool")
    args = raw.get("args")

    if not isinstance(action_id, str) or not action_id.strip():
        raise StopRun("invalid_action:id")
    if not isinstance(tool, str) or not tool.strip():
        raise StopRun("invalid_action:tool")
    if not isinstance(args, dict):
        raise StopRun("invalid_action:args")

    return {
        "id": action_id.strip(),
        "tool": tool.strip(),
        "args": dict(args),
    }


def validate_plan(raw_actions: Any, *, max_actions: int) -> list[dict[str, Any]]:
    if not isinstance(raw_actions, list) or not raw_actions:
        raise StopRun("invalid_plan:actions")
    if len(raw_actions) > max_actions:
        raise StopRun("invalid_plan:too_many_actions")
    return [_normalize_action(item) for item in raw_actions]


def validate_tool_observation(raw: Any, *, tool_name: str) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    if raw.get("status") != "ok":
        raise StopRun(f"tool_status_not_ok:{tool_name}")
    data = raw.get("data")
    if not isinstance(data, dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    return data


class PolicyGateway:
    def __init__(
        self,
        *,
        allowed_tools_policy: set[str],
        allowed_tools_execution: set[str],
        budget: Budget,
    ):
        self.allowed_tools_policy = set(allowed_tools_policy)
        self.allowed_tools_execution = set(allowed_tools_execution)
        self.budget = budget
        self.allowed_templates = {"incident_p1_v2", "incident_p2_v1"}
        self._pool = ThreadPoolExecutor(max_workers=4)

    def close(self) -> None:
        self._pool.shutdown(wait=False, cancel_futures=True)

    def evaluate(self, *, action: dict[str, Any], state: dict[str, Any]) -> Decision:
        del state
        normalized = _normalize_action(action)
        tool = normalized["tool"]
        args = dict(normalized["args"])

        if tool not in self.allowed_tools_policy:
            return Decision(kind="deny", reason="tool_denied_policy")
        if tool == "export_customer_data":
            return Decision(kind="deny", reason="pii_export_blocked")
        if tool not in self.allowed_tools_execution:
            return Decision(kind="deny", reason="tool_denied_execution")

        if tool != "send_status_update":
            return Decision(kind="allow", reason="policy_pass")

        rewrite_reasons: list[str] = []
        rewritten = dict(args)

        if rewritten.get("template_id") not in self.allowed_templates:
            rewritten["template_id"] = "incident_p1_v2"
            rewrite_reasons.append("template_allowlist")

        raw_recipients = rewritten.get("max_recipients", self.budget.max_recipients_per_send)
        try:
            recipients = int(raw_recipients)
        except (TypeError, ValueError):
            recipients = self.budget.max_recipients_per_send
        if recipients > self.budget.max_recipients_per_send:
            recipients = self.budget.max_recipients_per_send
            rewrite_reasons.append("recipient_cap")
        rewritten["max_recipients"] = recipients

        if "free_text" in rewritten:
            rewritten.pop("free_text", None)
            rewrite_reasons.append("free_text_removed")

        if (
            rewritten.get("channel") == "external_email"
            and rewritten.get("audience_segment") == "all_customers"
        ):
            rewritten["channel"] = "status_page"
            rewritten["audience_segment"] = "enterprise_active"
            enforced = {
                "id": normalized["id"],
                "tool": normalized["tool"],
                "args": rewritten,
            }
            return Decision(
                kind="escalate",
                reason="mass_external_broadcast",
                enforced_action=enforced,
            )

        if not rewrite_reasons:
            return Decision(kind="allow", reason="policy_pass")

        enforced = {
            "id": normalized["id"],
            "tool": normalized["tool"],
            "args": rewritten,
        }
        return Decision(
            kind="rewrite",
            reason=f"policy_rewrite:{','.join(rewrite_reasons)}",
            enforced_action=enforced,
        )

    def dispatch(
        self,
        *,
        tool_name: str,
        tool_fn: Callable[..., dict[str, Any]],
        args: dict[str, Any],
    ) -> dict[str, Any]:
        future = self._pool.submit(tool_fn, **args)
        try:
            raw = future.result(timeout=self.budget.action_timeout_seconds)
        except FuturesTimeoutError as exc:
            raise StopRun(f"tool_timeout:{tool_name}") from exc
        except Exception as exc:  # noqa: BLE001
            raise StopRun(f"tool_error:{tool_name}:{type(exc).__name__}") from exc

        return validate_tool_observation(raw, tool_name=tool_name)

main.py — orchestrate policy-gated execution

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from agent import compose_final_answer, propose_action_plan
from context import build_request
from gateway import Budget, PolicyGateway, StopRun, validate_plan
from tools import (
    create_manual_review_ticket,
    export_customer_data,
    fetch_incident_snapshot,
    send_status_update,
)

GOAL = (
    "Prepare a customer-safe operations update for a US payments incident. "
    "Use policy-gated execution and never expose customer PII."
)
REQUEST = build_request(
    report_date="2026-03-06",
    region="US",
    incident_id="inc_payments_20260306",
)

BUDGET = Budget(
    max_seconds=25,
    max_actions=8,
    action_timeout_seconds=1.2,
    max_recipients_per_send=50000,
)

ALLOWED_TOOLS_POLICY = {
    "fetch_incident_snapshot",
    "send_status_update",
    "export_customer_data",
    "create_manual_review_ticket",
}
ALLOWED_TOOLS_EXECUTION = {
    "fetch_incident_snapshot",
    "send_status_update",
    "create_manual_review_ticket",
}

TOOLS: dict[str, Any] = {
    "fetch_incident_snapshot": fetch_incident_snapshot,
    "send_status_update": send_status_update,
    "export_customer_data": export_customer_data,
    "create_manual_review_ticket": create_manual_review_ticket,
}


def simulate_human_approval(*, action: dict[str, Any], reason: str) -> dict[str, Any]:
    # Demo policy: approve only the broadcast-risk escalation after safe rewrite.
    if reason == "mass_external_broadcast":
        return {"approved": True, "action": action, "reason": "approved_with_safe_scope"}
    return {"approved": False, "action": action, "reason": "human_rejected"}


def run_guarded_policy_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []
    state: dict[str, Any] = {"snapshot": None, "delivery": None}

    gateway = PolicyGateway(
        allowed_tools_policy=ALLOWED_TOOLS_POLICY,
        allowed_tools_execution=ALLOWED_TOOLS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    decision_counts = {"allow": 0, "rewrite": 0, "deny": 0, "escalate": 0}
    denied_tools: list[str] = []
    rewritten_tools: list[str] = []
    escalated_tools: list[str] = []

    phase = "plan"

    try:
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return stopped("max_seconds", phase="plan")

        raw_plan = propose_action_plan(goal=goal, request=request)
        actions = validate_plan(raw_plan.get("actions"), max_actions=BUDGET.max_actions)
        phase = "execute"

        for idx, action in enumerate(actions, start=1):
            if (time.monotonic() - started) > BUDGET.max_seconds:
                return stopped("max_seconds", phase="execute")

            decision = gateway.evaluate(action=action, state=state)
            decision_counts[decision.kind] += 1

            trace_item = {
                "step": idx,
                "action_id": action["id"],
                "tool": action["tool"],
                "policy_decision": decision.kind,
                "policy_reason": decision.reason,
                "executed_from": "none",
                "ok": False,
            }
            history_item: dict[str, Any] = {
                "step": idx,
                "proposed_action": action,
                "policy": {"decision": decision.kind, "reason": decision.reason},
            }

            if decision.kind == "deny":
                denied_tools.append(action["tool"])
                trace.append(trace_item)
                history.append(history_item)
                continue

            executed_action = action
            executed_from = "original"
            if decision.kind == "rewrite":
                executed_action = decision.enforced_action or action
                rewritten_tools.append(action["tool"])
                executed_from = "policy_rewrite"
            elif decision.kind == "escalate":
                escalated_tools.append(action["tool"])
                escalation_ticket = gateway.dispatch(
                    tool_name="create_manual_review_ticket",
                    tool_fn=TOOLS["create_manual_review_ticket"],
                    args={"reason": decision.reason, "payload": (decision.enforced_action or action)},
                )
                history_item["escalation_ticket"] = escalation_ticket
                human = simulate_human_approval(
                    action=(decision.enforced_action or action),
                    reason=decision.reason,
                )
                if not human["approved"]:
                    return stopped("policy_escalation_rejected", phase="execute")
                executed_action = human["action"]
                executed_from = "human_approved"
                history_item["human"] = {
                    "approved": True,
                    "reason": human["reason"],
                }

            tool_name = executed_action["tool"]
            tool_fn = TOOLS.get(tool_name)
            if tool_fn is None:
                return stopped(f"tool_unmapped:{tool_name}", phase="execute")

            # Keep the decision in trace even if execution fails later.
            trace.append(trace_item)
            observation = gateway.dispatch(
                tool_name=tool_name,
                tool_fn=tool_fn,
                args=executed_action["args"],
            )
            if tool_name == "fetch_incident_snapshot":
                state["snapshot"] = observation
            elif tool_name == "send_status_update":
                state["delivery"] = observation

            trace_item["executed_from"] = executed_from
            trace_item["ok"] = True

            history_item["executed_action"] = executed_action
            history_item["executed_from"] = executed_from
            history_item["observation"] = observation
            history.append(history_item)

        if not isinstance(state["snapshot"], dict):
            return stopped("missing_required_observation:snapshot", phase="finalize")
        if not isinstance(state["delivery"], dict):
            return stopped("missing_required_observation:send_status_update", phase="finalize")

        aggregate = {
            "report_date": request["request"]["report_date"],
            "region": request["request"]["region"],
            "incident": state["snapshot"],
            "delivery": state["delivery"],
        }
        policy_summary = {
            "decisions": decision_counts,
            "denied_tools": sorted(set(denied_tools)),
            "rewritten_tools": sorted(set(rewritten_tools)),
            "escalated_tools": sorted(set(escalated_tools)),
        }
        answer = compose_final_answer(
            request=request,
            state=state,
            policy_summary=policy_summary,
        )

        trace.append(
            {
                "step": len(actions) + 1,
                "phase": "finalize",
                "ok": True,
            }
        )
        history.append({"step": len(actions) + 1, "action": "finalize"})

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "policy_guarded_success",
            "answer": answer,
            "proposed_plan": actions,
            "executed_plan": [
                step["executed_action"]
                for step in history
                if isinstance(step, dict) and isinstance(step.get("executed_action"), dict)
            ],
            "aggregate": aggregate,
            "policy_summary": policy_summary,
            "trace": trace,
            "history": history,
        }
    except StopRun as exc:
        return stopped(exc.reason, phase=phase)
    finally:
        gateway.close()


def main() -> None:
    result = run_guarded_policy_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Lo mas importante aqui (en palabras simples)

  • el agente propone acciones, pero la ejecucion no tiene acceso directo a tools
  • pii_export_blocked muestra un deny a nivel policy para exportacion riesgosa
  • tool_denied_execution sigue disponible como runtime deny-path, pero no se activa en este run
  • escalate: la policy crea una version segura de la accion, human approval confirma el scope y luego se ejecuta como human_approved
  • trace/history hacen auditables las decisiones de policy
  • gateway.close() se llama en finally para no dejar el pool abierto

Ejemplo de Salida

JSON
{
  "run_id": "62537585-a465-4b35-bbd8-40153df722eb",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "policy_guarded_success",
  "answer": "Operations brief (US, 2026-03-06): incident inc_payments_20260306 remains P1 with failed payments at 3.4% and 5 chargeback alerts. Affected checkout share is 27% and ETA is ~45 minutes (estimate, subject to change). Status update queued via status_page for enterprise_active using template incident_p1_v2 to 50000 recipients. Blocked by policy: export_customer_data.",
  "proposed_plan": [
    {
      "id": "a1",
      "tool": "fetch_incident_snapshot",
      "args": {
        "report_date": "2026-03-06",
        "region": "US",
        "incident_id": "inc_payments_20260306"
      }
    },
    {
      "id": "a2",
      "tool": "export_customer_data",
      "args": {
        "fields": ["email", "country", "payment_last4"],
        "destination": "external_s3"
      }
    },
    {
      "id": "a3",
      "tool": "send_status_update",
      "args": {
        "channel": "external_email",
        "template_id": "free_text_v0",
        "audience_segment": "all_customers",
        "max_recipients": 120000,
        "free_text": "We are fully recovered."
      }
    },
    {
      "id": "a4",
      "tool": "send_status_update",
      "args": {
        "channel": "status_page",
        "template_id": "free_text_v0",
        "audience_segment": "enterprise_active",
        "max_recipients": 120000
      }
    }
  ],
  "executed_plan": [
    {
      "id": "a1",
      "tool": "fetch_incident_snapshot",
      "args": {
        "report_date": "2026-03-06",
        "region": "US",
        "incident_id": "inc_payments_20260306"
      }
    },
    {
      "id": "a3",
      "tool": "send_status_update",
      "args": {
        "channel": "status_page",
        "template_id": "incident_p1_v2",
        "audience_segment": "enterprise_active",
        "max_recipients": 50000
      }
    },
    {
      "id": "a4",
      "tool": "send_status_update",
      "args": {
        "channel": "status_page",
        "template_id": "incident_p1_v2",
        "audience_segment": "enterprise_active",
        "max_recipients": 50000
      }
    }
  ],
  "aggregate": {
    "report_date": "2026-03-06",
    "region": "US",
    "incident": {
      "incident_id": "inc_payments_20260306",
      "report_date": "2026-03-06",
      "region": "US",
      "severity": "P1",
      "failed_payment_rate": 0.034,
      "chargeback_alerts": 5,
      "affected_checkout_share": 0.27,
      "eta_minutes": 45
    },
    "delivery": {
      "channel": "status_page",
      "template_id": "incident_p1_v2",
      "audience_segment": "enterprise_active",
      "queued_recipients": 50000,
      "delivery_id": "upd_20260306_001"
    }
  },
  "policy_summary": {
    "decisions": {
      "allow": 1,
      "rewrite": 1,
      "deny": 1,
      "escalate": 1
    },
    "denied_tools": [
      "export_customer_data"
    ],
    "rewritten_tools": [
      "send_status_update"
    ],
    "escalated_tools": [
      "send_status_update"
    ]
  },
  "trace": [
    {
      "step": 1,
      "action_id": "a1",
      "tool": "fetch_incident_snapshot",
      "policy_decision": "allow",
      "policy_reason": "policy_pass",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 2,
      "action_id": "a2",
      "tool": "export_customer_data",
      "policy_decision": "deny",
      "policy_reason": "pii_export_blocked",
      "executed_from": "none",
      "ok": false
    },
    {
      "step": 3,
      "action_id": "a3",
      "tool": "send_status_update",
      "policy_decision": "escalate",
      "policy_reason": "mass_external_broadcast",
      "executed_from": "human_approved",
      "ok": true
    },
    {
      "step": 4,
      "action_id": "a4",
      "tool": "send_status_update",
      "policy_decision": "rewrite",
      "policy_reason": "policy_rewrite:template_allowlist,recipient_cap",
      "executed_from": "policy_rewrite",
      "ok": true
    },
    {
      "step": 5,
      "phase": "finalize",
      "ok": true
    }
  ],
  "history": [{...}]
}

trace no incluye diffs de parametros; mira los args seguros en history[*].executed_action.args.


stop_reason Tipicos

  • success - el run termino correctamente
  • max_seconds - se agoto el time budget total
  • invalid_plan:* - plan de acciones invalido
  • invalid_action:* - contrato de accion invalido
  • tool_timeout:* - el tool no respondio a tiempo
  • tool_error:* - excepcion durante tool execution
  • tool_status_not_ok:* - el tool devolvio status != "ok"
  • tool_invalid_output:* - el tool devolvio un contrato invalido
  • tool_unmapped:* - tool no encontrado en el dispatch map
  • policy_escalation_rejected - human approval rechazo la escalacion
  • missing_required_observation:* - faltan datos criticos para finalizar

Lo Que NO Se Muestra Aqui

  • persisted policy/event logs (DB/SIEM)
  • role/tenant-scoped policy packs
  • dynamic risk scoring (beyond static rules)
  • async approval queue (Jira/ServiceNow/Slack workflow)

Que Probar Despues

  1. Quita send_status_update de ALLOWED_TOOLS_EXECUTION y observa: policy_decision=deny (tool_denied_execution) -> stop_reason=missing_required_observation:send_status_update final.
  2. Agrega una segunda policy de escalacion (por ejemplo, para eta_minutes > 120) y revisa trace.
  3. Divide los policy reasons en codigos estructurados (policy.mass_broadcast, policy.pii_export) para analitica mas simple.
⏱️ 13 min de lecturaActualizado 6 de marzo de 2026Dificultad: ★★☆
Integrado: control en producciónOnceOnly
Guardrails para agentes con tool-calling
Lleva este patrón a producción con gobernanza:
  • Presupuestos (pasos / topes de gasto)
  • Permisos de herramientas (allowlist / blocklist)
  • Kill switch y parada por incidente
  • Idempotencia y dedupe
  • Audit logs y trazabilidad
Mención integrada: OnceOnly es una capa de control para sistemas de agentes en producción.

Autor

Nick — ingeniero que construye infraestructura para agentes de IA en producción.

Enfoque: patrones de agentes, modos de fallo, control del runtime y fiabilidad del sistema.

🔗 GitHub: https://github.com/mykolademyanov


Nota editorial

Esta documentación está asistida por IA, con responsabilidad editorial humana sobre la exactitud, la claridad y la relevancia en producción.

El contenido se basa en fallos reales, post-mortems e incidentes operativos en sistemas de agentes de IA desplegados.