Guarded-Policy Agent in Python: A Complete Example

A runnable, production-style Guarded-Policy agent example in Python, with an allow/deny/rewrite/escalate policy gate and transparent trace/history.
On This Page
  1. Pattern Essence (Brief)
  2. What This Example Demonstrates
  3. Architecture
  4. Project Structure
  5. Running the Project
  6. Task
  7. Code
  8. context.py — request envelope
  9. tools.py — executable tools
  10. agent.py — proposed actions + final answer
  11. gateway.py — policy boundary + execution boundary
  12. main.py — orchestrate policy-gated execution
  13. Example Output
  14. Typical stop_reason Values
  15. What Is NOT Shown Here
  16. What to Try Next

Pattern Essence (Brief)

Guarded-Policy Agent means: the agent may propose any action, but execution flows only through the policy boundary.

Before each tool call, the policy gate returns one of these decisions:

  • allow
  • deny
  • rewrite
  • escalate

What This Example Demonstrates

  • the LLM/agent layer proposes a plan of actions (actions)
  • the execution layer validates the action contract
  • the policy boundary applies allow/deny/rewrite/escalate
  • a runtime execution allowlist exists as a separate guard path (not triggered in this run; verified in a separate experiment)
  • escalate: the policy builds a safe action, and human approval confirms execution
  • the final answer is composed only from safely obtained observations

Architecture

  1. agent.py builds a four-action plan.
  2. gateway.py validates each action and makes the policy decision.
  3. a2 is blocked with deny (pii_export_blocked) and is not executed.
  4. a3 goes through escalate: the policy builds a safe variant, human approval confirms the scope, then it executes as human_approved.
  5. a4 demonstrates rewrite (template/recipient cap) and executes as policy_rewrite.
  6. main.py collects trace/history and returns the final brief.

Project Structure

TEXT
agent-patterns/
└── guarded-policy-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── agent.py
        ├── context.py
        ├── README.md
        └── requirements.txt

Running the Project

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/guarded-policy-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option via .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows, it is simpler to set the variables directly or, if preferred, use python-dotenv.
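As a cross-platform alternative to shell sourcing, a minimal stdlib-only loader also works. This is a sketch: `load_env_file` is a hypothetical helper, not part of the repo, and it supports only plain KEY=VALUE lines (real projects may prefer python-dotenv):

```python
import os
from pathlib import Path


def load_env_file(path: str = ".env") -> None:
    # Minimal .env loader: KEY=VALUE lines only, no quoting or expansion.
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Do not override variables already set in the environment.
        os.environ.setdefault(key.strip(), value.strip())
```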


Task

Production case:

"Prepare a customer-safe update on a P1 payment incident for US. Block PII export and cap mass communication at the policy level."


Code

context.py — request envelope

PYTHON
from __future__ import annotations

from typing import Any


def build_request(*, report_date: str, region: str, incident_id: str) -> dict[str, Any]:
    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
            "incident_id": incident_id,
        },
        "policy_hints": {
            "status_update_template": "incident_p1_v2",
            "max_recipients_per_send": 50000,
        },
    }

tools.py — executable tools

PYTHON
from __future__ import annotations

import time
from typing import Any


def fetch_incident_snapshot(report_date: str, region: str, incident_id: str) -> dict[str, Any]:
    time.sleep(0.08)
    return {
        "status": "ok",
        "data": {
            "incident_id": incident_id,
            "report_date": report_date,
            "region": region.upper(),
            "severity": "P1",
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "affected_checkout_share": 0.27,
            "eta_minutes": 45,
        },
    }


def send_status_update(
    channel: str,
    template_id: str,
    audience_segment: str,
    max_recipients: int,
) -> dict[str, Any]:
    time.sleep(0.05)
    return {
        "status": "ok",
        "data": {
            "channel": channel,
            "template_id": template_id,
            "audience_segment": audience_segment,
            "queued_recipients": int(max_recipients),
            "delivery_id": "upd_20260306_001",
        },
    }


def export_customer_data(fields: list[str], destination: str) -> dict[str, Any]:
    del fields, destination
    time.sleep(0.03)
    return {
        "status": "ok",
        "data": {
            "export_id": "exp_20260306_001",
            "rows": 18240,
        },
    }


def create_manual_review_ticket(reason: str, payload: dict[str, Any]) -> dict[str, Any]:
    time.sleep(0.03)
    return {
        "status": "ok",
        "data": {
            "ticket_id": "pol_20260306_001",
            "reason": reason,
            "payload_keys": sorted(payload.keys()),
        },
    }

agent.py — proposed actions + final answer

PYTHON
from __future__ import annotations

from typing import Any


def propose_action_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    req = request["request"]
    del goal
    return {
        "actions": [
            {
                "id": "a1",
                "tool": "fetch_incident_snapshot",
                "args": {
                    "report_date": req["report_date"],
                    "region": req["region"],
                    "incident_id": req["incident_id"],
                },
            },
            {
                "id": "a2",
                "tool": "export_customer_data",
                "args": {
                    "fields": ["email", "country", "payment_last4"],
                    "destination": "external_s3",
                },
            },
            {
                "id": "a3",
                "tool": "send_status_update",
                "args": {
                    "channel": "external_email",
                    "template_id": "free_text_v0",
                    "audience_segment": "all_customers",
                    "max_recipients": 120000,
                    "free_text": "We are fully recovered.",
                },
            },
            {
                "id": "a4",
                "tool": "send_status_update",
                "args": {
                    "channel": "status_page",
                    "template_id": "free_text_v0",
                    "audience_segment": "enterprise_active",
                    "max_recipients": 120000,
                },
            },
        ]
    }


def compose_final_answer(
    *,
    request: dict[str, Any],
    state: dict[str, Any],
    policy_summary: dict[str, Any],
) -> str:
    req = request["request"]
    snap = state.get("snapshot") or {}
    delivery = state.get("delivery") or {}

    blocked = ", ".join(policy_summary.get("denied_tools") or []) or "none"
    sent = ""
    if delivery:
        sent = (
            f" Status update queued via {delivery.get('channel')} for {delivery.get('audience_segment')} "
            f"using template {delivery.get('template_id')} to {delivery.get('queued_recipients')} recipients."
        )

    failed_rate = snap.get("failed_payment_rate")
    failed_rate_str = f"{float(failed_rate) * 100:.1f}%" if isinstance(failed_rate, (int, float)) else "?"
    share = snap.get("affected_checkout_share")
    share_str = f"{float(share) * 100:.0f}%" if isinstance(share, (int, float)) else "?"

    return (
        f"Operations brief ({req['region']}, {req['report_date']}): incident {req['incident_id']} remains "
        f"{snap.get('severity', '?')} with failed payments at {failed_rate_str} and "
        f"{snap.get('chargeback_alerts', '?')} chargeback alerts. Affected checkout share is "
        f"{share_str} and ETA is ~{snap.get('eta_minutes', '?')} minutes "
        f"(estimate, subject to change).{sent} Blocked by policy: {blocked}."
    )

gateway.py — policy boundary + execution boundary

PYTHON
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 25
    max_actions: int = 8
    action_timeout_seconds: float = 1.2
    max_recipients_per_send: int = 50000


@dataclass(frozen=True)
class Decision:
    kind: str
    reason: str
    enforced_action: dict[str, Any] | None = None


def _normalize_action(raw: Any) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_action:not_object")

    action_id = raw.get("id")
    tool = raw.get("tool")
    args = raw.get("args")

    if not isinstance(action_id, str) or not action_id.strip():
        raise StopRun("invalid_action:id")
    if not isinstance(tool, str) or not tool.strip():
        raise StopRun("invalid_action:tool")
    if not isinstance(args, dict):
        raise StopRun("invalid_action:args")

    return {
        "id": action_id.strip(),
        "tool": tool.strip(),
        "args": dict(args),
    }


def validate_plan(raw_actions: Any, *, max_actions: int) -> list[dict[str, Any]]:
    if not isinstance(raw_actions, list) or not raw_actions:
        raise StopRun("invalid_plan:actions")
    if len(raw_actions) > max_actions:
        raise StopRun("invalid_plan:too_many_actions")
    return [_normalize_action(item) for item in raw_actions]


def validate_tool_observation(raw: Any, *, tool_name: str) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    if raw.get("status") != "ok":
        raise StopRun(f"tool_status_not_ok:{tool_name}")
    data = raw.get("data")
    if not isinstance(data, dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    return data


class PolicyGateway:
    def __init__(
        self,
        *,
        allowed_tools_policy: set[str],
        allowed_tools_execution: set[str],
        budget: Budget,
    ):
        self.allowed_tools_policy = set(allowed_tools_policy)
        self.allowed_tools_execution = set(allowed_tools_execution)
        self.budget = budget
        self.allowed_templates = {"incident_p1_v2", "incident_p2_v1"}
        self._pool = ThreadPoolExecutor(max_workers=4)

    def close(self) -> None:
        self._pool.shutdown(wait=False, cancel_futures=True)

    def evaluate(self, *, action: dict[str, Any], state: dict[str, Any]) -> Decision:
        del state
        normalized = _normalize_action(action)
        tool = normalized["tool"]
        args = dict(normalized["args"])

        if tool not in self.allowed_tools_policy:
            return Decision(kind="deny", reason="tool_denied_policy")
        if tool == "export_customer_data":
            return Decision(kind="deny", reason="pii_export_blocked")
        if tool not in self.allowed_tools_execution:
            return Decision(kind="deny", reason="tool_denied_execution")

        if tool != "send_status_update":
            return Decision(kind="allow", reason="policy_pass")

        rewrite_reasons: list[str] = []
        rewritten = dict(args)

        if rewritten.get("template_id") not in self.allowed_templates:
            rewritten["template_id"] = "incident_p1_v2"
            rewrite_reasons.append("template_allowlist")

        raw_recipients = rewritten.get("max_recipients", self.budget.max_recipients_per_send)
        try:
            recipients = int(raw_recipients)
        except (TypeError, ValueError):
            recipients = self.budget.max_recipients_per_send
        if recipients > self.budget.max_recipients_per_send:
            recipients = self.budget.max_recipients_per_send
            rewrite_reasons.append("recipient_cap")
        rewritten["max_recipients"] = recipients

        if "free_text" in rewritten:
            rewritten.pop("free_text", None)
            rewrite_reasons.append("free_text_removed")

        if (
            rewritten.get("channel") == "external_email"
            and rewritten.get("audience_segment") == "all_customers"
        ):
            rewritten["channel"] = "status_page"
            rewritten["audience_segment"] = "enterprise_active"
            enforced = {
                "id": normalized["id"],
                "tool": normalized["tool"],
                "args": rewritten,
            }
            return Decision(
                kind="escalate",
                reason="mass_external_broadcast",
                enforced_action=enforced,
            )

        if not rewrite_reasons:
            return Decision(kind="allow", reason="policy_pass")

        enforced = {
            "id": normalized["id"],
            "tool": normalized["tool"],
            "args": rewritten,
        }
        return Decision(
            kind="rewrite",
            reason=f"policy_rewrite:{','.join(rewrite_reasons)}",
            enforced_action=enforced,
        )

    def dispatch(
        self,
        *,
        tool_name: str,
        tool_fn: Callable[..., dict[str, Any]],
        args: dict[str, Any],
    ) -> dict[str, Any]:
        future = self._pool.submit(tool_fn, **args)
        try:
            raw = future.result(timeout=self.budget.action_timeout_seconds)
        except FuturesTimeoutError as exc:
            raise StopRun(f"tool_timeout:{tool_name}") from exc
        except Exception as exc:  # noqa: BLE001
            raise StopRun(f"tool_error:{tool_name}:{type(exc).__name__}") from exc

        return validate_tool_observation(raw, tool_name=tool_name)

main.py — orchestrate policy-gated execution

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from agent import compose_final_answer, propose_action_plan
from context import build_request
from gateway import Budget, PolicyGateway, StopRun, validate_plan
from tools import (
    create_manual_review_ticket,
    export_customer_data,
    fetch_incident_snapshot,
    send_status_update,
)

GOAL = (
    "Prepare a customer-safe operations update for a US payments incident. "
    "Use policy-gated execution and never expose customer PII."
)
REQUEST = build_request(
    report_date="2026-03-06",
    region="US",
    incident_id="inc_payments_20260306",
)

BUDGET = Budget(
    max_seconds=25,
    max_actions=8,
    action_timeout_seconds=1.2,
    max_recipients_per_send=50000,
)

ALLOWED_TOOLS_POLICY = {
    "fetch_incident_snapshot",
    "send_status_update",
    "export_customer_data",
    "create_manual_review_ticket",
}
ALLOWED_TOOLS_EXECUTION = {
    "fetch_incident_snapshot",
    "send_status_update",
    "create_manual_review_ticket",
}

TOOLS: dict[str, Any] = {
    "fetch_incident_snapshot": fetch_incident_snapshot,
    "send_status_update": send_status_update,
    "export_customer_data": export_customer_data,
    "create_manual_review_ticket": create_manual_review_ticket,
}


def simulate_human_approval(*, action: dict[str, Any], reason: str) -> dict[str, Any]:
    # Demo policy: approve only the broadcast-risk escalation after safe rewrite.
    if reason == "mass_external_broadcast":
        return {"approved": True, "action": action, "reason": "approved_with_safe_scope"}
    return {"approved": False, "action": action, "reason": "human_rejected"}


def run_guarded_policy_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []
    state: dict[str, Any] = {"snapshot": None, "delivery": None}

    gateway = PolicyGateway(
        allowed_tools_policy=ALLOWED_TOOLS_POLICY,
        allowed_tools_execution=ALLOWED_TOOLS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    decision_counts = {"allow": 0, "rewrite": 0, "deny": 0, "escalate": 0}
    denied_tools: list[str] = []
    rewritten_tools: list[str] = []
    escalated_tools: list[str] = []

    phase = "plan"

    try:
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return stopped("max_seconds", phase="plan")

        raw_plan = propose_action_plan(goal=goal, request=request)
        actions = validate_plan(raw_plan.get("actions"), max_actions=BUDGET.max_actions)
        phase = "execute"

        for idx, action in enumerate(actions, start=1):
            if (time.monotonic() - started) > BUDGET.max_seconds:
                return stopped("max_seconds", phase="execute")

            decision = gateway.evaluate(action=action, state=state)
            decision_counts[decision.kind] += 1

            trace_item = {
                "step": idx,
                "action_id": action["id"],
                "tool": action["tool"],
                "policy_decision": decision.kind,
                "policy_reason": decision.reason,
                "executed_from": "none",
                "ok": False,
            }
            history_item: dict[str, Any] = {
                "step": idx,
                "proposed_action": action,
                "policy": {"decision": decision.kind, "reason": decision.reason},
            }

            if decision.kind == "deny":
                denied_tools.append(action["tool"])
                trace.append(trace_item)
                history.append(history_item)
                continue

            executed_action = action
            executed_from = "original"
            if decision.kind == "rewrite":
                executed_action = decision.enforced_action or action
                rewritten_tools.append(action["tool"])
                executed_from = "policy_rewrite"
            elif decision.kind == "escalate":
                escalated_tools.append(action["tool"])
                escalation_ticket = gateway.dispatch(
                    tool_name="create_manual_review_ticket",
                    tool_fn=TOOLS["create_manual_review_ticket"],
                    args={"reason": decision.reason, "payload": (decision.enforced_action or action)},
                )
                history_item["escalation_ticket"] = escalation_ticket
                human = simulate_human_approval(
                    action=(decision.enforced_action or action),
                    reason=decision.reason,
                )
                if not human["approved"]:
                    return stopped("policy_escalation_rejected", phase="execute")
                executed_action = human["action"]
                executed_from = "human_approved"
                history_item["human"] = {
                    "approved": True,
                    "reason": human["reason"],
                }

            tool_name = executed_action["tool"]
            tool_fn = TOOLS.get(tool_name)
            if tool_fn is None:
                return stopped(f"tool_unmapped:{tool_name}", phase="execute")

            # Keep the decision in trace even if execution fails later.
            trace.append(trace_item)
            observation = gateway.dispatch(
                tool_name=tool_name,
                tool_fn=tool_fn,
                args=executed_action["args"],
            )
            if tool_name == "fetch_incident_snapshot":
                state["snapshot"] = observation
            elif tool_name == "send_status_update":
                state["delivery"] = observation

            trace_item["executed_from"] = executed_from
            trace_item["ok"] = True

            history_item["executed_action"] = executed_action
            history_item["executed_from"] = executed_from
            history_item["observation"] = observation
            history.append(history_item)

        if not isinstance(state["snapshot"], dict):
            return stopped("missing_required_observation:snapshot", phase="finalize")
        if not isinstance(state["delivery"], dict):
            return stopped("missing_required_observation:send_status_update", phase="finalize")

        aggregate = {
            "report_date": request["request"]["report_date"],
            "region": request["request"]["region"],
            "incident": state["snapshot"],
            "delivery": state["delivery"],
        }
        policy_summary = {
            "decisions": decision_counts,
            "denied_tools": sorted(set(denied_tools)),
            "rewritten_tools": sorted(set(rewritten_tools)),
            "escalated_tools": sorted(set(escalated_tools)),
        }
        answer = compose_final_answer(
            request=request,
            state=state,
            policy_summary=policy_summary,
        )

        trace.append(
            {
                "step": len(actions) + 1,
                "phase": "finalize",
                "ok": True,
            }
        )
        history.append({"step": len(actions) + 1, "action": "finalize"})

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "policy_guarded_success",
            "answer": answer,
            "proposed_plan": actions,
            "executed_plan": [
                step["executed_action"]
                for step in history
                if isinstance(step, dict) and isinstance(step.get("executed_action"), dict)
            ],
            "aggregate": aggregate,
            "policy_summary": policy_summary,
            "trace": trace,
            "history": history,
        }
    except StopRun as exc:
        return stopped(exc.reason, phase=phase)
    finally:
        gateway.close()


def main() -> None:
    result = run_guarded_policy_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What Matters Most Here (In Simple Terms)

  • the agent proposes actions, but it has no direct access to the tools at execution time
  • pii_export_blocked shows a policy-level deny for a risky export
  • tool_denied_execution remains available as a runtime deny path, but is not triggered in this run
  • escalate: the policy builds a safe version of the action, human approval confirms the scope, then it executes as human_approved
  • trace/history make policy decisions auditable
  • gateway.close() is called in finally so the thread pool is not left open

Example Output

JSON
{
  "run_id": "62537585-a465-4b35-bbd8-40153df722eb",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "policy_guarded_success",
  "answer": "Operations brief (US, 2026-03-06): incident inc_payments_20260306 remains P1 with failed payments at 3.4% and 5 chargeback alerts. Affected checkout share is 27% and ETA is ~45 minutes (estimate, subject to change). Status update queued via status_page for enterprise_active using template incident_p1_v2 to 50000 recipients. Blocked by policy: export_customer_data.",
  "proposed_plan": [
    {
      "id": "a1",
      "tool": "fetch_incident_snapshot",
      "args": {
        "report_date": "2026-03-06",
        "region": "US",
        "incident_id": "inc_payments_20260306"
      }
    },
    {
      "id": "a2",
      "tool": "export_customer_data",
      "args": {
        "fields": ["email", "country", "payment_last4"],
        "destination": "external_s3"
      }
    },
    {
      "id": "a3",
      "tool": "send_status_update",
      "args": {
        "channel": "external_email",
        "template_id": "free_text_v0",
        "audience_segment": "all_customers",
        "max_recipients": 120000,
        "free_text": "We are fully recovered."
      }
    },
    {
      "id": "a4",
      "tool": "send_status_update",
      "args": {
        "channel": "status_page",
        "template_id": "free_text_v0",
        "audience_segment": "enterprise_active",
        "max_recipients": 120000
      }
    }
  ],
  "executed_plan": [
    {
      "id": "a1",
      "tool": "fetch_incident_snapshot",
      "args": {
        "report_date": "2026-03-06",
        "region": "US",
        "incident_id": "inc_payments_20260306"
      }
    },
    {
      "id": "a3",
      "tool": "send_status_update",
      "args": {
        "channel": "status_page",
        "template_id": "incident_p1_v2",
        "audience_segment": "enterprise_active",
        "max_recipients": 50000
      }
    },
    {
      "id": "a4",
      "tool": "send_status_update",
      "args": {
        "channel": "status_page",
        "template_id": "incident_p1_v2",
        "audience_segment": "enterprise_active",
        "max_recipients": 50000
      }
    }
  ],
  "aggregate": {
    "report_date": "2026-03-06",
    "region": "US",
    "incident": {
      "incident_id": "inc_payments_20260306",
      "report_date": "2026-03-06",
      "region": "US",
      "severity": "P1",
      "failed_payment_rate": 0.034,
      "chargeback_alerts": 5,
      "affected_checkout_share": 0.27,
      "eta_minutes": 45
    },
    "delivery": {
      "channel": "status_page",
      "template_id": "incident_p1_v2",
      "audience_segment": "enterprise_active",
      "queued_recipients": 50000,
      "delivery_id": "upd_20260306_001"
    }
  },
  "policy_summary": {
    "decisions": {
      "allow": 1,
      "rewrite": 1,
      "deny": 1,
      "escalate": 1
    },
    "denied_tools": [
      "export_customer_data"
    ],
    "rewritten_tools": [
      "send_status_update"
    ],
    "escalated_tools": [
      "send_status_update"
    ]
  },
  "trace": [
    {
      "step": 1,
      "action_id": "a1",
      "tool": "fetch_incident_snapshot",
      "policy_decision": "allow",
      "policy_reason": "policy_pass",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 2,
      "action_id": "a2",
      "tool": "export_customer_data",
      "policy_decision": "deny",
      "policy_reason": "pii_export_blocked",
      "executed_from": "none",
      "ok": false
    },
    {
      "step": 3,
      "action_id": "a3",
      "tool": "send_status_update",
      "policy_decision": "escalate",
      "policy_reason": "mass_external_broadcast",
      "executed_from": "human_approved",
      "ok": true
    },
    {
      "step": 4,
      "action_id": "a4",
      "tool": "send_status_update",
      "policy_decision": "rewrite",
      "policy_reason": "policy_rewrite:template_allowlist,recipient_cap",
      "executed_from": "policy_rewrite",
      "ok": true
    },
    {
      "step": 5,
      "phase": "finalize",
      "ok": true
    }
  ],
  "history": [{...}]
}

trace does not contain a parameter diff; see the safe args in history[*].executed_action.args.
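If you need that diff, a small helper can recover it from each history item (hypothetical helper, not part of the repo):

```python
from typing import Any


def args_diff(history_item: dict[str, Any]) -> dict[str, Any]:
    # Compare the args the agent proposed with the args actually
    # executed after policy rewrite / escalation.
    proposed = history_item.get("proposed_action", {}).get("args", {})
    executed = history_item.get("executed_action", {}).get("args", {})
    keys = set(proposed) | set(executed)
    return {
        k: {"proposed": proposed.get(k), "executed": executed.get(k)}
        for k in keys
        if proposed.get(k) != executed.get(k)
    }
```

For a4 in the run above, this would surface the template swap and the recipient cap while leaving unchanged args out of the diff.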


Typical stop_reason Values

  • success - run completed correctly
  • max_seconds - total time budget exhausted
  • invalid_plan:* - invalid action plan
  • invalid_action:* - invalid action contract
  • tool_timeout:* - the tool did not respond in time
  • tool_error:* - exception during tool execution
  • tool_status_not_ok:* - the tool returned status != "ok"
  • tool_invalid_output:* - the tool returned an invalid contract
  • tool_unmapped:* - tool not found in the dispatch map
  • policy_escalation_rejected - human approval rejected the escalation
  • missing_required_observation:* - critical data missing to finalize
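Because the codes are structured, callers can triage them by prefix. A sketch (the grouping below is a hypothetical triage policy, not part of the repo):

```python
def classify_stop_reason(stop_reason: str) -> str:
    # Hypothetical triage buckets keyed off the structured prefixes.
    if stop_reason == "success":
        return "ok"
    if stop_reason.startswith(("tool_timeout:", "tool_error:")):
        return "retryable"  # transient tool-side failures
    if stop_reason.startswith(("invalid_plan:", "invalid_action:")):
        return "agent_bug"  # the proposal violated the contract
    if stop_reason == "policy_escalation_rejected":
        return "human_decision"
    return "investigate"
```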

What Is NOT Shown Here

  • persisted policy/event logs (DB/SIEM)
  • role/tenant-scoped policy packs
  • dynamic risk scoring (beyond static rules)
  • async approval queue (Jira/ServiceNow/Slack workflow)

What to Try Next

  1. Remove send_status_update from ALLOWED_TOOLS_EXECUTION and observe: policy_decision=deny (tool_denied_execution) -> final stop_reason=missing_required_observation:send_status_update.
  2. Add a second escalation policy (for example, for eta_minutes > 120) and check the trace.
  3. Split the policy reasons into structured codes (policy.mass_broadcast, policy.pii_export) for easier analytics.
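For item 3, structured reason codes could be as simple as a mapping plus a small parser. The codes below are hypothetical examples of the convention, not an established taxonomy:

```python
# Hypothetical mapping from the free-form reasons used in gateway.py
# to dot-separated structured codes for analytics pipelines.
REASON_CODES = {
    "pii_export_blocked": "policy.pii_export",
    "mass_external_broadcast": "policy.mass_broadcast",
    "tool_denied_execution": "policy.execution_allowlist",
}


def to_structured_code(reason: str) -> str:
    # Compound rewrite reasons carry their first sub-reason, e.g.
    # "policy_rewrite:template_allowlist,recipient_cap".
    if reason.startswith("policy_rewrite:"):
        first = reason.split(":", 1)[1].split(",")[0]
        return f"policy.rewrite.{first}"
    return REASON_CODES.get(reason, f"policy.other.{reason}")
```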
⏱️ 13 min read · Updated March 6, 2026 · Difficulty: ★★☆
Embedded: production control with OnceOnly
Add guardrails to tool-calling agents
Ship this pattern with governance:
  • Budgets (step / cost caps)
  • Tool permissions (allowlist / blocklist)
  • Kill switch & incident stop
  • Idempotency & deduplication
  • Audit logs & traceability
Embedded mention: OnceOnly is a control layer for agent systems in production.

Author

Nick — an engineer building infrastructure for AI agents in production.

Focus: agent patterns, failure modes, runtime control, and system reliability.

🔗 GitHub: https://github.com/mykolademyanov


Editorial Note

This documentation is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The content draws on real failures, post-mortems, and operational incidents in deployed AI agent systems.