Code-Execution Agent in Python: Vollständiges Beispiel

Production-style runnable Beispiel eines Code-Execution Agents in Python mit policy-check, separate subprocess execution (not a security sandbox), Ergebnisvalidierung und transparentem trace/history.
Auf dieser Seite
  1. Kern des Musters (Kurz)
  2. Was Dieses Beispiel Zeigt
  3. Architektur
  4. Was Du Beim Start Siehst
  5. Projektstruktur
  6. Ausführen
  7. Aufgabe
  8. Code
  9. context.py — request envelope
  10. agent.py — proposed action + final answer
  11. gateway.py — policy + execution + output validation
  12. main.py — orchestrate code-execution flow
  13. Beispielausgabe
  14. Typische stop_reason
  15. Was Hier NICHT Gezeigt Wird
  16. Was Du Als Nachstes Probieren Kannst

Kern des Musters (Kurz)

Code-Execution Agent bedeutet: Der Agent kann Code vorschlagen, aber die Ausfuhrung passiert nur uber eine Execution Boundary mit Policies und Limits.

Vor dem Code-Start fuhrt das System aus:

  • action contract validation
  • policy check (Sprache, Imports, gesperrte Aufrufe)
  • separate subprocess execution (best-effort, not a security sandbox) mit timeout/output limits
  • output validation gegen den Vertrag

Was Dieses Beispiel Zeigt

  • der Agent erstellt eine proposed_action mit Code und Payload
  • runtime ubernimmt allowed_languages/max_code_chars/timeout aus policy_hints (mit fallback defaults)
  • network_access=denied ist in dieser Demo eine contract-level Anforderung (echtes network enforcement braucht container/jail)
  • policy allowlist und execution allowlist sind getrennt (python/javascript vs tatsachlich python)
  • statische Codeprufung blockiert unsafe imports/calls und gefahrliche indirection-Primitiven (getattr) + alle dunder references (__*)
  • static policy blockiert nur offensichtliche URL literals (http://, https://) und ist kein network enforcement
  • DENIED_GLOBAL_NAMES gilt nur fur untrusted generated code; host-side Gateway-Code darf os/subprocess/sys importieren, um die Boundary auszufuhren
  • Code lauft in einer separaten subprocess boundary (best-effort, not a security sandbox) mit timeout
  • output caps werden in der Demo post-factum nach Prozessende gepruft
  • output wird per Schema validiert, bevor die Business-Antwort erstellt wird
  • trace/history machen den kompletten Zyklus plan -> policy -> execute -> finalize auditierbar

Architektur

  1. agent.py erzeugt eine Action mit Python-Code zur Berechnung von Incident-Metriken.
  2. gateway.py validiert den Action-Vertrag und policy-check.
  3. Bei policy pass: execution layer startet Code in einer separaten subprocess boundary (best-effort, not a security sandbox) mit Limits.
  4. Ergebnis wird als JSON geparst und gegen required schema gepruft.
  5. main.py sammelt aggregate, trace/history und liefert ein operations brief zuruck.

Was Du Beim Start Siehst

  1. step 1: code plan (code_hash, chars)
  2. step 2: policy decision (allow/deny + reason)
  3. step 3: execution metrics (exec_ms, stdout_bytes, stderr_bytes)
  4. step 4: validated metrics -> final brief

Projektstruktur

TEXT
agent-patterns/
└── code-execution-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── agent.py
        ├── context.py
        ├── README.md
        └── requirements.txt

Ausführen

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/code-execution-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ ist erforderlich.

Variante über export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Variante über .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

Das ist die Shell-Variante (macOS/Linux). Unter Windows ist es einfacher, set-Variablen zu verwenden oder optional python-dotenv.


Aufgabe

Production-Fall:

"Code sicher ausfuhren, um Health-Metriken eines Payment-Incidents zu berechnen, und ein Brief fur Operations zuruckgeben."


Code

context.py — request envelope

PYTHON
from __future__ import annotations

from typing import Any


def build_request(*, report_date: str, region: str, incident_id: str) -> dict[str, Any]:
    transactions: list[dict[str, Any]] = []
    for idx in range(60):
        is_failed = idx in {7, 34}
        transactions.append(
            {
                "transaction_id": f"txn_{idx + 1:03d}",
                "status": "failed" if is_failed else "paid",
                "chargeback": idx == 34,
                "latency_ms": 150 + (idx % 40) + (25 if is_failed else 0),
            }
        )

    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
            "incident_id": incident_id,
            "transactions": transactions,
        },
        "policy_hints": {
            "allowed_languages": ["python", "javascript"],
            "max_code_chars": 2400,
            "exec_timeout_seconds": 2.0,
            "network_access": "denied",
        },
    }

