Code-Execution Agent на Python: повний приклад

Production-style runnable приклад Code-Execution агента на Python з policy-check, separate subprocess execution (not a security sandbox), валідацією результату і прозорими trace/history.
На цій сторінці
  1. Суть патерна (коротко)
  2. Що демонструє цей приклад
  3. Архітектура
  4. Що ти побачиш при запуску
  5. Структура проєкту
  6. Як запустити
  7. Задача
  8. Код
  9. context.py — request envelope
  10. agent.py — proposed action + final answer
  11. gateway.py — policy + execution + output validation
  12. main.py — orchestrate code-execution flow
  13. Приклад виводу
  14. Типові stop_reason
  15. Що тут НЕ показано
  16. Що спробувати далі

Суть патерна (коротко)

Code-Execution Agent означає: агент може запропонувати код, але виконання відбувається тільки через execution boundary з політиками та лімітами.

Перед запуском коду система робить:

  • action contract validation
  • policy check (мова, імпорти, заборонені виклики)
  • separate subprocess execution (best-effort, not a security sandbox) з timeout/output limits
  • output validation за контрактом

Що демонструє цей приклад

  • agent формує proposed_action із кодом і payload
  • runtime бере allowed_languages/max_code_chars/timeout з policy_hints (з fallback defaults)
  • network_access=denied у цьому демо є contract-level вимогою (реальне network enforcement потребує container/jail)
  • policy allowlist і execution allowlist розділені (python/javascript vs фактичне python)
  • статична перевірка коду блокує unsafe imports/calls і небезпечні indirection-примітиви (getattr) + будь-які dunder references (__*)
  • static policy блокує лише очевидні URL literals (http://, https://) і не є network enforcement
  • DENIED_GLOBAL_NAMES застосовується тільки до untrusted generated code; host-side gateway код може імпортувати os/subprocess/sys для виконання boundary
  • код виконується в окремому subprocess boundary (best-effort, not a security sandbox) із timeout
  • output caps у демо перевіряються post-factum після завершення процесу
  • output валідується схемою перед формуванням бізнес-відповіді
  • trace/history дають аудит повного циклу plan -> policy -> execute -> finalize

Архітектура

  1. agent.py генерує action із Python-кодом для розрахунку інцидентних метрик.
  2. gateway.py валідовує action-контракт і policy-check.
  3. Якщо policy pass: execution layer запускає код в separate subprocess boundary (best-effort, not a security sandbox) з лімітами.
  4. Результат парситься як JSON і перевіряється за required schema.
  5. main.py збирає aggregate, trace/history і повертає operations brief.

Що ти побачиш при запуску

  1. step 1: code plan (code_hash, chars)
  2. step 2: policy decision (allow/deny + reason)
  3. step 3: execution metrics (exec_ms, stdout_bytes, stderr_bytes)
  4. step 4: validated metrics -> final brief

Структура проєкту

TEXT
agent-patterns/
└── code-execution-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── agent.py
        ├── context.py
        ├── README.md
        └── requirements.txt

Як запустити

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/code-execution-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Потрібен Python 3.11+.

Варіант через export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Варіант через .env (опційно)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

Це shell-варіант (macOS/Linux). На Windows простіше використовувати set змінних або, за бажанням, python-dotenv.


Задача

Продакшен-кейс:

"Безпечно виконати код для розрахунку health-метрик платіжного інциденту і повернути brief для operations."


Код

context.py — request envelope

PYTHON
from __future__ import annotations

from typing import Any


def build_request(*, report_date: str, region: str, incident_id: str) -> dict[str, Any]:
    transactions: list[dict[str, Any]] = []
    for idx in range(60):
        is_failed = idx in {7, 34}
        transactions.append(
            {
                "transaction_id": f"txn_{idx + 1:03d}",
                "status": "failed" if is_failed else "paid",
                "chargeback": idx == 34,
                "latency_ms": 150 + (idx % 40) + (25 if is_failed else 0),
            }
        )

    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
            "incident_id": incident_id,
            "transactions": transactions,
        },
        "policy_hints": {
            "allowed_languages": ["python", "javascript"],
            "max_code_chars": 2400,
            "exec_timeout_seconds": 2.0,
            "network_access": "denied",
        },
    }

