Core of the Pattern (Brief)
Guarded-Policy Agent means: the agent may propose any action, but execution only ever passes through the policy boundary.
Before each tool call, the policy gate returns one of these decisions:
`allow` / `deny` / `rewrite` / `escalate`
What This Example Shows
- the LLM/agent layer proposes an action plan (`actions`)
- the execution layer validates the action contract
- the policy boundary applies `allow`/`deny`/`rewrite`/`escalate`
- a runtime execution allowlist exists as a separate guard path (not triggered in this run; checked in a separate experiment)
- `escalate`: the policy builds a safe action, human approval confirms the execution
- the final answer is built only from safely obtained observations
Architecture
- `agent.py` builds a plan with four actions.
- `gateway.py` validates each action and makes the policy decision.
- `a2` is blocked as `deny` (`pii_export_blocked`) and never executed.
- `a3` goes to `escalate`: the policy builds a safe variant, human approval confirms the scope, then it is executed as `human_approved`.
- `a4` demonstrates `rewrite` (template/recipient cap) and is executed as `policy_rewrite`.
- `main.py` collects `trace`/`history` and returns the final brief.
Project Structure
agent-patterns/
└── guarded-policy-agent/
└── python/
├── main.py
├── gateway.py
├── tools.py
├── agent.py
├── context.py
├── README.md
└── requirements.txt
Run
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd agent-patterns/guarded-policy-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Variant using export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Variant using .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows it is easier to use `set` variables or, optionally, python-dotenv.
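Regardless of how the variables reach the shell, they can be read in Python via `os.environ`. A minimal sketch, assuming the variable names above; the `load_config` helper and its defaults are illustrative, not part of the project code:

```python
import os

def load_config() -> dict[str, object]:
    # OPENAI_API_KEY is required; the other two fall back to defaults.
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set")
    return {
        "api_key": api_key,
        "model": os.environ.get("OPENAI_MODEL", "gpt-4.1-mini"),
        "timeout_seconds": float(os.environ.get("OPENAI_TIMEOUT_SECONDS", "60")),
    }
```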
Task
Production case:
"Create a customer-safe update for a P1 payment incident for US. Block PII export and cap mass communication at the policy level."
Code
context.py — request envelope
from __future__ import annotations
from typing import Any
def build_request(*, report_date: str, region: str, incident_id: str) -> dict[str, Any]:
return {
"request": {
"report_date": report_date,
"region": region.upper(),
"incident_id": incident_id,
},
"policy_hints": {
"status_update_template": "incident_p1_v2",
"max_recipients_per_send": 50000,
},
}
tools.py — executable tools
from __future__ import annotations
import time
from typing import Any
def fetch_incident_snapshot(report_date: str, region: str, incident_id: str) -> dict[str, Any]:
time.sleep(0.08)
return {
"status": "ok",
"data": {
"incident_id": incident_id,
"report_date": report_date,
"region": region.upper(),
"severity": "P1",
"failed_payment_rate": 0.034,
"chargeback_alerts": 5,
"affected_checkout_share": 0.27,
"eta_minutes": 45,
},
}
def send_status_update(
channel: str,
template_id: str,
audience_segment: str,
max_recipients: int,
) -> dict[str, Any]:
time.sleep(0.05)
return {
"status": "ok",
"data": {
"channel": channel,
"template_id": template_id,
"audience_segment": audience_segment,
"queued_recipients": int(max_recipients),
"delivery_id": "upd_20260306_001",
},
}
def export_customer_data(fields: list[str], destination: str) -> dict[str, Any]:
del fields, destination
time.sleep(0.03)
return {
"status": "ok",
"data": {
"export_id": "exp_20260306_001",
"rows": 18240,
},
}
def create_manual_review_ticket(reason: str, payload: dict[str, Any]) -> dict[str, Any]:
time.sleep(0.03)
return {
"status": "ok",
"data": {
"ticket_id": "pol_20260306_001",
"reason": reason,
"payload_keys": sorted(payload.keys()),
},
}
agent.py — proposed actions + final answer
from __future__ import annotations
from typing import Any
def propose_action_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
req = request["request"]
del goal
return {
"actions": [
{
"id": "a1",
"tool": "fetch_incident_snapshot",
"args": {
"report_date": req["report_date"],
"region": req["region"],
"incident_id": req["incident_id"],
},
},
{
"id": "a2",
"tool": "export_customer_data",
"args": {
"fields": ["email", "country", "payment_last4"],
"destination": "external_s3",
},
},
{
"id": "a3",
"tool": "send_status_update",
"args": {
"channel": "external_email",
"template_id": "free_text_v0",
"audience_segment": "all_customers",
"max_recipients": 120000,
"free_text": "We are fully recovered.",
},
},
{
"id": "a4",
"tool": "send_status_update",
"args": {
"channel": "status_page",
"template_id": "free_text_v0",
"audience_segment": "enterprise_active",
"max_recipients": 120000,
},
},
]
}
def compose_final_answer(
*,
request: dict[str, Any],
state: dict[str, Any],
policy_summary: dict[str, Any],
) -> str:
req = request["request"]
snap = state.get("snapshot") or {}
delivery = state.get("delivery") or {}
blocked = ", ".join(policy_summary.get("denied_tools") or []) or "none"
sent = ""
if delivery:
sent = (
f" Status update queued via {delivery.get('channel')} for {delivery.get('audience_segment')} "
f"using template {delivery.get('template_id')} to {delivery.get('queued_recipients')} recipients."
)
failed_rate = snap.get("failed_payment_rate")
failed_rate_str = f"{float(failed_rate) * 100:.1f}%" if isinstance(failed_rate, (int, float)) else "?"
share = snap.get("affected_checkout_share")
share_str = f"{float(share) * 100:.0f}%" if isinstance(share, (int, float)) else "?"
return (
f"Operations brief ({req['region']}, {req['report_date']}): incident {req['incident_id']} remains "
f"{snap.get('severity', '?')} with failed payments at {failed_rate_str} and "
f"{snap.get('chargeback_alerts', '?')} chargeback alerts. Affected checkout share is "
f"{share_str} and ETA is ~{snap.get('eta_minutes', '?')} minutes "
f"(estimate, subject to change).{sent} Blocked by policy: {blocked}."
)
gateway.py — policy boundary + execution boundary
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from dataclasses import dataclass
from typing import Any, Callable
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_seconds: int = 25
max_actions: int = 8
action_timeout_seconds: float = 1.2
max_recipients_per_send: int = 50000
@dataclass(frozen=True)
class Decision:
kind: str
reason: str
enforced_action: dict[str, Any] | None = None
def _normalize_action(raw: Any) -> dict[str, Any]:
if not isinstance(raw, dict):
raise StopRun("invalid_action:not_object")
action_id = raw.get("id")
tool = raw.get("tool")
args = raw.get("args")
if not isinstance(action_id, str) or not action_id.strip():
raise StopRun("invalid_action:id")
if not isinstance(tool, str) or not tool.strip():
raise StopRun("invalid_action:tool")
if not isinstance(args, dict):
raise StopRun("invalid_action:args")
return {
"id": action_id.strip(),
"tool": tool.strip(),
"args": dict(args),
}
def validate_plan(raw_actions: Any, *, max_actions: int) -> list[dict[str, Any]]:
if not isinstance(raw_actions, list) or not raw_actions:
raise StopRun("invalid_plan:actions")
if len(raw_actions) > max_actions:
raise StopRun("invalid_plan:too_many_actions")
return [_normalize_action(item) for item in raw_actions]
def validate_tool_observation(raw: Any, *, tool_name: str) -> dict[str, Any]:
if not isinstance(raw, dict):
raise StopRun(f"tool_invalid_output:{tool_name}")
if raw.get("status") != "ok":
raise StopRun(f"tool_status_not_ok:{tool_name}")
data = raw.get("data")
if not isinstance(data, dict):
raise StopRun(f"tool_invalid_output:{tool_name}")
return data
class PolicyGateway:
def __init__(
self,
*,
allowed_tools_policy: set[str],
allowed_tools_execution: set[str],
budget: Budget,
):
self.allowed_tools_policy = set(allowed_tools_policy)
self.allowed_tools_execution = set(allowed_tools_execution)
self.budget = budget
self.allowed_templates = {"incident_p1_v2", "incident_p2_v1"}
self._pool = ThreadPoolExecutor(max_workers=4)
def close(self) -> None:
self._pool.shutdown(wait=False, cancel_futures=True)
def evaluate(self, *, action: dict[str, Any], state: dict[str, Any]) -> Decision:
del state
normalized = _normalize_action(action)
tool = normalized["tool"]
args = dict(normalized["args"])
if tool not in self.allowed_tools_policy:
return Decision(kind="deny", reason="tool_denied_policy")
if tool == "export_customer_data":
return Decision(kind="deny", reason="pii_export_blocked")
if tool not in self.allowed_tools_execution:
return Decision(kind="deny", reason="tool_denied_execution")
if tool != "send_status_update":
return Decision(kind="allow", reason="policy_pass")
rewrite_reasons: list[str] = []
rewritten = dict(args)
if rewritten.get("template_id") not in self.allowed_templates:
rewritten["template_id"] = "incident_p1_v2"
rewrite_reasons.append("template_allowlist")
raw_recipients = rewritten.get("max_recipients", self.budget.max_recipients_per_send)
try:
recipients = int(raw_recipients)
except (TypeError, ValueError):
recipients = self.budget.max_recipients_per_send
if recipients > self.budget.max_recipients_per_send:
recipients = self.budget.max_recipients_per_send
rewrite_reasons.append("recipient_cap")
rewritten["max_recipients"] = recipients
if "free_text" in rewritten:
rewritten.pop("free_text", None)
rewrite_reasons.append("free_text_removed")
if (
rewritten.get("channel") == "external_email"
and rewritten.get("audience_segment") == "all_customers"
):
rewritten["channel"] = "status_page"
rewritten["audience_segment"] = "enterprise_active"
enforced = {
"id": normalized["id"],
"tool": normalized["tool"],
"args": rewritten,
}
return Decision(
kind="escalate",
reason="mass_external_broadcast",
enforced_action=enforced,
)
if not rewrite_reasons:
return Decision(kind="allow", reason="policy_pass")
enforced = {
"id": normalized["id"],
"tool": normalized["tool"],
"args": rewritten,
}
return Decision(
kind="rewrite",
reason=f"policy_rewrite:{','.join(rewrite_reasons)}",
enforced_action=enforced,
)
def dispatch(
self,
*,
tool_name: str,
tool_fn: Callable[..., dict[str, Any]],
args: dict[str, Any],
) -> dict[str, Any]:
future = self._pool.submit(tool_fn, **args)
try:
raw = future.result(timeout=self.budget.action_timeout_seconds)
except FuturesTimeoutError as exc:
raise StopRun(f"tool_timeout:{tool_name}") from exc
except Exception as exc: # noqa: BLE001
raise StopRun(f"tool_error:{tool_name}:{type(exc).__name__}") from exc
return validate_tool_observation(raw, tool_name=tool_name)
main.py — orchestrate policy-gated execution
from __future__ import annotations
import json
import time
import uuid
from typing import Any
from agent import compose_final_answer, propose_action_plan
from context import build_request
from gateway import Budget, PolicyGateway, StopRun, validate_plan
from tools import (
create_manual_review_ticket,
export_customer_data,
fetch_incident_snapshot,
send_status_update,
)
GOAL = (
"Prepare a customer-safe operations update for a US payments incident. "
"Use policy-gated execution and never expose customer PII."
)
REQUEST = build_request(
report_date="2026-03-06",
region="US",
incident_id="inc_payments_20260306",
)
BUDGET = Budget(
max_seconds=25,
max_actions=8,
action_timeout_seconds=1.2,
max_recipients_per_send=50000,
)
ALLOWED_TOOLS_POLICY = {
"fetch_incident_snapshot",
"send_status_update",
"export_customer_data",
"create_manual_review_ticket",
}
ALLOWED_TOOLS_EXECUTION = {
"fetch_incident_snapshot",
"send_status_update",
"create_manual_review_ticket",
}
TOOLS: dict[str, Any] = {
"fetch_incident_snapshot": fetch_incident_snapshot,
"send_status_update": send_status_update,
"export_customer_data": export_customer_data,
"create_manual_review_ticket": create_manual_review_ticket,
}
def simulate_human_approval(*, action: dict[str, Any], reason: str) -> dict[str, Any]:
# Demo policy: approve only the broadcast-risk escalation after safe rewrite.
if reason == "mass_external_broadcast":
return {"approved": True, "action": action, "reason": "approved_with_safe_scope"}
return {"approved": False, "action": action, "reason": "human_rejected"}
def run_guarded_policy_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
run_id = str(uuid.uuid4())
started = time.monotonic()
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
state: dict[str, Any] = {"snapshot": None, "delivery": None}
gateway = PolicyGateway(
allowed_tools_policy=ALLOWED_TOOLS_POLICY,
allowed_tools_execution=ALLOWED_TOOLS_EXECUTION,
budget=BUDGET,
)
def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
payload = {
"run_id": run_id,
"status": "stopped",
"stop_reason": stop_reason,
"phase": phase,
"trace": trace,
"history": history,
}
payload.update(extra)
return payload
decision_counts = {"allow": 0, "rewrite": 0, "deny": 0, "escalate": 0}
denied_tools: list[str] = []
rewritten_tools: list[str] = []
escalated_tools: list[str] = []
phase = "plan"
try:
if (time.monotonic() - started) > BUDGET.max_seconds:
return stopped("max_seconds", phase="plan")
raw_plan = propose_action_plan(goal=goal, request=request)
actions = validate_plan(raw_plan.get("actions"), max_actions=BUDGET.max_actions)
phase = "execute"
for idx, action in enumerate(actions, start=1):
if (time.monotonic() - started) > BUDGET.max_seconds:
return stopped("max_seconds", phase="execute")
decision = gateway.evaluate(action=action, state=state)
decision_counts[decision.kind] += 1
trace_item = {
"step": idx,
"action_id": action["id"],
"tool": action["tool"],
"policy_decision": decision.kind,
"policy_reason": decision.reason,
"executed_from": "none",
"ok": False,
}
history_item: dict[str, Any] = {
"step": idx,
"proposed_action": action,
"policy": {"decision": decision.kind, "reason": decision.reason},
}
if decision.kind == "deny":
denied_tools.append(action["tool"])
trace.append(trace_item)
history.append(history_item)
continue
executed_action = action
executed_from = "original"
if decision.kind == "rewrite":
executed_action = decision.enforced_action or action
rewritten_tools.append(action["tool"])
executed_from = "policy_rewrite"
elif decision.kind == "escalate":
escalated_tools.append(action["tool"])
escalation_ticket = gateway.dispatch(
tool_name="create_manual_review_ticket",
tool_fn=TOOLS["create_manual_review_ticket"],
args={"reason": decision.reason, "payload": (decision.enforced_action or action)},
)
history_item["escalation_ticket"] = escalation_ticket
human = simulate_human_approval(
action=(decision.enforced_action or action),
reason=decision.reason,
)
if not human["approved"]:
return stopped("policy_escalation_rejected", phase="execute")
executed_action = human["action"]
executed_from = "human_approved"
history_item["human"] = {
"approved": True,
"reason": human["reason"],
}
tool_name = executed_action["tool"]
tool_fn = TOOLS.get(tool_name)
if tool_fn is None:
return stopped(f"tool_unmapped:{tool_name}", phase="execute")
# Keep the decision in trace even if execution fails later.
trace.append(trace_item)
observation = gateway.dispatch(
tool_name=tool_name,
tool_fn=tool_fn,
args=executed_action["args"],
)
if tool_name == "fetch_incident_snapshot":
state["snapshot"] = observation
elif tool_name == "send_status_update":
state["delivery"] = observation
trace_item["executed_from"] = executed_from
trace_item["ok"] = True
history_item["executed_action"] = executed_action
history_item["executed_from"] = executed_from
history_item["observation"] = observation
history.append(history_item)
if not isinstance(state["snapshot"], dict):
return stopped("missing_required_observation:snapshot", phase="finalize")
if not isinstance(state["delivery"], dict):
return stopped("missing_required_observation:send_status_update", phase="finalize")
aggregate = {
"report_date": request["request"]["report_date"],
"region": request["request"]["region"],
"incident": state["snapshot"],
"delivery": state["delivery"],
}
policy_summary = {
"decisions": decision_counts,
"denied_tools": sorted(set(denied_tools)),
"rewritten_tools": sorted(set(rewritten_tools)),
"escalated_tools": sorted(set(escalated_tools)),
}
answer = compose_final_answer(
request=request,
state=state,
policy_summary=policy_summary,
)
trace.append(
{
"step": len(actions) + 1,
"phase": "finalize",
"ok": True,
}
)
history.append({"step": len(actions) + 1, "action": "finalize"})
return {
"run_id": run_id,
"status": "ok",
"stop_reason": "success",
"outcome": "policy_guarded_success",
"answer": answer,
"proposed_plan": actions,
"executed_plan": [
step["executed_action"]
for step in history
if isinstance(step, dict) and isinstance(step.get("executed_action"), dict)
],
"aggregate": aggregate,
"policy_summary": policy_summary,
"trace": trace,
"history": history,
}
except StopRun as exc:
return stopped(exc.reason, phase=phase)
finally:
gateway.close()
def main() -> None:
result = run_guarded_policy_agent(goal=GOAL, request=REQUEST)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
What Matters Most Here (Plainly Explained)
- the agent proposes actions, but has no direct access to tools during execution
- `pii_export_blocked` shows a policy-level deny for a risky export
- `tool_denied_execution` remains available as a runtime deny path, but is not triggered in this run
- `escalate`: the policy builds a safe version of the action, human approval confirms the scope, then it is executed as `human_approved`
- `trace`/`history` make policy decisions auditable
- `gateway.close()` is called in `finally` so the pool does not stay open
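The cleanup point in the last bullet is a small pattern worth isolating. A generic sketch, assuming a stripped-down `Gate` class that stands in for `PolicyGateway`:

```python
from concurrent.futures import ThreadPoolExecutor

class Gate:
    def __init__(self) -> None:
        self._pool = ThreadPoolExecutor(max_workers=4)
        self.closed = False

    def close(self) -> None:
        # cancel_futures=True (Python 3.9+) drops queued work immediately.
        self._pool.shutdown(wait=False, cancel_futures=True)
        self.closed = True

def run() -> str:
    gate = Gate()
    try:
        return "ok"
    finally:
        gate.close()  # runs on success, on StopRun, and on any other exception
```

Because `close()` sits in `finally`, a `StopRun` raised mid-plan cannot leak the worker pool.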
Example Output
{
"run_id": "62537585-a465-4b35-bbd8-40153df722eb",
"status": "ok",
"stop_reason": "success",
"outcome": "policy_guarded_success",
"answer": "Operations brief (US, 2026-03-06): incident inc_payments_20260306 remains P1 with failed payments at 3.4% and 5 chargeback alerts. Affected checkout share is 27% and ETA is ~45 minutes (estimate, subject to change). Status update queued via status_page for enterprise_active using template incident_p1_v2 to 50000 recipients. Blocked by policy: export_customer_data.",
"proposed_plan": [
{
"id": "a1",
"tool": "fetch_incident_snapshot",
"args": {
"report_date": "2026-03-06",
"region": "US",
"incident_id": "inc_payments_20260306"
}
},
{
"id": "a2",
"tool": "export_customer_data",
"args": {
"fields": ["email", "country", "payment_last4"],
"destination": "external_s3"
}
},
{
"id": "a3",
"tool": "send_status_update",
"args": {
"channel": "external_email",
"template_id": "free_text_v0",
"audience_segment": "all_customers",
"max_recipients": 120000,
"free_text": "We are fully recovered."
}
},
{
"id": "a4",
"tool": "send_status_update",
"args": {
"channel": "status_page",
"template_id": "free_text_v0",
"audience_segment": "enterprise_active",
"max_recipients": 120000
}
}
],
"executed_plan": [
{
"id": "a1",
"tool": "fetch_incident_snapshot",
"args": {
"report_date": "2026-03-06",
"region": "US",
"incident_id": "inc_payments_20260306"
}
},
{
"id": "a3",
"tool": "send_status_update",
"args": {
"channel": "status_page",
"template_id": "incident_p1_v2",
"audience_segment": "enterprise_active",
"max_recipients": 50000
}
},
{
"id": "a4",
"tool": "send_status_update",
"args": {
"channel": "status_page",
"template_id": "incident_p1_v2",
"audience_segment": "enterprise_active",
"max_recipients": 50000
}
}
],
"aggregate": {
"report_date": "2026-03-06",
"region": "US",
"incident": {
"incident_id": "inc_payments_20260306",
"report_date": "2026-03-06",
"region": "US",
"severity": "P1",
"failed_payment_rate": 0.034,
"chargeback_alerts": 5,
"affected_checkout_share": 0.27,
"eta_minutes": 45
},
"delivery": {
"channel": "status_page",
"template_id": "incident_p1_v2",
"audience_segment": "enterprise_active",
"queued_recipients": 50000,
"delivery_id": "upd_20260306_001"
}
},
"policy_summary": {
"decisions": {
"allow": 1,
"rewrite": 1,
"deny": 1,
"escalate": 1
},
"denied_tools": [
"export_customer_data"
],
"rewritten_tools": [
"send_status_update"
],
"escalated_tools": [
"send_status_update"
]
},
"trace": [
{
"step": 1,
"action_id": "a1",
"tool": "fetch_incident_snapshot",
"policy_decision": "allow",
"policy_reason": "policy_pass",
"executed_from": "original",
"ok": true
},
{
"step": 2,
"action_id": "a2",
"tool": "export_customer_data",
"policy_decision": "deny",
"policy_reason": "pii_export_blocked",
"executed_from": "none",
"ok": false
},
{
"step": 3,
"action_id": "a3",
"tool": "send_status_update",
"policy_decision": "escalate",
"policy_reason": "mass_external_broadcast",
"executed_from": "human_approved",
"ok": true
},
{
"step": 4,
"action_id": "a4",
"tool": "send_status_update",
"policy_decision": "rewrite",
"policy_reason": "policy_rewrite:template_allowlist,recipient_cap",
"executed_from": "policy_rewrite",
"ok": true
},
{
"step": 5,
"phase": "finalize",
"ok": true
}
],
"history": [{...}]
}
`trace` contains no parameter diffs; the safe args live in `history[*].executed_action.args`.
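If you do want parameter diffs, they can be derived from `history` after the run. A small post-processing sketch; the `arg_diff` helper is hypothetical, not part of the project code:

```python
from typing import Any

def arg_diff(proposed: dict[str, Any], executed: dict[str, Any]) -> dict[str, Any]:
    """Return {key: (proposed_value, executed_value)} for every changed key."""
    keys = set(proposed) | set(executed)
    return {
        k: (proposed.get(k), executed.get(k))
        for k in keys
        if proposed.get(k) != executed.get(k)
    }
```

Applied to a `history` step, this surfaces exactly what the policy changed (e.g. template swap and recipient cap) without storing diffs in the trace itself.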
Typical stop_reason values
- `success`: run completed successfully
- `max_seconds`: overall time budget exhausted
- `invalid_plan:*`: invalid action plan
- `invalid_action:*`: invalid action contract
- `tool_timeout:*`: tool did not respond in time
- `tool_error:*`: exception during tool execution
- `tool_status_not_ok:*`: tool returned `status != "ok"`
- `tool_invalid_output:*`: tool returned an invalid contract
- `tool_unmapped:*`: tool not found in the dispatch map
- `policy_escalation_rejected`: human approval rejected the escalation
- `missing_required_observation:*`: critical data for finalization is missing
What Is NOT Shown Here
- persisted policy/event logs (DB/SIEM)
- role/tenant-scoped policy packs
- dynamic risk scoring (beyond static rules)
- async approval queue (Jira/ServiceNow/Slack workflow)
What to Try Next
- Remove `send_status_update` from `ALLOWED_TOOLS_EXECUTION` and observe: `policy_decision=deny (tool_denied_execution)` -> final `stop_reason=missing_required_observation:send_status_update`.
- Add a second escalation policy (e.g. for `eta_minutes > 120`) and inspect the trace.
- Split policy reasons into structured codes (`policy.mass_broadcast`, `policy.pii_export`) for easier analytics.
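Structured reason codes could look like the sketch below. The `PolicyReason` class and the `policy.*` code names follow the suggestion above; none of this exists in the current implementation:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class PolicyReason:
    code: str    # stable machine-readable code, e.g. "policy.pii_export"
    detail: str  # free-form human explanation

    def as_legacy_string(self) -> str:
        # Backward-compatible rendering for the existing flat reason strings.
        return self.code.split(".", 1)[1] if "." in self.code else self.code

PII_EXPORT = PolicyReason("policy.pii_export", "Customer PII export to external destination")
MASS_BROADCAST = PolicyReason("policy.mass_broadcast", "External broadcast above recipient cap")
```

Emitting `code` into `trace` while keeping `as_legacy_string()` for the current consumers lets analytics group by namespace (`policy.*`) without breaking the existing format.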