agent.py — proposed action + final answer

PYTHON
from __future__ import annotations

from typing import Any


def propose_code_execution_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    req = request["request"]
    del goal

    code = """
import json
import statistics

payload = json.loads(input())
rows = payload["transactions"]

total = len(rows)
failed = sum(1 for row in rows if row["status"] != "paid")
chargeback_alerts = sum(1 for row in rows if row.get("chargeback") is True)
failed_rate = (failed / total) if total else 0.0

latencies = [float(row["latency_ms"]) for row in rows]
avg_latency = statistics.fmean(latencies) if latencies else 0.0

if latencies:
    sorted_latencies = sorted(latencies)
    p95_idx = int(round((len(sorted_latencies) - 1) * 0.95))
    p95_latency = sorted_latencies[p95_idx]
else:
    p95_latency = 0.0

severity = "P1" if failed_rate >= 0.03 else "P2"
eta_minutes = 45 if severity == "P1" else 20

print(
    json.dumps(
        {
            "failed_payment_rate": failed_rate,
            "chargeback_alerts": chargeback_alerts,
            "incident_severity": severity,
            "eta_minutes": eta_minutes,
            "affected_checkout_share": failed_rate,
            "avg_latency_ms": avg_latency,
            "p95_latency_ms": p95_latency,
            "sample_size": total,
            "incident_id": payload["incident_id"],
            "region": payload["region"],
        },
        separators=(",", ":"),
    )
)
""".strip()

    return {
        "action": {
            "id": "c1",
            "language": "python",
            "entrypoint": "main.py",
            "code": code,
            "input_payload": {
                "incident_id": req["incident_id"],
                "region": req["region"],
                "transactions": req["transactions"],
            },
        }
    }


def compose_final_answer(
    *,
    request: dict[str, Any],
    aggregate: dict[str, Any],
    execution_summary: dict[str, Any],
) -> str:
    req = request["request"]
    metrics = aggregate["metrics"]

    return (
        f"Code execution brief ({req['region']}, {req['report_date']}): incident {req['incident_id']} is "
        f"{metrics['incident_severity']} with failed payments at {metrics['failed_payment_rate_pct']}% and "
        f"{metrics['chargeback_alerts']} chargeback alerts. Affected checkout share is "
        f"{metrics['affected_checkout_share_pct']}%, average latency is {metrics['avg_latency_ms']} ms "
        f"(p95 {metrics['p95_latency_ms']} ms), and ETA is ~{metrics['eta_minutes']} minutes. "
        f"Executed in a separate subprocess boundary (best-effort, not a security sandbox) "
        f"({execution_summary['exec_ms']} ms, {execution_summary['stdout_bytes']} stdout bytes, "
        f"{execution_summary['stderr_bytes']} stderr bytes)."
    )

gateway.py — policy + execution + output validation

PYTHON
from __future__ import annotations

import ast
import hashlib
import json
import os
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str, *, details: dict[str, Any] | None = None):
        super().__init__(reason)
        self.reason = reason
        self.details = details or {}


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 25
    max_code_chars: int = 2400
    exec_timeout_seconds: float = 2.0
    max_stdout_bytes: int = 4096
    max_stderr_bytes: int = 4096


@dataclass(frozen=True)
class Decision:
    kind: str
    reason: str