agent.py — proposed action + final answer

PYTHON
from __future__ import annotations

from typing import Any


def propose_code_execution_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    req = request["request"]
    del goal

    code = """
import json
import statistics

payload = json.loads(input())
rows = payload["transactions"]

total = len(rows)
failed = sum(1 for row in rows if row["status"] != "paid")
chargeback_alerts = sum(1 for row in rows if row.get("chargeback") is True)
failed_rate = (failed / total) if total else 0.0

latencies = [float(row["latency_ms"]) for row in rows]
avg_latency = statistics.fmean(latencies) if latencies else 0.0

if latencies:
    sorted_latencies = sorted(latencies)
    p95_idx = int(round((len(sorted_latencies) - 1) * 0.95))
    p95_latency = sorted_latencies[p95_idx]
else:
    p95_latency = 0.0

severity = "P1" if failed_rate >= 0.03 else "P2"
eta_minutes = 45 if severity == "P1" else 20

print(
    json.dumps(
        {
            "failed_payment_rate": failed_rate,
            "chargeback_alerts": chargeback_alerts,
            "incident_severity": severity,
            "eta_minutes": eta_minutes,
            "affected_checkout_share": failed_rate,
            "avg_latency_ms": avg_latency,
            "p95_latency_ms": p95_latency,
            "sample_size": total,
            "incident_id": payload["incident_id"],
            "region": payload["region"],
        },
        separators=(",", ":"),
    )
)
""".strip()

    return {
        "action": {
            "id": "c1",
            "language": "python",
            "entrypoint": "main.py",
            "code": code,
            "input_payload": {
                "incident_id": req["incident_id"],
                "region": req["region"],
                "transactions": req["transactions"],
            },
        }
    }


def compose_final_answer(
    *,
    request: dict[str, Any],
    aggregate: dict[str, Any],
    execution_summary: dict[str, Any],
) -> str:
    req = request["request"]
    metrics = aggregate["metrics"]

    return (
        f"Code execution brief ({req['region']}, {req['report_date']}): incident {req['incident_id']} is "
        f"{metrics['incident_severity']} with failed payments at {metrics['failed_payment_rate_pct']}% and "
        f"{metrics['chargeback_alerts']} chargeback alerts. Affected checkout share is "
        f"{metrics['affected_checkout_share_pct']}%, average latency is {metrics['avg_latency_ms']} ms "
        f"(p95 {metrics['p95_latency_ms']} ms), and ETA is ~{metrics['eta_minutes']} minutes. "
        f"Executed in a separate subprocess boundary (best-effort, not a security sandbox) "
        f"({execution_summary['exec_ms']} ms, {execution_summary['stdout_bytes']} stdout bytes, "
        f"{execution_summary['stderr_bytes']} stderr bytes)."
    )

gateway.py — policy + execution + output validation

PYTHON
from __future__ import annotations

import ast
import hashlib
import json
import os
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str, *, details: dict[str, Any] | None = None):
        super().__init__(reason)
        self.reason = reason
        self.details = details or {}


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 25
    max_code_chars: int = 2400
    exec_timeout_seconds: float = 2.0
    max_stdout_bytes: int = 4096
    max_stderr_bytes: int = 4096


@dataclass(frozen=True)
class Decision:
    kind: str
    reason: str


