Guarded-Policy Agent на Python: повний приклад

Production-style runnable приклад Guarded-Policy агента на Python з policy-gate allow/deny/rewrite/escalate і прозорими trace/history.
На цій сторінці
  1. Суть патерна (коротко)
  2. Що демонструє цей приклад
  3. Архітектура
  4. Структура проєкту
  5. Як запустити
  6. Задача
  7. Код
  8. context.py — request envelope
  9. tools.py — виконувані інструменти
  10. agent.py — proposed actions + final answer
  11. gateway.py — policy boundary + execution boundary
  12. main.py — orchestrate policy-gated execution
  13. Приклад виводу
  14. Типові stop_reason
  15. Що тут НЕ показано
  16. Що спробувати далі

Суть патерна (коротко)

Guarded-Policy Agent означає: агент може запропонувати будь-яку дію, але виконання йде тільки через policy boundary.

Перед кожним tool-call policy-gate повертає одне з рішень:

  • allow
  • deny
  • rewrite
  • escalate

Що демонструє цей приклад

  • LLM/agent шар пропонує план дій (actions)
  • execution layer валідовує action-контракт
  • policy boundary застосовує allow/deny/rewrite/escalate
  • runtime execution allowlist є як окремий guard-path (у цьому run не спрацьовує; перевіряється окремим експериментом)
  • escalate: policy формує safe-action, human approval підтверджує виконання
  • фінальна відповідь формується тільки з безпечно отриманих observations

Архітектура

  1. agent.py формує план з чотирьох дій.
  2. gateway.py валідовує кожну дію і приймає policy decision.
  3. a2 блокується як deny (pii_export_blocked) і не виконується.
  4. a3 іде в escalate: policy формує safe-варіант, human approval підтверджує scope, після чого виконується як human_approved.
  5. a4 демонструє rewrite (template/recipient cap) і виконується як policy_rewrite.
  6. main.py збирає trace/history і повертає фінальний brief.

Структура проєкту

TEXT
agent-patterns/
└── guarded-policy-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── agent.py
        ├── context.py
        ├── README.md
        └── requirements.txt

Як запустити

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/guarded-policy-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Потрібен Python 3.11+.

Варіант через export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Варіант через .env (опційно)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

Це shell-варіант (macOS/Linux). На Windows простіше використовувати set змінних або, за бажанням, python-dotenv.


Задача

Продакшен-кейс:

"Підготуй customer-safe update по P1 платіжному інциденту для US. Заборони PII export і обмеж масову комунікацію policy-рівнем."


Код

context.py — request envelope

PYTHON
from __future__ import annotations

from typing import Any


def build_request(*, report_date: str, region: str, incident_id: str) -> dict[str, Any]:
    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
            "incident_id": incident_id,
        },
        "policy_hints": {
            "status_update_template": "incident_p1_v2",
            "max_recipients_per_send": 50000,
        },
    }

tools.py — виконувані інструменти

PYTHON
from __future__ import annotations

import time
from typing import Any


def fetch_incident_snapshot(report_date: str, region: str, incident_id: str) -> dict[str, Any]:
    time.sleep(0.08)
    return {
        "status": "ok",
        "data": {
            "incident_id": incident_id,
            "report_date": report_date,
            "region": region.upper(),
            "severity": "P1",
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "affected_checkout_share": 0.27,
            "eta_minutes": 45,
        },
    }


def send_status_update(
    channel: str,
    template_id: str,
    audience_segment: str,
    max_recipients: int,
) -> dict[str, Any]:
    time.sleep(0.05)
    return {
        "status": "ok",
        "data": {
            "channel": channel,
            "template_id": template_id,
            "audience_segment": audience_segment,
            "queued_recipients": int(max_recipients),
            "delivery_id": "upd_20260306_001",
        },
    }


def export_customer_data(fields: list[str], destination: str) -> dict[str, Any]:
    del fields, destination
    time.sleep(0.03)
    return {
        "status": "ok",
        "data": {
            "export_id": "exp_20260306_001",
            "rows": 18240,
        },
    }


def create_manual_review_ticket(reason: str, payload: dict[str, Any]) -> dict[str, Any]:
    time.sleep(0.03)
    return {
        "status": "ok",
        "data": {
            "ticket_id": "pol_20260306_001",
            "reason": reason,
            "payload_keys": sorted(payload.keys()),
        },
    }