ALLOWED_IMPORTS = {"json", "statistics"}
# Blocked function calls in generated code.
DENIED_CALL_NAMES = {"exec", "eval", "compile", "__import__", "open"}
# NOTE: heuristic hardening — blocks suspicious attribute names regardless of receiver type.
DENIED_CALL_ATTRS = {"system", "popen", "fork", "spawn", "connect", "request", "urlopen"}
# Blocked global name references in generated code (introspection/indirection primitives).
DENIED_NAME_REFERENCES = {
    "builtins",
    "getattr",
    "setattr",
    "delattr",
    "globals",
    "locals",
    "vars",
    "dir",
}
# Blocked identifier references in generated code (belt-and-suspenders hardening).
DENIED_GLOBAL_NAMES = {"os", "sys", "subprocess", "socket", "pathlib", "importlib"}


def code_hash(code: str) -> str:
    return hashlib.sha256(code.encode("utf-8")).hexdigest()[:16]


def validate_code_action(raw: Any, *, max_code_chars: int) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_action:not_object")

    action_id = raw.get("id")
    language = raw.get("language")
    entrypoint = raw.get("entrypoint")
    code = raw.get("code")
    input_payload = raw.get("input_payload")

    if not isinstance(action_id, str) or not action_id.strip():
        raise StopRun("invalid_action:id")
    if not isinstance(language, str) or not language.strip():
        raise StopRun("invalid_action:language")
    if not isinstance(entrypoint, str) or not entrypoint.strip():
        raise StopRun("invalid_action:entrypoint")
    if not isinstance(code, str) or not code.strip():
        raise StopRun("invalid_action:code")
    if len(code) > max_code_chars:
        raise StopRun("invalid_action:code_too_long")
    if not isinstance(input_payload, dict):
        raise StopRun("invalid_action:input_payload")

    normalized_entrypoint = entrypoint.strip()
    if "/" in normalized_entrypoint or "\\" in normalized_entrypoint:
        raise StopRun("invalid_action:entrypoint_path")
    if normalized_entrypoint != "main.py":
        raise StopRun("invalid_action:entrypoint_denied")

    return {
        "id": action_id.strip(),
        "language": language.strip().lower(),
        "entrypoint": normalized_entrypoint,
        "code": code,
        "input_payload": input_payload,
    }


def validate_execution_output(raw: Any) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_code_output:not_object")

    required = {
        "failed_payment_rate",
        "chargeback_alerts",
        "incident_severity",
        "eta_minutes",
        "affected_checkout_share",
        "avg_latency_ms",
        "p95_latency_ms",
        "sample_size",
        "incident_id",
        "region",
    }
    if not required.issubset(set(raw.keys())):
        raise StopRun("invalid_code_output:missing_required")

    failed_rate = raw["failed_payment_rate"]
    if not isinstance(failed_rate, (int, float)) or not (0 <= float(failed_rate) <= 1):
        raise StopRun("invalid_code_output:failed_payment_rate")

    share = raw["affected_checkout_share"]
    if not isinstance(share, (int, float)) or not (0 <= float(share) <= 1):
        raise StopRun("invalid_code_output:affected_checkout_share")

    chargeback_alerts = raw["chargeback_alerts"]
    if not isinstance(chargeback_alerts, int) or chargeback_alerts < 0:
        raise StopRun("invalid_code_output:chargeback_alerts")

    severity = raw["incident_severity"]
    if severity not in {"P1", "P2", "P3"}:
        raise StopRun("invalid_code_output:incident_severity")

    eta = raw["eta_minutes"]
    if not isinstance(eta, int) or eta < 0 or eta > 240:
        raise StopRun("invalid_code_output:eta_minutes")

    sample_size = raw["sample_size"]
    if not isinstance(sample_size, int) or sample_size <= 0:
        raise StopRun("invalid_code_output:sample_size")

    try:
        avg_latency = round(float(raw["avg_latency_ms"]), 2)
    except (TypeError, ValueError):
        raise StopRun("invalid_code_output:avg_latency_ms")
    try:
        p95_latency = round(float(raw["p95_latency_ms"]), 2)
    except (TypeError, ValueError):
        raise StopRun("invalid_code_output:p95_latency_ms")

    return {
        "failed_payment_rate": float(failed_rate),
        "chargeback_alerts": chargeback_alerts,
        "incident_severity": severity,
        "eta_minutes": eta,
        "affected_checkout_share": float(share),
        "avg_latency_ms": avg_latency,
        "p95_latency_ms": p95_latency,
        "sample_size": sample_size,
        "incident_id": str(raw["incident_id"]),
        "region": str(raw["region"]).upper(),
    }