ALLOWED_IMPORTS = {"json", "statistics"}
# Blocked function calls in generated code.
DENIED_CALL_NAMES = {"exec", "eval", "compile", "__import__", "open"}
# NOTE: heuristic hardening — blocks suspicious attribute names regardless of receiver type.
DENIED_CALL_ATTRS = {"system", "popen", "fork", "spawn", "connect", "request", "urlopen"}
# Blocked global name references in generated code (introspection/indirection primitives).
DENIED_NAME_REFERENCES = {
    "builtins",
    "getattr",
    "setattr",
    "delattr",
    "globals",
    "locals",
    "vars",
    "dir",
}
# Blocked identifier references in generated code (belt-and-suspenders hardening).
DENIED_GLOBAL_NAMES = {"os", "sys", "subprocess", "socket", "pathlib", "importlib"}


def code_hash(code: str) -> str:
    return hashlib.sha256(code.encode("utf-8")).hexdigest()[:16]


def validate_code_action(raw: Any, *, max_code_chars: int) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_action:not_object")

    action_id = raw.get("id")
    language = raw.get("language")
    entrypoint = raw.get("entrypoint")
    code = raw.get("code")
    input_payload = raw.get("input_payload")

    if not isinstance(action_id, str) or not action_id.strip():
        raise StopRun("invalid_action:id")
    if not isinstance(language, str) or not language.strip():
        raise StopRun("invalid_action:language")
    if not isinstance(entrypoint, str) or not entrypoint.strip():
        raise StopRun("invalid_action:entrypoint")
    if not isinstance(code, str) or not code.strip():
        raise StopRun("invalid_action:code")
    if len(code) > max_code_chars:
        raise StopRun("invalid_action:code_too_long")
    if not isinstance(input_payload, dict):
        raise StopRun("invalid_action:input_payload")

    normalized_entrypoint = entrypoint.strip()
    if "/" in normalized_entrypoint or "\\" in normalized_entrypoint:
        raise StopRun("invalid_action:entrypoint_path")
    if normalized_entrypoint != "main.py":
        raise StopRun("invalid_action:entrypoint_denied")

    return {
        "id": action_id.strip(),
        "language": language.strip().lower(),
        "entrypoint": normalized_entrypoint,
        "code": code,
        "input_payload": input_payload,
    }


def validate_execution_output(raw: Any) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_code_output:not_object")

    required = {
        "failed_payment_rate",
        "chargeback_alerts",
        "incident_severity",
        "eta_minutes",
        "affected_checkout_share",
        "avg_latency_ms",
        "p95_latency_ms",
        "sample_size",
        "incident_id",
        "region",
    }
    if not required.issubset(set(raw.keys())):
        raise StopRun("invalid_code_output:missing_required")

    failed_rate = raw["failed_payment_rate"]
    if not isinstance(failed_rate, (int, float)) or not (0 <= float(failed_rate) <= 1):
        raise StopRun("invalid_code_output:failed_payment_rate")

    share = raw["affected_checkout_share"]
    if not isinstance(share, (int, float)) or not (0 <= float(share) <= 1):
        raise StopRun("invalid_code_output:affected_checkout_share")

    chargeback_alerts = raw["chargeback_alerts"]
    if not isinstance(chargeback_alerts, int) or chargeback_alerts < 0:
        raise StopRun("invalid_code_output:chargeback_alerts")

    severity = raw["incident_severity"]
    if severity not in {"P1", "P2", "P3"}:
        raise StopRun("invalid_code_output:incident_severity")

    eta = raw["eta_minutes"]
    if not isinstance(eta, int) or eta < 0 or eta > 240:
        raise StopRun("invalid_code_output:eta_minutes")

    sample_size = raw["sample_size"]
    if not isinstance(sample_size, int) or sample_size <= 0:
        raise StopRun("invalid_code_output:sample_size")

    try:
        avg_latency = round(float(raw["avg_latency_ms"]), 2)
    except (TypeError, ValueError):
        raise StopRun("invalid_code_output:avg_latency_ms")
    try:
        p95_latency = round(float(raw["p95_latency_ms"]), 2)
    except (TypeError, ValueError):
        raise StopRun("invalid_code_output:p95_latency_ms")

    return {
        "failed_payment_rate": float(failed_rate),
        "chargeback_alerts": chargeback_alerts,
        "incident_severity": severity,
        "eta_minutes": eta,
        "affected_checkout_share": float(share),
        "avg_latency_ms": avg_latency,
        "p95_latency_ms": p95_latency,
        "sample_size": sample_size,
        "incident_id": str(raw["incident_id"]),
        "region": str(raw["region"]).upper(),
    }