agent.py — proposed actions + final answer

PYTHON
from __future__ import annotations

from typing import Any


def propose_action_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    req = request["request"]
    del goal
    return {
        "actions": [
            {
                "id": "a1",
                "tool": "fetch_incident_snapshot",
                "args": {
                    "report_date": req["report_date"],
                    "region": req["region"],
                    "incident_id": req["incident_id"],
                },
            },
            {
                "id": "a2",
                "tool": "export_customer_data",
                "args": {
                    "fields": ["email", "country", "payment_last4"],
                    "destination": "external_s3",
                },
            },
            {
                "id": "a3",
                "tool": "send_status_update",
                "args": {
                    "channel": "external_email",
                    "template_id": "free_text_v0",
                    "audience_segment": "all_customers",
                    "max_recipients": 120000,
                    "free_text": "We are fully recovered.",
                },
            },
            {
                "id": "a4",
                "tool": "send_status_update",
                "args": {
                    "channel": "status_page",
                    "template_id": "free_text_v0",
                    "audience_segment": "enterprise_active",
                    "max_recipients": 120000,
                },
            },
        ]
    }


def compose_final_answer(
    *,
    request: dict[str, Any],
    state: dict[str, Any],
    policy_summary: dict[str, Any],
) -> str:
    req = request["request"]
    snap = state.get("snapshot") or {}
    delivery = state.get("delivery") or {}

    blocked = ", ".join(policy_summary.get("denied_tools") or []) or "none"
    sent = ""
    if delivery:
        sent = (
            f" Status update queued via {delivery.get('channel')} for {delivery.get('audience_segment')} "
            f"using template {delivery.get('template_id')} to {delivery.get('queued_recipients')} recipients."
        )

    failed_rate = snap.get("failed_payment_rate")
    failed_rate_str = f"{float(failed_rate) * 100:.1f}%" if isinstance(failed_rate, (int, float)) else "?"
    share = snap.get("affected_checkout_share")
    share_str = f"{float(share) * 100:.0f}%" if isinstance(share, (int, float)) else "?"

    return (
        f"Operations brief ({req['region']}, {req['report_date']}): incident {req['incident_id']} remains "
        f"{snap.get('severity', '?')} with failed payments at {failed_rate_str} and "
        f"{snap.get('chargeback_alerts', '?')} chargeback alerts. Affected checkout share is "
        f"{share_str} and ETA is ~{snap.get('eta_minutes', '?')} minutes "
        f"(estimate, subject to change).{sent} Blocked by policy: {blocked}."
    )

gateway.py — policy boundary + execution boundary

PYTHON
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from dataclasses import dataclass
from typing import Any, Callable


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 25
    max_actions: int = 8
    action_timeout_seconds: float = 1.2
    max_recipients_per_send: int = 50000


@dataclass(frozen=True)
class Decision:
    kind: str
    reason: str
    enforced_action: dict[str, Any] | None = None


def _normalize_action(raw: Any) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_action:not_object")

    action_id = raw.get("id")
    tool = raw.get("tool")
    args = raw.get("args")

    if not isinstance(action_id, str) or not action_id.strip():
        raise StopRun("invalid_action:id")
    if not isinstance(tool, str) or not tool.strip():
        raise StopRun("invalid_action:tool")
    if not isinstance(args, dict):
        raise StopRun("invalid_action:args")

    return {
        "id": action_id.strip(),
        "tool": tool.strip(),
        "args": dict(args),
    }


def validate_plan(raw_actions: Any, *, max_actions: int) -> list[dict[str, Any]]:
    if not isinstance(raw_actions, list) or not raw_actions:
        raise StopRun("invalid_plan:actions")
    if len(raw_actions) > max_actions:
        raise StopRun("invalid_plan:too_many_actions")
    return [_normalize_action(item) for item in raw_actions]