def _static_policy_violations(code: str) -> list[str]:
    lower = code.lower()
    violations: list[str] = []
    input_calls = 0

    if "http://" in lower or "https://" in lower:
        violations.append("network_literal_blocked")

    try:
        tree = ast.parse(code)
    except SyntaxError:
        return ["syntax_error"]

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                module = alias.name.split(".")[0]
                if module not in ALLOWED_IMPORTS:
                    violations.append(f"import_not_allowed:{module}")
        elif isinstance(node, ast.ImportFrom):
            module = (node.module or "").split(".")[0]
            if module not in ALLOWED_IMPORTS:
                violations.append(f"import_not_allowed:{module or 'relative'}")
        elif isinstance(node, ast.Name):
            if node.id.startswith("__"):
                violations.append(f"name_not_allowed:{node.id}")
            elif node.id in DENIED_NAME_REFERENCES:
                violations.append(f"name_not_allowed:{node.id}")
            elif node.id in DENIED_GLOBAL_NAMES:
                violations.append(f"name_not_allowed:{node.id}")
        elif isinstance(node, ast.Attribute):
            if node.attr.startswith("__"):
                violations.append(f"attr_not_allowed:{node.attr}")
            elif node.attr in DENIED_CALL_ATTRS:
                violations.append(f"attr_not_allowed:{node.attr}")
        elif isinstance(node, ast.Call):
            if isinstance(node.func, ast.Name) and node.func.id in DENIED_CALL_NAMES:
                violations.append(f"call_not_allowed:{node.func.id}")
            elif isinstance(node.func, ast.Name) and node.func.id in {"getattr", "setattr", "delattr"}:
                violations.append(f"call_not_allowed:{node.func.id}")
            elif isinstance(node.func, ast.Name) and node.func.id == "input":
                input_calls += 1
            elif isinstance(node.func, ast.Attribute) and node.func.attr in DENIED_CALL_ATTRS:
                violations.append(f"call_not_allowed:{node.func.attr}")

    if input_calls > 1:
        violations.append("too_many_input_reads")

    return sorted(set(violations))