def _static_policy_violations(code: str) -> list[str]:
    lower = code.lower()
    violations: list[str] = []
    input_calls = 0

    if "http://" in lower or "https://" in lower:
        violations.append("network_literal_blocked")

    try:
        tree = ast.parse(code)
    except SyntaxError:
        return ["syntax_error"]

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                module = alias.name.split(".")[0]
                if module not in ALLOWED_IMPORTS:
                    violations.append(f"import_not_allowed:{module}")
        elif isinstance(node, ast.ImportFrom):
            module = (node.module or "").split(".")[0]
            if module not in ALLOWED_IMPORTS:
                violations.append(f"import_not_allowed:{module or 'relative'}")
        elif isinstance(node, ast.Name):
            if node.id.startswith("__"):
                violations.append(f"name_not_allowed:{node.id}")
            elif node.id in DENIED_NAME_REFERENCES:
                violations.append(f"name_not_allowed:{node.id}")
            elif node.id in DENIED_GLOBAL_NAMES:
                violations.append(f"name_not_allowed:{node.id}")
        elif isinstance(node, ast.Attribute):
            if node.attr.startswith("__"):
                violations.append(f"attr_not_allowed:{node.attr}")
            elif node.attr in DENIED_CALL_ATTRS:
                violations.append(f"attr_not_allowed:{node.attr}")
        elif isinstance(node, ast.Call):
            if isinstance(node.func, ast.Name) and node.func.id in DENIED_CALL_NAMES:
                violations.append(f"call_not_allowed:{node.func.id}")
            elif isinstance(node.func, ast.Name) and node.func.id in {"getattr", "setattr", "delattr"}:
                violations.append(f"call_not_allowed:{node.func.id}")
            elif isinstance(node.func, ast.Name) and node.func.id == "input":
                input_calls += 1
            elif isinstance(node.func, ast.Attribute) and node.func.attr in DENIED_CALL_ATTRS:
                violations.append(f"call_not_allowed:{node.func.attr}")

    if input_calls > 1:
        violations.append("too_many_input_reads")

    return sorted(set(violations))