def validate_tool_observation(raw: Any, *, tool_name: str) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    if raw.get("status") != "ok":
        raise StopRun(f"tool_status_not_ok:{tool_name}")
    data = raw.get("data")
    if not isinstance(data, dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    return data


class PolicyGateway:
    def __init__(
        self,
        *,
        allowed_tools_policy: set[str],
        allowed_tools_execution: set[str],
        budget: Budget,
    ):
        self.allowed_tools_policy = set(allowed_tools_policy)
        self.allowed_tools_execution = set(allowed_tools_execution)
        self.budget = budget
        self.allowed_templates = {"incident_p1_v2", "incident_p2_v1"}
        self._pool = ThreadPoolExecutor(max_workers=4)

    def close(self) -> None:
        self._pool.shutdown(wait=False, cancel_futures=True)

    def evaluate(self, *, action: dict[str, Any], state: dict[str, Any]) -> Decision:
        del state
        normalized = _normalize_action(action)
        tool = normalized["tool"]
        args = dict(normalized["args"])

        if tool not in self.allowed_tools_policy:
            return Decision(kind="deny", reason="tool_denied_policy")
        if tool == "export_customer_data":
            return Decision(kind="deny", reason="pii_export_blocked")
        if tool not in self.allowed_tools_execution:
            return Decision(kind="deny", reason="tool_denied_execution")

        if tool != "send_status_update":
            return Decision(kind="allow", reason="policy_pass")

        rewrite_reasons: list[str] = []
        rewritten = dict(args)

        if rewritten.get("template_id") not in self.allowed_templates:
            rewritten["template_id"] = "incident_p1_v2"
            rewrite_reasons.append("template_allowlist")

        raw_recipients = rewritten.get("max_recipients", self.budget.max_recipients_per_send)
        try:
            recipients = int(raw_recipients)
        except (TypeError, ValueError):
            recipients = self.budget.max_recipients_per_send
        if recipients > self.budget.max_recipients_per_send:
            recipients = self.budget.max_recipients_per_send
            rewrite_reasons.append("recipient_cap")
        rewritten["max_recipients"] = recipients

        if "free_text" in rewritten:
            rewritten.pop("free_text", None)
            rewrite_reasons.append("free_text_removed")

        if (
            rewritten.get("channel") == "external_email"
            and rewritten.get("audience_segment") == "all_customers"
        ):
            rewritten["channel"] = "status_page"
            rewritten["audience_segment"] = "enterprise_active"
            enforced = {
                "id": normalized["id"],
                "tool": normalized["tool"],
                "args": rewritten,
            }
            return Decision(
                kind="escalate",
                reason="mass_external_broadcast",
                enforced_action=enforced,
            )

        if not rewrite_reasons:
            return Decision(kind="allow", reason="policy_pass")

        enforced = {
            "id": normalized["id"],
            "tool": normalized["tool"],
            "args": rewritten,
        }
        return Decision(
            kind="rewrite",
            reason=f"policy_rewrite:{','.join(rewrite_reasons)}",
            enforced_action=enforced,
        )

    def dispatch(
        self,
        *,
        tool_name: str,
        tool_fn: Callable[..., dict[str, Any]],
        args: dict[str, Any],
    ) -> dict[str, Any]:
        future = self._pool.submit(tool_fn, **args)
        try:
            raw = future.result(timeout=self.budget.action_timeout_seconds)
        except FuturesTimeoutError as exc:
            raise StopRun(f"tool_timeout:{tool_name}") from exc
        except Exception as exc:  # noqa: BLE001
            raise StopRun(f"tool_error:{tool_name}:{type(exc).__name__}") from exc

        return validate_tool_observation(raw, tool_name=tool_name)

main.py — orchestrate policy-gated execution

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from agent import compose_final_answer, propose_action_plan
from context import build_request
from gateway import Budget, PolicyGateway, StopRun, validate_plan
from tools import (
    create_manual_review_ticket,
    export_customer_data,
    fetch_incident_snapshot,
    send_status_update,
)

GOAL = (
    "Prepare a customer-safe operations update for a US payments incident. "
    "Use policy-gated execution and never expose customer PII."
)
REQUEST = build_request(
    report_date="2026-03-06",
    region="US",
    incident_id="inc_payments_20260306",
)

BUDGET = Budget(
    max_seconds=25,
    max_actions=8,
    action_timeout_seconds=1.2,
    max_recipients_per_send=50000,
)

ALLOWED_TOOLS_POLICY = {
    "fetch_incident_snapshot",
    "send_status_update",
    "export_customer_data",
    "create_manual_review_ticket",
}
ALLOWED_TOOLS_EXECUTION = {
    "fetch_incident_snapshot",
    "send_status_update",
    "create_manual_review_ticket",
}

TOOLS: dict[str, Any] = {
    "fetch_incident_snapshot": fetch_incident_snapshot,
    "send_status_update": send_status_update,
    "export_customer_data": export_customer_data,
    "create_manual_review_ticket": create_manual_review_ticket,
}


def simulate_human_approval(*, action: dict[str, Any], reason: str) -> dict[str, Any]:
    # Demo policy: approve only the broadcast-risk escalation after safe rewrite.
    if reason == "mass_external_broadcast":
        return {"approved": True, "action": action, "reason": "approved_with_safe_scope"}
    return {"approved": False, "action": action, "reason": "human_rejected"}


def run_guarded_policy_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []
    state: dict[str, Any] = {"snapshot": None, "delivery": None}

    gateway = PolicyGateway(
        allowed_tools_policy=ALLOWED_TOOLS_POLICY,
        allowed_tools_execution=ALLOWED_TOOLS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    decision_counts = {"allow": 0, "rewrite": 0, "deny": 0, "escalate": 0}
    denied_tools: list[str] = []
    rewritten_tools: list[str] = []
    escalated_tools: list[str] = []

    phase = "plan"

    try:
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return stopped("max_seconds", phase="plan")

        raw_plan = propose_action_plan(goal=goal, request=request)
        actions = validate_plan(raw_plan.get("actions"), max_actions=BUDGET.max_actions)
        phase = "execute"

        for idx, action in enumerate(actions, start=1):
            if (time.monotonic() - started) > BUDGET.max_seconds:
                return stopped("max_seconds", phase="execute")

            decision = gateway.evaluate(action=action, state=state)
            decision_counts[decision.kind] += 1

            trace_item = {
                "step": idx,
                "action_id": action["id"],
                "tool": action["tool"],
                "policy_decision": decision.kind,
                "policy_reason": decision.reason,
                "executed_from": "none",
                "ok": False,
            }
            history_item: dict[str, Any] = {
                "step": idx,
                "proposed_action": action,
                "policy": {"decision": decision.kind, "reason": decision.reason},
            }

            if decision.kind == "deny":
                denied_tools.append(action["tool"])
                trace.append(trace_item)
                history.append(history_item)
                continue

            executed_action = action
            executed_from = "original"
            if decision.kind == "rewrite":
                executed_action = decision.enforced_action or action
                rewritten_tools.append(action["tool"])
                executed_from = "policy_rewrite"
            elif decision.kind == "escalate":
                escalated_tools.append(action["tool"])
                escalation_ticket = gateway.dispatch(
                    tool_name="create_manual_review_ticket",
                    tool_fn=TOOLS["create_manual_review_ticket"],
                    args={"reason": decision.reason, "payload": (decision.enforced_action or action)},
                )
                history_item["escalation_ticket"] = escalation_ticket
                human = simulate_human_approval(
                    action=(decision.enforced_action or action),
                    reason=decision.reason,
                )
                if not human["approved"]:
                    return stopped("policy_escalation_rejected", phase="execute")
                executed_action = human["action"]
                executed_from = "human_approved"
                history_item["human"] = {
                    "approved": True,
                    "reason": human["reason"],
                }

            tool_name = executed_action["tool"]
            tool_fn = TOOLS.get(tool_name)
            if tool_fn is None:
                return stopped(f"tool_unmapped:{tool_name}", phase="execute")

            # Keep the decision in trace even if execution fails later.
            trace.append(trace_item)
            observation = gateway.dispatch(
                tool_name=tool_name,
                tool_fn=tool_fn,
                args=executed_action["args"],
            )
            if tool_name == "fetch_incident_snapshot":
                state["snapshot"] = observation
            elif tool_name == "send_status_update":
                state["delivery"] = observation

            trace_item["executed_from"] = executed_from
            trace_item["ok"] = True

            history_item["executed_action"] = executed_action
            history_item["executed_from"] = executed_from
            history_item["observation"] = observation
            history.append(history_item)

        if not isinstance(state["snapshot"], dict):
            return stopped("missing_required_observation:snapshot", phase="finalize")
        if not isinstance(state["delivery"], dict):
            return stopped("missing_required_observation:send_status_update", phase="finalize")

        aggregate = {
            "report_date": request["request"]["report_date"],
            "region": request["request"]["region"],
            "incident": state["snapshot"],
            "delivery": state["delivery"],
        }
        policy_summary = {
            "decisions": decision_counts,
            "denied_tools": sorted(set(denied_tools)),
            "rewritten_tools": sorted(set(rewritten_tools)),
            "escalated_tools": sorted(set(escalated_tools)),
        }
        answer = compose_final_answer(
            request=request,
            state=state,
            policy_summary=policy_summary,
        )

        trace.append(
            {
                "step": len(actions) + 1,
                "phase": "finalize",
                "ok": True,
            }
        )
        history.append({"step": len(actions) + 1, "action": "finalize"})

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "policy_guarded_success",
            "answer": answer,
            "proposed_plan": actions,
            "executed_plan": [
                step["executed_action"]
                for step in history
                if isinstance(step, dict) and isinstance(step.get("executed_action"), dict)
            ],
            "aggregate": aggregate,
            "policy_summary": policy_summary,
            "trace": trace,
            "history": history,
        }
    except StopRun as exc:
        return stopped(exc.reason, phase=phase)
    finally:
        gateway.close()


def main() -> None:
    result = run_guarded_policy_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Що тут найважливіше (простими словами)

  • agent пропонує дії, але виконання не має прямого доступу до tools
  • pii_export_blocked показує policy-level deny для ризикового експорту
  • tool_denied_execution лишається доступним runtime deny-path, але в цьому run не спрацьовує
  • escalate: policy формує safe-версію дії, human approval підтверджує scope, далі виконується як human_approved
  • trace/history роблять policy-рішення аудитабельними
  • gateway.close() викликається в finally, щоб не лишати pool відкритим

Приклад виводу

JSON
{
  "run_id": "62537585-a465-4b35-bbd8-40153df722eb",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "policy_guarded_success",
  "answer": "Operations brief (US, 2026-03-06): incident inc_payments_20260306 remains P1 with failed payments at 3.4% and 5 chargeback alerts. Affected checkout share is 27% and ETA is ~45 minutes (estimate, subject to change). Status update queued via status_page for enterprise_active using template incident_p1_v2 to 50000 recipients. Blocked by policy: export_customer_data.",
  "proposed_plan": [
    {
      "id": "a1",
      "tool": "fetch_incident_snapshot",
      "args": {
        "report_date": "2026-03-06",
        "region": "US",
        "incident_id": "inc_payments_20260306"
      }
    },
    {
      "id": "a2",
      "tool": "export_customer_data",
      "args": {
        "fields": ["email", "country", "payment_last4"],
        "destination": "external_s3"
      }
    },
    {
      "id": "a3",
      "tool": "send_status_update",
      "args": {
        "channel": "external_email",
        "template_id": "free_text_v0",
        "audience_segment": "all_customers",
        "max_recipients": 120000,
        "free_text": "We are fully recovered."
      }
    },
    {
      "id": "a4",
      "tool": "send_status_update",
      "args": {
        "channel": "status_page",
        "template_id": "free_text_v0",
        "audience_segment": "enterprise_active",
        "max_recipients": 120000
      }
    }
  ],
  "executed_plan": [
    {
      "id": "a1",
      "tool": "fetch_incident_snapshot",
      "args": {
        "report_date": "2026-03-06",
        "region": "US",
        "incident_id": "inc_payments_20260306"
      }
    },
    {
      "id": "a3",
      "tool": "send_status_update",
      "args": {
        "channel": "status_page",
        "template_id": "incident_p1_v2",
        "audience_segment": "enterprise_active",
        "max_recipients": 50000
      }
    },
    {
      "id": "a4",
      "tool": "send_status_update",
      "args": {
        "channel": "status_page",
        "template_id": "incident_p1_v2",
        "audience_segment": "enterprise_active",
        "max_recipients": 50000
      }
    }
  ],
  "aggregate": {
    "report_date": "2026-03-06",
    "region": "US",
    "incident": {
      "incident_id": "inc_payments_20260306",
      "report_date": "2026-03-06",
      "region": "US",
      "severity": "P1",
      "failed_payment_rate": 0.034,
      "chargeback_alerts": 5,
      "affected_checkout_share": 0.27,
      "eta_minutes": 45
    },
    "delivery": {
      "channel": "status_page",
      "template_id": "incident_p1_v2",
      "audience_segment": "enterprise_active",
      "queued_recipients": 50000,
      "delivery_id": "upd_20260306_001"
    }
  },
  "policy_summary": {
    "decisions": {
      "allow": 1,
      "rewrite": 1,
      "deny": 1,
      "escalate": 1
    },
    "denied_tools": [
      "export_customer_data"
    ],
    "rewritten_tools": [
      "send_status_update"
    ],
    "escalated_tools": [
      "send_status_update"
    ]
  },
  "trace": [
    {
      "step": 1,
      "action_id": "a1",
      "tool": "fetch_incident_snapshot",
      "policy_decision": "allow",
      "policy_reason": "policy_pass",
      "executed_from": "original",
      "ok": true
    },
    {
      "step": 2,
      "action_id": "a2",
      "tool": "export_customer_data",
      "policy_decision": "deny",
      "policy_reason": "pii_export_blocked",
      "executed_from": "none",
      "ok": false
    },
    {
      "step": 3,
      "action_id": "a3",
      "tool": "send_status_update",
      "policy_decision": "escalate",
      "policy_reason": "mass_external_broadcast",
      "executed_from": "human_approved",
      "ok": true
    },
    {
      "step": 4,
      "action_id": "a4",
      "tool": "send_status_update",
      "policy_decision": "rewrite",
      "policy_reason": "policy_rewrite:template_allowlist,recipient_cap",
      "executed_from": "policy_rewrite",
      "ok": true
    },
    {
      "step": 5,
      "phase": "finalize",
      "ok": true
    }
  ],
  "history": [{...}]
}

trace не містить diff параметрів; safe-версії args дивись у history[*].executed_action.args.


Типові stop_reason

  • success — run завершено коректно
  • max_seconds — вичерпано загальний time budget
  • invalid_plan:* — невалідний план дій
  • invalid_action:* — невалідний action-контракт
  • tool_timeout:* — tool не відповів вчасно
  • tool_error:* — exception у tool execution
  • tool_status_not_ok:* — tool повернув status != "ok"
  • tool_invalid_output:* — tool повернув невалідний контракт
  • tool_unmapped:* — tool не знайдений у dispatch map
  • policy_escalation_rejected — human approval відхилив ескалацію
  • missing_required_observation:* — бракує критичних даних для фіналізації

Що тут НЕ показано

  • persisted policy/event logs (DB/SIEM)
  • role/tenant-scoped policy packs
  • dynamic risk scoring (beyond static rules)
  • async approval queue (Jira/ServiceNow/Slack workflow)

Що спробувати далі

  1. Прибрати send_status_update з ALLOWED_TOOLS_EXECUTION і подивитись: policy_decision=deny (tool_denied_execution) → фінальний stop_reason=missing_required_observation:send_status_update.
  2. Додати другу escalation-політику (наприклад, для eta_minutes > 120) і перевірити trace.
  3. Розділити policy reasons на structured codes (policy.mass_broadcast, policy.pii_export) для зручної аналітики.
⏱️ 12 хв читанняОновлено 6 березня 2026 р.Складність: ★★☆
Інтегровано: продакшен-контрольOnceOnly
Додай guardrails до агентів з tool-calling
Зашип цей патерн з governance:
  • Бюджетами (кроки / ліміти витрат)
  • Дозволами на інструменти (allowlist / blocklist)
  • Kill switch та аварійна зупинка
  • Ідемпотентність і dedupe
  • Audit logs та трасування
Інтегрована згадка: OnceOnly — контрольний шар для продакшен агент-систем.

Автор

Микола — інженер, який будує інфраструктуру для продакшн AI-агентів.

Фокус: патерни агентів, режими відмов, контроль рантайму та надійність систем.

🔗 GitHub: https://github.com/mykolademyanov


Редакційна примітка

Ця документація підготовлена з допомогою AI, із людською редакторською відповідальністю за точність, ясність і продакшн-релевантність.

Контент базується на реальних відмовах, постмортемах та операційних інцидентах у розгорнутих AI-агентних системах.