class CodeExecutionGateway:
    def __init__(
        self,
        *,
        allowed_languages_policy: set[str],
        allowed_languages_execution: set[str],
        budget: Budget,
    ):
        self.allowed_languages_policy = {item.lower() for item in allowed_languages_policy}
        self.allowed_languages_execution = {item.lower() for item in allowed_languages_execution}
        self.budget = budget

    def evaluate(self, *, action: dict[str, Any]) -> Decision:
        language = action["language"]
        if language not in self.allowed_languages_policy:
            return Decision(kind="deny", reason="language_denied_policy")
        if language not in self.allowed_languages_execution:
            return Decision(kind="deny", reason="language_denied_execution")

        violations = _static_policy_violations(action["code"])
        if violations:
            return Decision(kind="deny", reason=f"static_policy_violation:{','.join(violations[:3])}")
        return Decision(kind="allow", reason="policy_pass")

    def execute_python(self, *, code: str, entrypoint: str, input_payload: dict[str, Any]) -> dict[str, Any]:
        if entrypoint != "main.py":
            raise StopRun("invalid_action:entrypoint_denied")

        with tempfile.TemporaryDirectory(prefix="code_exec_agent_") as temp_dir:
            script_path = Path(temp_dir) / entrypoint
            script_path.write_text(code, encoding="utf-8")

            # Minimal hardening for interpreter behavior in this demo boundary.
            proc_env = os.environ.copy()
            proc_env["PYTHONNOUSERSITE"] = "1"
            proc_env["PYTHONDONTWRITEBYTECODE"] = "1"

            started = time.monotonic()
            try:
                proc = subprocess.run(
                    [sys.executable, str(script_path)],
                    input=json.dumps(input_payload),
                    text=True,
                    encoding="utf-8",
                    errors="replace",
                    capture_output=True,
                    timeout=self.budget.exec_timeout_seconds,
                    cwd=temp_dir,
                    env=proc_env,
                )
            except subprocess.TimeoutExpired as exc:
                raise StopRun("code_timeout") from exc

            exec_ms = int((time.monotonic() - started) * 1000)
            stdout = proc.stdout or ""
            stderr = proc.stderr or ""
            # Demo limitation: size caps are checked after process completion.
            stdout_bytes = len(stdout.encode("utf-8"))
            stderr_bytes = len(stderr.encode("utf-8"))
            if stdout_bytes > self.budget.max_stdout_bytes:
                raise StopRun("code_output_too_large")
            if stderr_bytes > self.budget.max_stderr_bytes:
                raise StopRun("code_stderr_too_large")
            if proc.returncode != 0:
                stderr_snippet = stderr.strip().replace("\n", " ")[:200]
                stdout_snippet = stdout.strip().replace("\n", " ")[:200]
                details: dict[str, str] = {}
                if stderr_snippet:
                    details["stderr_snippet"] = stderr_snippet
                if stdout_snippet:
                    details["stdout_snippet"] = stdout_snippet
                raise StopRun(
                    f"code_runtime_error:{proc.returncode}",
                    details=details,
                )

            stdout = stdout.strip()
            if not stdout:
                raise StopRun("invalid_code_output:empty_stdout")

            try:
                payload = json.loads(stdout)
            except json.JSONDecodeError as exc:
                raise StopRun("invalid_code_output:non_json") from exc

            if not isinstance(payload, dict):
                raise StopRun("invalid_code_output:not_object")

            return {
                "payload": payload,
                "exec_ms": exec_ms,
                "stdout_bytes": stdout_bytes,
                "stderr_bytes": stderr_bytes,
            }

main.py — orchestrate code-execution flow

PYTHON
from __future__ import annotations

import json
import math
import time
import uuid
from typing import Any

from agent import compose_final_answer, propose_code_execution_plan
from context import build_request
from gateway import (
    Budget,
    CodeExecutionGateway,
    StopRun,
    code_hash,
    validate_code_action,
    validate_execution_output,
)

GOAL = (
    "Run a safe code task to compute incident metrics from payment transactions "
    "and return an operations-ready summary."
)
REQUEST = build_request(
    report_date="2026-03-07",
    region="US",
    incident_id="inc_payments_20260307",
)

DEFAULT_BUDGET = Budget(
    max_seconds=25,
    max_code_chars=2400,
    exec_timeout_seconds=2.0,
    max_stdout_bytes=4096,
    max_stderr_bytes=4096,
)

DEFAULT_ALLOWED_LANGUAGES_POLICY = {"python", "javascript"}
ALLOWED_LANGUAGES_EXECUTION = {"python"}