class CodeExecutionGateway:
    def __init__(
        self,
        *,
        allowed_languages_policy: set[str],
        allowed_languages_execution: set[str],
        budget: Budget,
    ):
        self.allowed_languages_policy = {item.lower() for item in allowed_languages_policy}
        self.allowed_languages_execution = {item.lower() for item in allowed_languages_execution}
        self.budget = budget

    def evaluate(self, *, action: dict[str, Any]) -> Decision:
        language = action["language"]
        if language not in self.allowed_languages_policy:
            return Decision(kind="deny", reason="language_denied_policy")
        if language not in self.allowed_languages_execution:
            return Decision(kind="deny", reason="language_denied_execution")

        violations = _static_policy_violations(action["code"])
        if violations:
            return Decision(kind="deny", reason=f"static_policy_violation:{','.join(violations[:3])}")
        return Decision(kind="allow", reason="policy_pass")

    def execute_python(self, *, code: str, entrypoint: str, input_payload: dict[str, Any]) -> dict[str, Any]:
        if entrypoint != "main.py":
            raise StopRun("invalid_action:entrypoint_denied")

        with tempfile.TemporaryDirectory(prefix="code_exec_agent_") as temp_dir:
            script_path = Path(temp_dir) / entrypoint
            script_path.write_text(code, encoding="utf-8")

            # Minimal hardening for interpreter behavior in this demo boundary.
            proc_env = os.environ.copy()
            proc_env["PYTHONNOUSERSITE"] = "1"
            proc_env["PYTHONDONTWRITEBYTECODE"] = "1"

            started = time.monotonic()
            try:
                proc = subprocess.run(
                    [sys.executable, str(script_path)],
                    input=json.dumps(input_payload),
                    text=True,
                    encoding="utf-8",
                    errors="replace",
                    capture_output=True,
                    timeout=self.budget.exec_timeout_seconds,
                    cwd=temp_dir,
                    env=proc_env,
                )
            except subprocess.TimeoutExpired as exc:
                raise StopRun("code_timeout") from exc

            exec_ms = int((time.monotonic() - started) * 1000)
            stdout = proc.stdout or ""
            stderr = proc.stderr or ""
            # Demo limitation: size caps are checked after process completion.
            stdout_bytes = len(stdout.encode("utf-8"))
            stderr_bytes = len(stderr.encode("utf-8"))
            if stdout_bytes > self.budget.max_stdout_bytes:
                raise StopRun("code_output_too_large")
            if stderr_bytes > self.budget.max_stderr_bytes:
                raise StopRun("code_stderr_too_large")
            if proc.returncode != 0:
                stderr_snippet = stderr.strip().replace("\n", " ")[:200]
                stdout_snippet = stdout.strip().replace("\n", " ")[:200]
                details: dict[str, str] = {}
                if stderr_snippet:
                    details["stderr_snippet"] = stderr_snippet
                if stdout_snippet:
                    details["stdout_snippet"] = stdout_snippet
                raise StopRun(
                    f"code_runtime_error:{proc.returncode}",
                    details=details,
                )

            stdout = stdout.strip()
            if not stdout:
                raise StopRun("invalid_code_output:empty_stdout")

            try:
                payload = json.loads(stdout)
            except json.JSONDecodeError as exc:
                raise StopRun("invalid_code_output:non_json") from exc

            if not isinstance(payload, dict):
                raise StopRun("invalid_code_output:not_object")

            return {
                "payload": payload,
                "exec_ms": exec_ms,
                "stdout_bytes": stdout_bytes,
                "stderr_bytes": stderr_bytes,
            }

main.py — orchestrate code-execution flow

PYTHON
from __future__ import annotations

import json
import math
import time
import uuid
from typing import Any

from agent import compose_final_answer, propose_code_execution_plan
from context import build_request
from gateway import (
    Budget,
    CodeExecutionGateway,
    StopRun,
    code_hash,
    validate_code_action,
    validate_execution_output,
)

GOAL = (
    "Run a safe code task to compute incident metrics from payment transactions "
    "and return an operations-ready summary."
)
REQUEST = build_request(
    report_date="2026-03-07",
    region="US",
    incident_id="inc_payments_20260307",
)

DEFAULT_BUDGET = Budget(
    max_seconds=25,
    max_code_chars=2400,
    exec_timeout_seconds=2.0,
    max_stdout_bytes=4096,
    max_stderr_bytes=4096,
)

DEFAULT_ALLOWED_LANGUAGES_POLICY = {"python", "javascript"}
ALLOWED_LANGUAGES_EXECUTION = {"python"}


