Guarded-Policy Agent in Python: Complete Example

Production-style, runnable example of a guarded-policy agent in Python, with a policy gate (allow/deny/rewrite/escalate) and a transparent trace/history.
On This Page
  1. Core of the Pattern (Brief)
  2. What This Example Shows
  3. Architecture
  4. Project Structure
  5. Running It
  6. Task
  7. Code
  8. context.py — request envelope
  9. tools.py — executable tools
  10. agent.py — proposed actions + final answer
  11. gateway.py — policy boundary + execution boundary
  12. main.py — orchestrate policy-gated execution
  13. Example Output
  14. Typical stop_reason Values
  15. What Is NOT Shown Here
  16. What to Try Next

Core of the Pattern (Brief)

Guarded-policy agent means: the agent may propose any action, but execution passes only through the policy boundary.

Before every tool call, the policy gate returns one of these decisions:

  • allow
  • deny
  • rewrite
  • escalate

What This Example Shows

  • the LLM/agent layer proposes an action plan (actions)
  • the execution layer validates the action contract
  • the policy boundary applies allow/deny/rewrite/escalate
  • a runtime execution allowlist exists as a separate guard path (not triggered in this run; exercised in a separate experiment)
  • escalate: the policy builds a safe action, and human approval confirms the execution
  • the final answer is composed only from safely obtained observations

Architecture

  1. agent.py builds a plan with four actions.
  2. gateway.py validates each action and makes the policy decision.
  3. a2 is blocked as deny (pii_export_blocked) and never executed.
  4. a3 goes to escalate: the policy builds a safe variant, human approval confirms the scope, and it then runs as human_approved.
  5. a4 demonstrates rewrite (template/recipient cap) and runs as policy_rewrite.
  6. main.py collects trace/history and returns the final brief.

Project Structure

TEXT
agent-patterns/
└── guarded-policy-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── agent.py
        ├── context.py
        ├── README.md
        └── requirements.txt

Running It

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/guarded-policy-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Variant using export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py

Variant using .env (optional)

BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows it is easier to set variables via set, or optionally to use python-dotenv.


Task

Production case:

"Create a customer-safe update for a P1 payment incident in the US. Block PII export and cap mass communication at the policy level."


Code

context.py — request envelope

PYTHON
from __future__ import annotations

from typing import Any


def build_request(*, report_date: str, region: str, incident_id: str) -> dict[str, Any]:
    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
            "incident_id": incident_id,
        },
        "policy_hints": {
            "status_update_template": "incident_p1_v2",
            "max_recipients_per_send": 50000,
        },
    }

tools.py — executable tools

PYTHON
from __future__ import annotations

import time
from typing import Any


def fetch_incident_snapshot(report_date: str, region: str, incident_id: str) -> dict[str, Any]:
    time.sleep(0.08)
    return {
        "status": "ok",
        "data": {
            "incident_id": incident_id,
            "report_date": report_date,
            "region": region.upper(),
            "severity": "P1",
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "affected_checkout_share": 0.27,
            "eta_minutes": 45,
        },
    }


def send_status_update(
    channel: str,
    template_id: str,
    audience_segment: str,
    max_recipients: int,
) -> dict[str, Any]:
    time.sleep(0.05)
    return {
        "status": "ok",
        "data": {
            "channel": channel,
            "template_id": template_id,
            "audience_segment": audience_segment,
            "queued_recipients": int(max_recipients),
            "delivery_id": "upd_20260306_001",
        },
    }


def export_customer_data(fields: list[str], destination: str) -> dict[str, Any]:
    del fields, destination
    time.sleep(0.03)
    return {
        "status": "ok",
        "data": {
            "export_id": "exp_20260306_001",
            "rows": 18240,
        },
    }


def create_manual_review_ticket(reason: str, payload: dict[str, Any]) -> dict[str, Any]:
    time.sleep(0.03)
    return {
        "status": "ok",
        "data": {
            "ticket_id": "pol_20260306_001",
            "reason": reason,
            "payload_keys": sorted(payload.keys()),
        },
    }

agent.py — proposed actions + final answer

PYTHON
from __future__ import annotations

from typing import Any


def propose_action_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    req = request["request"]
    del goal
    return {
        "actions": [
            {
                "id": "a1",
                "tool": "fetch_incident_snapshot",
                "args": {
                    "report_date": req["report_date"],
                    "region": req["region"],
                    "incident_id": req["incident_id"],
                },
            },
            {
                "id": "a2",
                "tool": "export_customer_data",
                "args": {
                    "fields": ["email", "country", "payment_last4"],
                    "destination": "external_s3",
                },
            },
            {
                "id": "a3",
                "tool": "send_status_update",
                "args": {
                    "channel": "external_email",
                    "template_id": "free_text_v0",
                    "audience_segment": "all_customers",
                    "max_recipients": 120000,
                    "free_text": "We are fully recovered.",
                },
            },
            {
                "id": "a4",
                "tool": "send_status_update",
                "args": {
                    "channel": "status_page",
                    "template_id": "free_text_v0",
                    "audience_segment": "enterprise_active",
                    "max_recipients": 120000,
                },
            },
        ]
    }


def compose_final_answer(
    *,
    request: dict[str, Any],
    state: dict[str, Any],
    policy_summary: dict[str, Any],
) -> str:
    req = request["request"]
    snap = state.get("snapshot") or {}
    delivery = state.get("delivery") or {}

    blocked = ", ".join(policy_summary.get("denied_tools") or []) or "none"
    sent = ""
    if delivery:
        sent = (
            f" Status update queued via {delivery.get('channel')} for {delivery.get('audience_segment')} "
            f"using template {delivery.get('template_id')} to {delivery.get('queued_recipients')} recipients."
        )

    failed_rate = snap.get("failed_payment_rate")
    failed_rate_str = f"{float(failed_rate) * 100:.1f}%" if isinstance(failed_rate, (int, float)) else "?"
    share = snap.get("affected_checkout_share")
    share_str = f"{float(share) * 100:.0f}%" if isinstance(share, (int, float)) else "?"

    return (
        f"Operations brief ({req['region']}, {req['report_date']}): incident {req['incident_id']} remains "
        f"{snap.get('severity', '?')} with failed payments at {failed_rate_str} and "
        f"{snap.get('chargeback_alerts', '?')} chargeback alerts. Affected checkout share is "
        f"{share_str} and ETA is ~{snap.get('eta_minutes', '?')} minutes "
        f"(estimate, subject to change).{sent} Blocked by policy: {blocked}."
    )
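
compose_final_answer guards its percentage formatting so a missing metric renders as "?" instead of raising. The same defensive pattern in isolation (pct is a hypothetical helper name, not part of the example's code):

```python
def pct(value, digits: int = 1) -> str:
    # Render a 0..1 ratio as a percentage, or "?" when the metric is absent or non-numeric.
    if isinstance(value, (int, float)):
        return f"{float(value) * 100:.{digits}f}%"
    return "?"
```

This keeps the final brief renderable even when a tool observation is missing a field, which matters because the finalize step runs after partial failures may have occurred.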

gateway.py — policy boundary + execution boundary

PYTHON
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 25
    max_actions: int = 8
    action_timeout_seconds: float = 1.2
    max_recipients_per_send: int = 50000


@dataclass(frozen=True)
class Decision:
    kind: str
    reason: str
    enforced_action: dict[str, Any] | None = None


def _normalize_action(raw: Any) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_action:not_object")

    action_id = raw.get("id")
    tool = raw.get("tool")
    args = raw.get("args")

    if not isinstance(action_id, str) or not action_id.strip():
        raise StopRun("invalid_action:id")
    if not isinstance(tool, str) or not tool.strip():
        raise StopRun("invalid_action:tool")
    if not isinstance(args, dict):
        raise StopRun("invalid_action:args")

    return {
        "id": action_id.strip(),
        "tool": tool.strip(),
        "args": dict(args),
    }


def validate_plan(raw_actions: Any, *, max_actions: int) -> list[dict[str, Any]]:
    if not isinstance(raw_actions, list) or not raw_actions:
        raise StopRun("invalid_plan:actions")
    if len(raw_actions) > max_actions:
        raise StopRun("invalid_plan:too_many_actions")
    return [_normalize_action(item) for item in raw_actions]


def validate_tool_observation(raw: Any, *, tool_name: str) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    if raw.get("status") != "ok":
        raise StopRun(f"tool_status_not_ok:{tool_name}")
    data = raw.get("data")
    if not isinstance(data, dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    return data


class PolicyGateway:
    def __init__(
        self,
        *,
        allowed_tools_policy: set[str],
        allowed_tools_execution: set[str],
        budget: Budget,
    ):
        self.allowed_tools_policy = set(allowed_tools_policy)
        self.allowed_tools_execution = set(allowed_tools_execution)
        self.budget = budget
        self.allowed_templates = {"incident_p1_v2", "incident_p2_v1"}
        self._pool = ThreadPoolExecutor(max_workers=4)

    def close(self) -> None:
        self._pool.shutdown(wait=False, cancel_futures=True)

    def evaluate(self, *, action: dict[str, Any], state: dict[str, Any]) -> Decision:
        del state
        normalized = _normalize_action(action)
        tool = normalized["tool"]
        args = dict(normalized["args"])

        if tool not in self.allowed_tools_policy:
            return Decision(kind="deny", reason="tool_denied_policy")
        if tool == "export_customer_data":
            return Decision(kind="deny", reason="pii_export_blocked")
        if tool not in self.allowed_tools_execution:
            return Decision(kind="deny", reason="tool_denied_execution")

        if tool != "send_status_update":
            return Decision(kind="allow", reason="policy_pass")

        rewrite_reasons: list[str] = []
        rewritten = dict(args)

        if rewritten.get("template_id") not in self.allowed_templates:
            rewritten["template_id"] = "incident_p1_v2"
            rewrite_reasons.append("template_allowlist")

        raw_recipients = rewritten.get("max_recipients", self.budget.max_recipients_per_send)
        try:
            recipients = int(raw_recipients)
        except (TypeError, ValueError):
            recipients = self.budget.max_recipients_per_send
        if recipients > self.budget.max_recipients_per_send:
            recipients = self.budget.max_recipients_per_send
            rewrite_reasons.append("recipient_cap")
        rewritten["max_recipients"] = recipients

        if "free_text" in rewritten:
            rewritten.pop("free_text", None)
            rewrite_reasons.append("free_text_removed")

        if (
            rewritten.get("channel") == "external_email"
            and rewritten.get("audience_segment") == "all_customers"
        ):
            rewritten["channel"] = "status_page"
            rewritten["audience_segment"] = "enterprise_active"
            enforced = {
                "id": normalized["id"],
                "tool": normalized["tool"],
                "args": rewritten,
            }
            return Decision(
                kind="escalate",
                reason="mass_external_broadcast",
                enforced_action=enforced,
            )

        if not rewrite_reasons:
            return Decision(kind="allow", reason="policy_pass")

        enforced = {
            "id": normalized["id"],
            "tool": normalized["tool"],
            "args": rewritten,
        }
        return Decision(
            kind="rewrite",
            reason=f"policy_rewrite:{','.join(rewrite_reasons)}",
            enforced_action=enforced,
        )

    def dispatch(
        self,
        *,
        tool_name: str,
        tool_fn: Callable[..., dict[str, Any]],
        args: dict[str, Any],
    ) -> dict[str, Any]:
        future = self._pool.submit(tool_fn, **args)
        try:
            raw = future.result(timeout=self.budget.action_timeout_seconds)
        except FuturesTimeoutError as exc:
            raise StopRun(f"tool_timeout:{tool_name}") from exc
        except Exception as exc:  # noqa: BLE001
            raise StopRun(f"tool_error:{tool_name}:{type(exc).__name__}") from exc

        return validate_tool_observation(raw, tool_name=tool_name)

main.py — orchestrate policy-gated execution

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from agent import compose_final_answer, propose_action_plan
from context import build_request
from gateway import Budget, PolicyGateway, StopRun, validate_plan
from tools import (
    create_manual_review_ticket,
    export_customer_data,
    fetch_incident_snapshot,
    send_status_update,
)

GOAL = (
    "Prepare a customer-safe operations update for a US payments incident. "
    "Use policy-gated execution and never expose customer PII."
)
REQUEST = build_request(
    report_date="2026-03-06",
    region="US",
    incident_id="inc_payments_20260306",
)

BUDGET = Budget(
    max_seconds=25,
    max_actions=8,
    action_timeout_seconds=1.2,
    max_recipients_per_send=50000,
)

ALLOWED_TOOLS_POLICY = {
    "fetch_incident_snapshot",
    "send_status_update",
    "export_customer_data",
    "create_manual_review_ticket",
}
ALLOWED_TOOLS_EXECUTION = {
    "fetch_incident_snapshot",
    "send_status_update",
    "create_manual_review_ticket",
}

TOOLS: dict[str, Any] = {
    "fetch_incident_snapshot": fetch_incident_snapshot,
    "send_status_update": send_status_update,
    "export_customer_data": export_customer_data,
    "create_manual_review_ticket": create_manual_review_ticket,
}


def simulate_human_approval(*, action: dict[str, Any], reason: str) -> dict[str, Any]:
    # Demo policy: approve only the broadcast-risk escalation after safe rewrite.
    if reason == "mass_external_broadcast":
        return {"approved": True, "action": action, "reason": "approved_with_safe_scope"}
    return {"approved": False, "action": action, "reason": "human_rejected"}


def run_guarded_policy_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []
    state: dict[str, Any] = {"snapshot": None, "delivery": None}

    gateway = PolicyGateway(
        allowed_tools_policy=ALLOWED_TOOLS_POLICY,
        allowed_tools_execution=ALLOWED_TOOLS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    decision_counts = {"allow": 0, "rewrite": 0, "deny": 0, "escalate": 0}
    denied_tools: list[str] = []
    rewritten_tools: list[str] = []
    escalated_tools: list[str] = []

    phase = "plan"

    try:
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return stopped("max_seconds", phase="plan")

        raw_plan = propose_action_plan(goal=goal, request=request)
        actions = validate_plan(raw_plan.get("actions"), max_actions=BUDGET.max_actions)
        phase = "execute"

        for idx, action in enumerate(actions, start=1):
            if (time.monotonic() - started) > BUDGET.max_seconds:
                return stopped("max_seconds", phase="execute")

            decision = gateway.evaluate(action=action, state=state)
            decision_counts[decision.kind] += 1

            trace_item = {
                "step": idx,
                "action_id": action["id"],
                "tool": action["tool"],
                "policy_decision": decision.kind,
                "policy_reason": decision.reason,
                "executed_from": "none",
                "ok": False,
            }
            history_item: dict[str, Any] = {
                "step": idx,
                "proposed_action": action,
                "policy": {"decision": decision.kind, "reason": decision.reason},
            }

            if decision.kind == "deny":
                denied_tools.append(action["tool"])
                trace.append(trace_item)
                history.append(history_item)
                continue

            executed_action = action
            executed_from = "original"
            if decision.kind == "rewrite":
                executed_action = decision.enforced_action or action
                rewritten_tools.append(action["tool"])
                executed_from = "policy_rewrite"
            elif decision.kind == "escalate":
                escalated_tools.append(action["tool"])
                escalation_ticket = gateway.dispatch(
                    tool_name="create_manual_review_ticket",
                    tool_fn=TOOLS["create_manual_review_ticket"],
                    args={"reason": decision.reason, "payload": (decision.enforced_action or action)},
                )
                history_item["escalation_ticket"] = escalation_ticket
                human = simulate_human_approval(
                    action=(decision.enforced_action or action),
                    reason=decision.reason,
                )
                if not human["approved"]:
                    return stopped("policy_escalation_rejected", phase="execute")
                executed_action = human["action"]
                executed_from = "human_approved"
                history_item["human"] = {
                    "approved": True,
                    "reason": human["reason"],
                }

            tool_name = executed_action["tool"]
            tool_fn = TOOLS.get(tool_name)
            if tool_fn is None:
                return stopped(f"tool_unmapped:{tool_name}", phase="execute")

            # Keep the decision in trace even if execution fails later.
            trace.append(trace_item)
            observation = gateway.dispatch(
                tool_name=tool_name,
                tool_fn=tool_fn,
                args=executed_action["args"],
            )
            if tool_name == "fetch_incident_snapshot":
                state["snapshot"] = observation
            elif tool_name == "send_status_update":
                state["delivery"] = observation

            trace_item["executed_from"] = executed_from
            trace_item["ok"] = True

            history_item["executed_action"] = executed_action
            history_item["executed_from"] = executed_from
            history_item["observation"] = observation
            history.append(history_item)

        if not isinstance(state["snapshot"], dict):
            return stopped("missing_required_observation:snapshot", phase="finalize")
        if not isinstance(state["delivery"], dict):
            return stopped("missing_required_observation:send_status_update", phase="finalize")

        aggregate = {
            "report_date": request["request"]["report_date"],
            "region": request["request"]["region"],
            "incident": state["snapshot"],
            "delivery": state["delivery"],
        }
        policy_summary = {
            "decisions": decision_counts,
            "denied_tools": sorted(set(denied_tools)),
            "rewritten_tools": sorted(set(rewritten_tools)),
            "escalated_tools": sorted(set(escalated_tools)),
        }
        answer = compose_final_answer(
            request=request,
            state=state,
            policy_summary=policy_summary,
        )

        trace.append(
            {
                "step": len(actions) + 1,
                "phase": "finalize",
                "ok": True,
            }
        )
        history.append({"step": len(actions) + 1, "action": "finalize"})

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "policy_guarded_success",
            "answer": answer,
            "proposed_plan": actions,
            "executed_plan": [
                step["executed_action"]
                for step in history
                if isinstance(step, dict) and isinstance(step.get("executed_action"), dict)
            ],
            "aggregate": aggregate,
            "policy_summary": policy_summary,
            "trace": trace,
            "history": history,
        }
    except StopRun as exc:
        return stopped(exc.reason, phase=phase)
    finally:
        gateway.close()


def main() -> None:
    result = run_guarded_policy_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What Matters Most Here (In Plain Terms)

  • the agent proposes actions but never calls tools directly; execution runs only through the gateway
  • pii_export_blocked shows a policy-level deny for a risky export
  • tool_denied_execution remains available as a runtime deny path but is not triggered in this run
  • escalate: the policy builds a safe version of the action, human approval confirms the scope, then it runs as human_approved
  • trace/history make policy decisions auditable
  • gateway.close() is called in finally so the thread pool does not stay open

Example Output

JSON
{
  "run_id": "62537585-a465-4b35-bbd8-40153df722eb",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "policy_guarded_success",
  "answer": "Operations brief (US, 2026-03-06): incident inc_payments_20260306 remains P1 with failed payments at 3.4% and 5 chargeback alerts. Affected checkout share is 27% and ETA is ~45 minutes (estimate, subject to change). Status update queued via status_page for enterprise_active using template incident_p1_v2 to 50000 recipients. Blocked by policy: export_customer_data.",
  "proposed_plan": [
    {
      "id": "a1",
      "tool": "fetch_incident_snapshot",
      "args": {
        "report_date": "2026-03-06",
        "region": "US",
        "incident_id": "inc_payments_20260306"
      }
    },
    {
      "id": "a2",
      "tool": "export_customer_data",
      "args": {
        "fields": ["email", "country", "payment_last4"],
        "destination": "external_s3"
      }
    },
    {
      "id": "a3",
      "tool": "send_status_update",
      "args": {
        "channel": "external_email",
        "template_id": "free_text_v0",
        "audience_segment": "all_customers",
        "max_recipients": 120000,
        "free_text": "We are fully recovered."
      }
    },
    {
      "id": "a4",
      "tool": "send_status_update",
      "args": {
        "channel": "status_page",
        "template_id": "free_text_v0",
        "audience_segment": "enterprise_active",
        "max_recipients": 120000
      }
    }
  ],
  "executed_plan": [
    {
      "id": "a1",
      "tool": "fetch_incident_snapshot",
      "args": {
        "report_date": "2026-03-06",
        "region": "US",
        "incident_id": "inc_payments_20260306"
      }
    },
    {
      "id": "a3",
      "tool": "send_status_update",
      "args": {
        "channel": "status_page",
        "template_id": "incident_p1_v2",
        "audience_segment": "enterprise_active",
        "max_recipients": 50000
      }
    },
    {
      "id": "a4",
      "tool": "send_status_update",
      "args": {
        "channel": "status_page",
        "template_id": "incident_p1_v2",
        "audience_segment": "enterprise_active",
        "max_recipients": 50000
      }
    }
  ],
  "aggregate": {
    "report_date": "2026-03-06",
    "region": "US",
    "incident": {
      "incident_id": "inc_payments_20260306",
      "report_date": "2026-03-06",
      "region": "US",
      "severity": "P1",
      "failed_payment_rate": 0.034,
      "chargeback_alerts": 5,
      "affected_checkout_share": 0.27,
      "eta_minutes": 45
    },
    "delivery": {
      "channel": "status_page",
      "template_id": "incident_p1_v2",
      "audience_segment": "enterprise_active",
      "queued_recipients": 50000,
      "delivery_id": "upd_20260306_001"
    }
  },
  "policy_summary": {
    "decisions": {
      "allow": 1,
      "rewrite": 1,
      "deny": 1,
      "escalate": 1
    },
    "denied_tools": [
      "export_customer_data"
    ],
    "rewritten_tools": [
      "send_status_update"
    ],
    "escalated_tools": [
      "send_status_update"
    ]
  },
  "trace": [
    {
      "step": 1,
      "action_id": "a1",
      "tool": "fetch_incident_snapshot",
      "policy_decision": "allow",
      "policy_reason": "policy_pass",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 2,
      "action_id": "a2",
      "tool": "export_customer_data",
      "policy_decision": "deny",
      "policy_reason": "pii_export_blocked",
      "executed_from": "none",
      "ok": false
    },
    {
      "step": 3,
      "action_id": "a3",
      "tool": "send_status_update",
      "policy_decision": "escalate",
      "policy_reason": "mass_external_broadcast",
      "executed_from": "human_approved",
      "ok": true
    },
    {
      "step": 4,
      "action_id": "a4",
      "tool": "send_status_update",
      "policy_decision": "rewrite",
      "policy_reason": "policy_rewrite:template_allowlist,recipient_cap",
      "executed_from": "policy_rewrite",
      "ok": true
    },
    {
      "step": 5,
      "phase": "finalize",
      "ok": true
    }
  ],
  "history": [{...}]
}

trace contains no parameter diffs; the policy-safe args live in history[*].executed_action.args.
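
To recover the arguments that actually ran, filter history for entries carrying an executed_action; denied steps only have a proposed_action. A minimal sketch against toy history entries shaped like the output above (executed_args is an illustrative helper, not part of the example):

```python
# Toy history entries shaped like the example's output (illustrative data).
history = [
    {"step": 1, "executed_action": {"tool": "fetch_incident_snapshot", "args": {"region": "US"}}},
    {"step": 2, "proposed_action": {"tool": "export_customer_data"}},  # denied: never executed
    {"step": 3, "executed_action": {"tool": "send_status_update", "args": {"max_recipients": 50000}}},
]


def executed_args(history: list[dict]) -> list[tuple[str, dict]]:
    # Collect (tool, args) pairs for the actions that actually ran, in order.
    return [
        (item["executed_action"]["tool"], item["executed_action"]["args"])
        for item in history
        if isinstance(item.get("executed_action"), dict)
    ]
```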


Typical stop_reason Values

  • success - run completed successfully
  • max_seconds - total time budget exhausted
  • invalid_plan:* - invalid action plan
  • invalid_action:* - invalid action contract
  • tool_timeout:* - tool did not respond in time
  • tool_error:* - exception during tool execution
  • tool_status_not_ok:* - tool returned status != "ok"
  • tool_invalid_output:* - tool returned an invalid contract
  • tool_unmapped:* - tool not found in the dispatch map
  • policy_escalation_rejected - human approval rejected the escalation
  • missing_required_observation:* - critical data for finalization is missing
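
A caller can route on these stop_reason families. One hedged sketch of a coarse classification (the bucket names and the retry policy are assumptions, not part of the example):

```python
def classify_stop(stop_reason: str) -> str:
    # Map a stop_reason to a coarse handling bucket (illustrative policy).
    if stop_reason == "success":
        return "done"
    if stop_reason == "policy_escalation_rejected":
        return "needs_human_followup"
    # The "family" is the prefix before the first colon, e.g. tool_timeout:send_status_update.
    family = stop_reason.split(":", 1)[0]
    if family in {"tool_timeout", "tool_error", "tool_status_not_ok"}:
        return "retryable"
    # invalid_plan, invalid_action, tool_unmapped, missing_required_observation, ...
    return "bug_or_config"
```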

What Is NOT Shown Here

  • persisted policy/event logs (DB/SIEM)
  • role/tenant-scoped policy packs
  • dynamic risk scoring (beyond static rules)
  • async approval queue (Jira/ServiceNow/Slack workflow)

What to Try Next

  1. Remove send_status_update from ALLOWED_TOOLS_EXECUTION and observe: policy_decision=deny (tool_denied_execution) -> final stop_reason=missing_required_observation:send_status_update.
  2. Add a second escalation policy (e.g. for eta_minutes > 120) and inspect the trace.
  3. Split policy reasons into structured codes (policy.mass_broadcast, policy.pii_export) for easier analytics.
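
Structured reason codes could start from a simple mapping like this. A sketch only: the example itself uses plain strings, and REASON_CODES plus the to_code name are illustrative assumptions:

```python
# Hypothetical mapping from the example's free-form reasons to structured codes.
REASON_CODES = {
    "pii_export_blocked": "policy.pii_export",
    "mass_external_broadcast": "policy.mass_broadcast",
    "tool_denied_policy": "policy.tool_denied",
    "tool_denied_execution": "runtime.tool_denied",
}


def to_code(reason: str) -> str:
    # policy_rewrite:template_allowlist,recipient_cap -> one code per rewrite sub-reason.
    if reason.startswith("policy_rewrite:"):
        parts = reason.split(":", 1)[1].split(",")
        return ";".join(f"policy.rewrite.{p}" for p in parts)
    return REASON_CODES.get(reason, f"unmapped.{reason}")
```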
Updated: March 6, 2026

Author

Nick — engineer building infrastructure for AI agents in production.

Focus: agent patterns, failure modes, runtime control, and system reliability.

🔗 GitHub: https://github.com/mykolademyanov


Editorial Note

This documentation is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The content draws on real failures, post-mortems, and operational incidents from AI agent systems running in production.