def run_code_execution_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    hints_raw = request.get("policy_hints")
    hints: dict[str, Any] = hints_raw if isinstance(hints_raw, dict) else {}
    network_access = str(hints.get("network_access", "denied")).strip().lower()
    if network_access not in {"denied", "none", "off"}:
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:network_access_must_be_denied",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    allowed_policy_raw = hints.get("allowed_languages")
    if isinstance(allowed_policy_raw, list):
        allowed_policy = {
            str(item).strip().lower()
            for item in allowed_policy_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_policy = set(DEFAULT_ALLOWED_LANGUAGES_POLICY)
    if not allowed_policy:
        allowed_policy = set(DEFAULT_ALLOWED_LANGUAGES_POLICY)

    max_code_chars_raw = hints.get("max_code_chars", DEFAULT_BUDGET.max_code_chars)
    exec_timeout_raw = hints.get("exec_timeout_seconds", DEFAULT_BUDGET.exec_timeout_seconds)
    try:
        max_code_chars = int(max_code_chars_raw)
    except (TypeError, ValueError):
        max_code_chars = DEFAULT_BUDGET.max_code_chars
    try:
        exec_timeout_seconds = float(exec_timeout_raw)
        if not math.isfinite(exec_timeout_seconds):
            raise ValueError
    except (TypeError, ValueError):
        exec_timeout_seconds = DEFAULT_BUDGET.exec_timeout_seconds

    budget = Budget(
        max_seconds=DEFAULT_BUDGET.max_seconds,
        max_code_chars=max(200, min(8000, max_code_chars)),
        exec_timeout_seconds=max(0.2, min(20.0, exec_timeout_seconds)),
        max_stdout_bytes=DEFAULT_BUDGET.max_stdout_bytes,
        max_stderr_bytes=DEFAULT_BUDGET.max_stderr_bytes,
    )

    gateway = CodeExecutionGateway(
        allowed_languages_policy=allowed_policy,
        allowed_languages_execution=ALLOWED_LANGUAGES_EXECUTION,
        budget=budget,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    phase = "plan"
    try:
        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase=phase)

        raw_plan = propose_code_execution_plan(goal=goal, request=request)
        action = validate_code_action(raw_plan.get("action"), max_code_chars=budget.max_code_chars)
        generated_code_hash = code_hash(action["code"])

        trace.append(
            {
                "step": 1,
                "phase": "plan_code",
                "action_id": action["id"],
                "language": action["language"],
                "code_hash": generated_code_hash,
                "chars": len(action["code"]),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 1,
                "action": "propose_code_execution_plan",
                "proposed_action": {
                    "id": action["id"],
                    "language": action["language"],
                    "entrypoint": action["entrypoint"],
                    "code_hash": generated_code_hash,
                },
            }
        )

        phase = "policy_check"
        decision = gateway.evaluate(action=action)
        trace.append(
            {
                "step": 2,
                "phase": "policy_check",
                "decision": decision.kind,
                "reason": decision.reason,
                "allowed_languages_policy": sorted(allowed_policy),
                "allowed_languages_execution": sorted(ALLOWED_LANGUAGES_EXECUTION),
                "ok": decision.kind == "allow",
            }
        )
        history.append(
            {
                "step": 2,
                "action": "policy_check",
                "decision": {
                    "kind": decision.kind,
                    "reason": decision.reason,
                },
            }
        )
        if decision.kind != "allow":
            return stopped(f"policy_block:{decision.reason}", phase=phase)

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="execute")

        phase = "execute"
        execute_trace = {
            "step": 3,
            "phase": "execute_code",
            "language": action["language"],
            "code_hash": generated_code_hash,
            "ok": False,
        }
        trace.append(execute_trace)
        try:
            execution = gateway.execute_python(
                code=action["code"],
                entrypoint=action["entrypoint"],
                input_payload=action["input_payload"],
            )
            validated = validate_execution_output(execution["payload"])
        except StopRun as exc:
            execute_trace["error"] = exc.reason
            details = exc.details if isinstance(exc.details, dict) else {}
            stderr_snippet = str(details.get("stderr_snippet", "")).strip()
            stdout_snippet = str(details.get("stdout_snippet", "")).strip()
            if stderr_snippet:
                execute_trace["stderr_snippet"] = stderr_snippet
            if stdout_snippet:
                execute_trace["stdout_snippet"] = stdout_snippet
            history.append(
                {
                    "step": 3,
                    "action": "execute_code",
                    "status": "error",
                    "reason": exc.reason,
                    **({"stderr_snippet": stderr_snippet} if stderr_snippet else {}),
                    **({"stdout_snippet": stdout_snippet} if stdout_snippet else {}),
                }
            )
            raise

        execute_trace["stdout_bytes"] = execution["stdout_bytes"]
        execute_trace["stderr_bytes"] = execution["stderr_bytes"]
        execute_trace["exec_ms"] = execution["exec_ms"]
        execute_trace["ok"] = True
        history.append(
            {
                "step": 3,
                "action": "execute_code",
                "result": validated,
            }
        )

        aggregate = {
            "report_date": request["request"]["report_date"],
            "region": request["request"]["region"],
            "incident_id": request["request"]["incident_id"],
            "metrics": {
                "incident_severity": validated["incident_severity"],
                "failed_payment_rate": round(validated["failed_payment_rate"], 6),
                "failed_payment_rate_pct": round(validated["failed_payment_rate"] * 100, 2),
                "chargeback_alerts": validated["chargeback_alerts"],
                "eta_minutes": validated["eta_minutes"],
                "affected_checkout_share": round(validated["affected_checkout_share"], 6),
                "affected_checkout_share_pct": round(validated["affected_checkout_share"] * 100, 2),
                "avg_latency_ms": validated["avg_latency_ms"],
                "p95_latency_ms": validated["p95_latency_ms"],
                "sample_size": validated["sample_size"],
            },
        }
        execution_summary = {
            "language": action["language"],
            "code_hash": generated_code_hash,
            "exec_ms": execution["exec_ms"],
            "stdout_bytes": execution["stdout_bytes"],
            "stderr_bytes": execution["stderr_bytes"],
        }
        answer = compose_final_answer(
            request=request,
            aggregate=aggregate,
            execution_summary=execution_summary,
        )

        trace.append(
            {
                "step": 4,
                "phase": "finalize",
                "ok": True,
            }
        )
        history.append(
            {
                "step": 4,
                "action": "finalize",
            }
        )

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "code_execution_success",
            "answer": answer,
            "proposed_action": {
                "id": action["id"],
                "language": action["language"],
                "entrypoint": action["entrypoint"],
                "code_hash": generated_code_hash,
            },
            "aggregate": aggregate,
            "execution": execution_summary,
            "trace": trace,
            "history": history,
        }
    except StopRun as exc:
        return stopped(exc.reason, phase=phase)