def run_code_execution_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    hints_raw = request.get("policy_hints")
    hints: dict[str, Any] = hints_raw if isinstance(hints_raw, dict) else {}
    network_access = str(hints.get("network_access", "denied")).strip().lower()
    if network_access not in {"denied", "none", "off"}:
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:network_access_must_be_denied",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    allowed_policy_raw = hints.get("allowed_languages")
    if isinstance(allowed_policy_raw, list):
        allowed_policy = {
            str(item).strip().lower()
            for item in allowed_policy_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_policy = set(DEFAULT_ALLOWED_LANGUAGES_POLICY)
    if not allowed_policy:
        allowed_policy = set(DEFAULT_ALLOWED_LANGUAGES_POLICY)

    max_code_chars_raw = hints.get("max_code_chars", DEFAULT_BUDGET.max_code_chars)
    exec_timeout_raw = hints.get("exec_timeout_seconds", DEFAULT_BUDGET.exec_timeout_seconds)
    try:
        max_code_chars = int(max_code_chars_raw)
    except (TypeError, ValueError):
        max_code_chars = DEFAULT_BUDGET.max_code_chars
    try:
        exec_timeout_seconds = float(exec_timeout_raw)
        if not math.isfinite(exec_timeout_seconds):
            raise ValueError
    except (TypeError, ValueError):
        exec_timeout_seconds = DEFAULT_BUDGET.exec_timeout_seconds

    budget = Budget(
        max_seconds=DEFAULT_BUDGET.max_seconds,
        max_code_chars=max(200, min(8000, max_code_chars)),
        exec_timeout_seconds=max(0.2, min(20.0, exec_timeout_seconds)),
        max_stdout_bytes=DEFAULT_BUDGET.max_stdout_bytes,
        max_stderr_bytes=DEFAULT_BUDGET.max_stderr_bytes,
    )

    gateway = CodeExecutionGateway(
        allowed_languages_policy=allowed_policy,
        allowed_languages_execution=ALLOWED_LANGUAGES_EXECUTION,
        budget=budget,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    phase = "plan"
    try:
        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase=phase)

        raw_plan = propose_code_execution_plan(goal=goal, request=request)
        action = validate_code_action(raw_plan.get("action"), max_code_chars=budget.max_code_chars)
        generated_code_hash = code_hash(action["code"])

        trace.append(
            {
                "step": 1,
                "phase": "plan_code",
                "action_id": action["id"],
                "language": action["language"],
                "code_hash": generated_code_hash,
                "chars": len(action["code"]),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 1,
                "action": "propose_code_execution_plan",
                "proposed_action": {
                    "id": action["id"],
                    "language": action["language"],
                    "entrypoint": action["entrypoint"],
                    "code_hash": generated_code_hash,
                },
            }
        )

        phase = "policy_check"
        decision = gateway.evaluate(action=action)
        trace.append(
            {
                "step": 2,
                "phase": "policy_check",
                "decision": decision.kind,
                "reason": decision.reason,
                "allowed_languages_policy": sorted(allowed_policy),
                "allowed_languages_execution": sorted(ALLOWED_LANGUAGES_EXECUTION),
                "ok": decision.kind == "allow",
            }
        )
        history.append(
            {
                "step": 2,
                "action": "policy_check",
                "decision": {
                    "kind": decision.kind,
                    "reason": decision.reason,
                },
            }
        )
        if decision.kind != "allow":
            return stopped(f"policy_block:{decision.reason}", phase=phase)

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="execute")

        phase = "execute"
        execute_trace = {
            "step": 3,
            "phase": "execute_code",
            "language": action["language"],
            "code_hash": generated_code_hash,
            "ok": False,
        }
        trace.append(execute_trace)
        try:
            execution = gateway.execute_python(
                code=action["code"],
                entrypoint=action["entrypoint"],
                input_payload=action["input_payload"],
            )
            validated = validate_execution_output(execution["payload"])
        except StopRun as exc:
            execute_trace["error"] = exc.reason
            details = exc.details if isinstance(exc.details, dict) else {}
            stderr_snippet = str(details.get("stderr_snippet", "")).strip()
            stdout_snippet = str(details.get("stdout_snippet", "")).strip()
            if stderr_snippet:
                execute_trace["stderr_snippet"] = stderr_snippet
            if stdout_snippet:
                execute_trace["stdout_snippet"] = stdout_snippet
            history.append(
                {
                    "step": 3,
                    "action": "execute_code",
                    "status": "error",
                    "reason": exc.reason,
                    **({"stderr_snippet": stderr_snippet} if stderr_snippet else {}),
                    **({"stdout_snippet": stdout_snippet} if stdout_snippet else {}),
                }
            )
            raise

        execute_trace["stdout_bytes"] = execution["stdout_bytes"]
        execute_trace["stderr_bytes"] = execution["stderr_bytes"]
        execute_trace["exec_ms"] = execution["exec_ms"]
        execute_trace["ok"] = True
        history.append(
            {
                "step": 3,
                "action": "execute_code",
                "result": validated,
            }
        )

        aggregate = {
            "report_date": request["request"]["report_date"],
            "region": request["request"]["region"],
            "incident_id": request["request"]["incident_id"],
            "metrics": {
                "incident_severity": validated["incident_severity"],
                "failed_payment_rate": round(validated["failed_payment_rate"], 6),
                "failed_payment_rate_pct": round(validated["failed_payment_rate"] * 100, 2),
                "chargeback_alerts": validated["chargeback_alerts"],
                "eta_minutes": validated["eta_minutes"],
                "affected_checkout_share": round(validated["affected_checkout_share"], 6),
                "affected_checkout_share_pct": round(validated["affected_checkout_share"] * 100, 2),
                "avg_latency_ms": validated["avg_latency_ms"],
                "p95_latency_ms": validated["p95_latency_ms"],
                "sample_size": validated["sample_size"],
            },
        }
        execution_summary = {
            "language": action["language"],
            "code_hash": generated_code_hash,
            "exec_ms": execution["exec_ms"],
            "stdout_bytes": execution["stdout_bytes"],
            "stderr_bytes": execution["stderr_bytes"],
        }
        answer = compose_final_answer(
            request=request,
            aggregate=aggregate,
            execution_summary=execution_summary,
        )

        trace.append(
            {
                "step": 4,
                "phase": "finalize",
                "ok": True,
            }
        )
        history.append(
            {
                "step": 4,
                "action": "finalize",
            }
        )

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "code_execution_success",
            "answer": answer,
            "proposed_action": {
                "id": action["id"],
                "language": action["language"],
                "entrypoint": action["entrypoint"],
                "code_hash": generated_code_hash,
            },
            "aggregate": aggregate,
            "execution": execution_summary,
            "trace": trace,
            "history": history,
        }
    except StopRun as exc:
        return stopped(exc.reason, phase=phase)