def main() -> None:
    result = run_code_execution_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Was hier am wichtigsten ist (einfach erklart)

  • der Agent kann Code vorschlagen, aber die execution boundary entscheidet, ob er uberhaupt laufen darf
  • policy check blockiert unsicheren Code vor der Ausfuhrung
  • policy blockiert auch alle dunder references (__*) als Demo-Hardening
  • Ausfuhrung lauft in einer separaten subprocess boundary (best-effort, not a security sandbox) mit timeout und output limit
  • output wird nicht "wie er ist" vertraut: er geht durch schema validation
  • die Business-Antwort wird nur aus validated output erstellt

Beispielausgabe

JSON
{
  "run_id": "5506a01d-01a1-47e8-88c9-24eb0d1dff39",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "code_execution_success",
  "answer": "Code execution brief (US, 2026-03-07): incident inc_payments_20260307 is P1 with failed payments at 3.33% and 1 chargeback alerts. Affected checkout share is 3.33%, average latency is 167.0 ms (p95 187.0 ms), and ETA is ~45 minutes. Executed in a separate subprocess boundary (best-effort, not a security sandbox) (25 ms, 269 stdout bytes, 0 stderr bytes).",
  "proposed_action": {
    "id": "c1",
    "language": "python",
    "entrypoint": "main.py",
    "code_hash": "47a5c01dd74664c7"
  },
  "aggregate": {
    "report_date": "2026-03-07",
    "region": "US",
    "incident_id": "inc_payments_20260307",
    "metrics": {
      "incident_severity": "P1",
      "failed_payment_rate": 0.033333,
      "failed_payment_rate_pct": 3.33,
      "chargeback_alerts": 1,
      "eta_minutes": 45,
      "affected_checkout_share": 0.033333,
      "affected_checkout_share_pct": 3.33,
      "avg_latency_ms": 167.0,
      "p95_latency_ms": 187.0,
      "sample_size": 60
    }
  },
  "execution": {
    "language": "python",
    "code_hash": "47a5c01dd74664c7",
    "exec_ms": 25,
    "stdout_bytes": 269,
    "stderr_bytes": 0
  },
  "trace": [
    {
      "step": 1,
      "phase": "plan_code",
      "action_id": "c1",
      "language": "python",
      "code_hash": "47a5c01dd74664c7",
      "chars": 1229,
      "ok": true
    },
    {
      "step": 2,
      "phase": "policy_check",
      "decision": "allow",
      "reason": "policy_pass",
      "allowed_languages_policy": ["javascript", "python"],
      "allowed_languages_execution": ["python"],
      "ok": true
    },
    {
      "step": 3,
      "phase": "execute_code",
      "language": "python",
      "code_hash": "47a5c01dd74664c7",
      "ok": true,
      "stdout_bytes": 269,
      "stderr_bytes": 0,
      "exec_ms": 25
    },
    {
      "step": 4,
      "phase": "finalize",
      "ok": true
    }
  ],
  "history": [{...}]
}