def main() -> None:
    result = run_code_execution_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Що тут найважливіше (простими словами)

  • agent може запропонувати код, але execution boundary вирішує, чи його взагалі можна запускати
  • policy check блокує unsafe код до виконання
  • policy також блокує будь-які dunder references (__*) як demo hardening
  • запуск іде в separate subprocess boundary (best-effort, not a security sandbox) із timeout та output limit
  • output не довіряється "як є": він проходить schema validation
  • бізнес-відповідь формується тільки з validated output

Приклад виводу

JSON
{
  "run_id": "5506a01d-01a1-47e8-88c9-24eb0d1dff39",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "code_execution_success",
  "answer": "Code execution brief (US, 2026-03-07): incident inc_payments_20260307 is P1 with failed payments at 3.33% and 1 chargeback alerts. Affected checkout share is 3.33%, average latency is 167.0 ms (p95 187.0 ms), and ETA is ~45 minutes. Executed in a separate subprocess boundary (best-effort, not a security sandbox) (25 ms, 269 stdout bytes, 0 stderr bytes).",
  "proposed_action": {
    "id": "c1",
    "language": "python",
    "entrypoint": "main.py",
    "code_hash": "47a5c01dd74664c7"
  },
  "aggregate": {
    "report_date": "2026-03-07",
    "region": "US",
    "incident_id": "inc_payments_20260307",
    "metrics": {
      "incident_severity": "P1",
      "failed_payment_rate": 0.033333,
      "failed_payment_rate_pct": 3.33,
      "chargeback_alerts": 1,
      "eta_minutes": 45,
      "affected_checkout_share": 0.033333,
      "affected_checkout_share_pct": 3.33,
      "avg_latency_ms": 167.0,
      "p95_latency_ms": 187.0,
      "sample_size": 60
    }
  },
  "execution": {
    "language": "python",
    "code_hash": "47a5c01dd74664c7",
    "exec_ms": 25,
    "stdout_bytes": 269,
    "stderr_bytes": 0
  },
  "trace": [
    {
      "step": 1,
      "phase": "plan_code",
      "action_id": "c1",
      "language": "python",
      "code_hash": "47a5c01dd74664c7",
      "chars": 1229,
      "ok": true
    },
    {
      "step": 2,
      "phase": "policy_check",
      "decision": "allow",
      "reason": "policy_pass",
      "allowed_languages_policy": ["javascript", "python"],
      "allowed_languages_execution": ["python"],
      "ok": true
    },
    {
      "step": 3,
      "phase": "execute_code",
      "language": "python",
      "code_hash": "47a5c01dd74664c7",
      "ok": true,
      "stdout_bytes": 269,
      "stderr_bytes": 0,
      "exec_ms": 25
    },
    {
      "step": 4,
      "phase": "finalize",
      "ok": true
    }
  ],
  "history": [{...}]
}