Typische stop_reason

  • success - Run korrekt abgeschlossen
  • max_seconds - gesamtes Time-Budget aufgebraucht
  • invalid_action:* - ungueltiger Action-Vertrag
  • invalid_action:entrypoint_path - entrypoint enthalt / oder \ (Path-Traversal-Versuch)
  • invalid_action:entrypoint_denied - entrypoint nicht in allowlist (nur main.py ist erlaubt)
  • policy_block:language_denied_policy - Sprache durch policy allowlist gesperrt
  • policy_block:language_denied_execution - Sprache durch execution allowlist gesperrt
  • policy_block:static_policy_violation:* - statische Codeprufung hat gesperrte Operation gefunden
  • code_timeout - Codeausfuhrung hat timeout uberschritten
  • code_runtime_error:* - Code endete mit Runtime-Fehler
  • code_output_too_large - stdout uberschritt allowed output budget
  • code_stderr_too_large - stderr uberschritt allowed output budget
  • invalid_code_output:* - Code-Ergebnis hat schema validation nicht bestanden
  • invalid_request:network_access_must_be_denied - dieses Beispiel akzeptiert nur network_access=denied

Was Hier NICHT Gezeigt Wird

  • container-level Isolation (seccomp/cgroup/jail)
  • filesystem / cpu / memory isolation (dieses Beispiel ist kein security sandbox)
  • echte Netzisolation zur Laufzeit (hier ist network_access=denied nur ein contract-level check)
  • streaming caps mit kill-on-limit fur stdout/stderr (in der Demo wird cap post-factum nach Prozessende gepruft)
  • artifact storage fur generated code/result snapshots
  • multi-step repair loop (erneute Codegenerierung nach Fehler)
  • human approval fur riskante Execution-Plane
  • full DoS mitigation (file-spam / heavy allocations / algorithmic bombs)

Was Du Als Nachstes Probieren Kannst

  1. language auf javascript setzen und policy_block:language_denied_execution ansehen.
  2. import os in den Code einfugen und policy_block:static_policy_violation:import_not_allowed:os ansehen.
  3. while True: pass in den Code einfugen und code_timeout ansehen.
  4. "sample_size": 0 im Skript-Ergebnis zuruckgeben und invalid_code_output:sample_size ansehen.
  5. "incident_severity": "P0" im Skript-Ergebnis zuruckgeben und invalid_code_output:incident_severity ansehen.
⏱️ 15 Min. LesezeitAktualisiert 6. März 2026Schwierigkeit: ★★☆
Integriert: Production ControlOnceOnly
Guardrails für Tool-Calling-Agents
Shippe dieses Pattern mit Governance:
  • Budgets (Steps / Spend Caps)
  • Tool-Permissions (Allowlist / Blocklist)
  • Kill switch & Incident Stop
  • Idempotenz & Dedupe
  • Audit logs & Nachvollziehbarkeit
Integrierter Hinweis: OnceOnly ist eine Control-Layer für Production-Agent-Systeme.

Autor

Nick — Engineer, der Infrastruktur für KI-Agenten in Produktion aufbaut.

Fokus: Agent-Patterns, Failure-Modes, Runtime-Steuerung und Systemzuverlässigkeit.

🔗 GitHub: https://github.com/mykolademyanov


Redaktioneller Hinweis

Diese Dokumentation ist KI-gestützt, mit menschlicher redaktioneller Verantwortung für Genauigkeit, Klarheit und Produktionsrelevanz.

Der Inhalt basiert auf realen Ausfällen, Post-Mortems und operativen Vorfällen in produktiv eingesetzten KI-Agenten-Systemen.