Типові stop_reason

  • success — run завершено коректно
  • max_seconds — вичерпано загальний time budget
  • invalid_action:* — невалідний action-контракт
  • invalid_action:entrypoint_path — entrypoint містить / або \ (path traversal attempt)
  • invalid_action:entrypoint_denied — entrypoint не входить в allowlist (дозволено лише main.py)
  • policy_block:language_denied_policy — мова не дозволена policy allowlist
  • policy_block:language_denied_execution — мова не дозволена execution allowlist
  • policy_block:static_policy_violation:* — статична перевірка коду виявила заборонену операцію
  • code_timeout — виконання коду перевищило timeout
  • code_runtime_error:* — код завершився помилкою runtime
  • code_output_too_large — stdout перевищив allowed output budget
  • code_stderr_too_large — stderr перевищив allowed output budget
  • invalid_code_output:* — результат коду не пройшов schema validation
  • invalid_request:network_access_must_be_denied — приклад приймає тільки network_access=denied

Що тут НЕ показано

  • container-level ізоляція (seccomp/cgroup/jail)
  • filesystem / cpu / memory isolation (цей приклад не є security sandbox)
  • real network isolation at runtime (тут network_access=denied лише contract-level check)
  • streaming caps із kill-on-limit для stdout/stderr (у демо cap перевіряється post-factum після завершення процесу)
  • artifact storage для generated code/result snapshots
  • multi-step repair loop (повторна генерація коду після помилки)
  • human approval для risky execution plans
  • full DoS mitigation (file-spam / heavy allocations / algorithmic bombs)

Що спробувати далі

  1. Замінити language на javascript і подивитись policy_block:language_denied_execution.
  2. Додати в код import os і подивитись policy_block:static_policy_violation:import_not_allowed:os.
  3. Додати в код while True: pass і подивитись code_timeout.
  4. Повернути "sample_size": 0 у результаті скрипта і подивитись invalid_code_output:sample_size.
  5. Повернути "incident_severity": "P0" у результаті скрипта і подивитись invalid_code_output:incident_severity.
⏱️ 15 хв читанняОновлено 6 березня 2026 р.Складність: ★★☆
Інтегровано: продакшен-контрольOnceOnly
Додай guardrails до агентів з tool-calling
Зашип цей патерн з governance:
  • Бюджетами (кроки / ліміти витрат)
  • Дозволами на інструменти (allowlist / blocklist)
  • Kill switch та аварійна зупинка
  • Ідемпотентність і dedupe
  • Audit logs та трасування
Інтегрована згадка: OnceOnly — контрольний шар для продакшен агент-систем.

Автор

Микола — інженер, який будує інфраструктуру для продакшн AI-агентів.

Фокус: патерни агентів, режими відмов, контроль рантайму та надійність систем.

🔗 GitHub: https://github.com/mykolademyanov


Редакційна примітка

Ця документація підготовлена з допомогою AI, із людською редакторською відповідальністю за точність, ясність і продакшн-релевантність.

Контент базується на реальних відмовах, постмортемах та операційних інцидентах у розгорнутих AI-агентних системах.