Self-Critique Agent in Python: Complete Example

An executable, production-style self-critique agent example in Python with Draft → Critique → Revise → Audit, a policy/execution boundary, and explicit stop reasons.
On this page
  1. Core of the Pattern (Brief)
  2. What This Example Shows
  3. Architecture
  4. Project Structure
  5. Running It
  6. Task
  7. Solution
  8. Code
  9. context.py — deterministic context
  10. gateway.py — critique/revision policy boundary
  11. llm.py — draft/critique/revise LLM calls
  12. audit.py — change audit log
  13. main.py — orchestrating the full cycle
  14. Example Output
  15. Typical stop_reason Values
  16. What Is NOT Shown Here
  17. What to Try Next

Core of the Pattern (Brief)

Self-critique agent is a pattern in which the agent, after producing a draft, generates a structured critique (risks + required_changes), performs a bounded revision, and records an audit of the changes.

The LLM proposes the draft and the critique, and the gateway policy decides whether they are executable and whether the revision stays within the contract limits.


What This Example Shows

  • production-like flow: Draft -> Critique -> Revise (optional) -> Audit
  • strictly structured critique artifact: decision, severity, risks, required_changes
  • separation of policy vs execution for critique decisions
  • constrained revision: no_new_facts, length increase cap, required changes enforcement
  • change audit (before/after hash, delta_chars, diff_excerpt)
  • explicit stop_reason, trace, history for production monitoring

Architecture

  1. The LLM produces a draft.
  2. The LLM critic returns structured critique JSON.
  3. The gateway validates the critique contract against the policy.
  4. The execution layer enforces the runtime allowlist for decisions.
  5. On revise, exactly one revision is executed under guardrails.
  6. An audit log of the changes and the final answer are produced.

Key contract: the agent must not "freely rewrite" the text; changes are controlled via required_changes and runtime guardrails.
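The cycle above can be sketched end-to-end with stubbed model calls. This is a minimal illustration only: the `_fake_*` functions are hypothetical stand-ins, not part of the example's llm.py, and the real guardrails are far stricter.

```python
# Minimal sketch of the Draft -> Critique -> Revise -> Audit cycle.
# The _fake_* functions are hypothetical stand-ins for real LLM calls.

def _fake_draft() -> str:
    return "Payments are degraded. The incident is resolved soon."

def _fake_critique(draft: str) -> dict:
    # Structured critique: a decision plus enforceable required changes.
    if "resolved" in draft:
        return {
            "decision": "revise",
            "severity": "medium",
            "required_changes": ['REMOVE "The incident is resolved soon."'],
        }
    return {"decision": "approve", "severity": "low", "required_changes": []}

def _fake_revise(draft: str, required_changes: list[str]) -> str:
    # Bounded revision: apply only the requested removal.
    return draft.replace(" The incident is resolved soon.", "")

def run_cycle() -> dict:
    draft = _fake_draft()
    critique = _fake_critique(draft)
    answer = draft
    if critique["decision"] == "revise":
        answer = _fake_revise(draft, critique["required_changes"])
    return {"answer": answer, "decision": critique["decision"]}

result = run_cycle()
```

The full example replaces each stub with a validated, budget-limited step.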


Project Structure

TEXT
agent-patterns/
└── self-critique-agent/
    └── python/
        ├── main.py          # Draft -> Critique -> Revise -> Audit
        ├── llm.py           # draft/critique/revise LLM calls
        ├── gateway.py       # contract + guardrails + policy/execution checks
        ├── context.py       # deterministic incident context
        ├── audit.py         # diff metadata for audit log
        ├── requirements.txt
        └── README.md

Running It

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/self-critique-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Variant using export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py

Variant using .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows it is easier to use set variables, or optionally python-dotenv.


Task

Imagine a production case:

"Create a customer-facing update for a payment incident, but do not change any facts and do not give ETA guarantees."

The problem with an ordinary revision: the model may "improve" the text while accidentally changing its meaning.

Solution

In this example:

  • the critique is formalized: what is wrong and what must be corrected
  • the revision has hard limits
  • the audit shows what was actually changed
  • high-risk cases are stopped via policy_escalation

Code

context.py — deterministic context

PYTHON
from __future__ import annotations

from typing import Any


def build_incident_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "report_date": report_date,
        "region": region,
        "incident": {
            "incident_id": "inc_payments_20260306",
            "severity": "P1",
            "status": "degraded",
            "affected_checkout_pct": 27,
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "eta_minutes": 45,
        },
        "policy_hints": {
            "avoid_absolute_guarantees": True,
            "max_length_increase_pct": 20,
            "required_sections": ["current_status", "customer_impact", "next_actions"],
        },
        "approved_actions": [
            "monitor payment failures every 15 minutes",
            "publish customer update via status page every 15 minutes",
            "prepare support macro with workaround guidance",
        ],
    }

What matters most here (in plain terms)

  • All subsequent steps are built on fixed facts.
  • This reduces "drift" between runs and simplifies auditing.
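The determinism claim is easy to check: canonical JSON with sorted keys hashes identically regardless of key order. Below is a simplified stand-in for the `text_hash`/`_stable_json` helpers in gateway.py (the name `stable_hash` is an assumption for this sketch):

```python
import hashlib
import json

def stable_hash(value) -> str:
    # Canonical JSON (sorted keys at every level) -> stable hash across runs.
    raw = json.dumps(value, ensure_ascii=True, sort_keys=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]

ctx_a = {"report_date": "2026-03-06", "region": "US", "incident": {"severity": "P1"}}
ctx_b = {"region": "US", "incident": {"severity": "P1"}, "report_date": "2026-03-06"}

# Key order does not matter: both contexts hash identically.
same = stable_hash(ctx_a) == stable_hash(ctx_b)
```

The same property is what lets the audit log compare before/after hashes meaningfully between runs.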

gateway.py — critique/revision policy boundary

PYTHON
from __future__ import annotations

import hashlib
import json
import re
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 120
    max_draft_chars: int = 900
    max_risks: int = 5
    max_required_changes: int = 5
    max_answer_chars: int = 980
    max_length_increase_pct: float = 20.0
    min_patch_similarity: float = 0.4


NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")
INCIDENT_ID_RE = re.compile(r"\binc_[a-z0-9_]+\b", re.IGNORECASE)
SEVERITY_RE = re.compile(r"\bp[0-5]\b", re.IGNORECASE)
REGION_RE = re.compile(r"\b(us|eu|uk|ua|apac|global|emea|latam)\b", re.IGNORECASE)
QUOTED_PHRASE_RE = re.compile(r"['\"]([^'\"]{3,160})['\"]")

RESTRICTED_CLAIMS_RE = [
    re.compile(r"\bresolved\b", re.IGNORECASE),
    re.compile(r"\bfully[-\s]+recovered\b", re.IGNORECASE),
    re.compile(r"\bincident\s+closed\b", re.IGNORECASE),
    re.compile(r"\ball payments (?:are|is)\s+stable\b", re.IGNORECASE),
]

ALLOWED_SEVERITY = {"low", "medium", "high"}


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(v) for v in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def _normalize_space(text: str) -> str:
    return " ".join((text or "").strip().split())


def text_hash(text: str) -> str:
    normalized = _normalize_space(text)
    raw = _stable_json(normalized)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def _extract_number_tokens(text: str) -> set[str]:
    return set(NUMBER_TOKEN_RE.findall(_normalize_space(text).lower()))


def _extract_incident_ids(text: str) -> set[str]:
    return set(INCIDENT_ID_RE.findall(_normalize_space(text).lower()))


def _extract_severity_labels(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {match.upper() for match in SEVERITY_RE.findall(normalized)}


def _extract_regions(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {value.upper() for value in REGION_RE.findall(normalized)}


def _context_claim_text(value: Any) -> str:
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (bool, int, float)):
        return str(value)
    if isinstance(value, list):
        return " ".join(_context_claim_text(item) for item in value)
    if isinstance(value, dict):
        parts: list[str] = []
        for key in sorted(value):
            item = value[key]
            parts.append(str(key))
            parts.append(_context_claim_text(item))
        return " ".join(parts)
    return str(value)


def _extract_required_change_rules(required_changes: list[str]) -> dict[str, list[str]]:
    must_include: list[str] = []
    must_remove: list[str] = []

    def _append_unique(target: list[str], value: str) -> None:
        if value and value not in target:
            target.append(value)

    for item in required_changes:
        item_norm = _normalize_space(item).lower()
        quoted = [_normalize_space(match).lower() for match in QUOTED_PHRASE_RE.findall(item)]
        quoted = [value for value in quoted if value]
        if not quoted:
            continue

        # Keep extraction deterministic and explicit:
        # - REMOVE/MUST_REMOVE => must_remove
        # - ADD/MUST_INCLUDE   => must_include
        # - anything else      => ignored (must be blocked in critique validation)
        is_remove_rule = ("must_remove" in item_norm) or item_norm.startswith(
            ("remove ", "remove:", "remove-")
        )
        is_add_rule = ("must_include" in item_norm) or item_norm.startswith(
            ("add ", "add:", "add-")
        )

        if is_remove_rule:
            for phrase in quoted:
                _append_unique(must_remove, phrase)
            continue
        if is_add_rule:
            for phrase in quoted:
                _append_unique(must_include, phrase)
            continue

    return {
        "must_include": must_include,
        "must_remove": must_remove,
    }


def _is_high_risk_risk_type(risk_type: str) -> bool:
    return risk_type in {"legal_risk", "policy_violation"}


def _contains_normalized_phrase(*, text: str, phrase: str) -> bool:
    # Compare using token-like normalization so punctuation differences
    # (e.g. trailing dots/commas) do not cause false negatives.
    normalized_text = re.sub(r"[^a-z0-9% ]+", " ", _normalize_space(text).lower())
    normalized_phrase = re.sub(r"[^a-z0-9% ]+", " ", _normalize_space(phrase).lower())
    normalized_text = " ".join(normalized_text.split())
    normalized_phrase = " ".join(normalized_phrase.split())
    return normalized_phrase in normalized_text


def _remove_phrase_occurrences(*, text: str, phrase: str) -> str:
    cleaned = text
    normalized_phrase = _normalize_space(phrase).strip()
    if not normalized_phrase:
        return cleaned

    variants = {normalized_phrase, normalized_phrase.rstrip(".!?")}
    for variant in variants:
        if not variant:
            continue
        cleaned = re.sub(re.escape(variant), "", cleaned, flags=re.IGNORECASE)

    cleaned = re.sub(r"\s+\.", ".", cleaned)
    cleaned = re.sub(r"[ \t]{2,}", " ", cleaned)
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    return cleaned.strip()


def _append_phrase_sentence(*, text: str, phrase: str) -> str:
    sentence = _normalize_space(phrase).strip()
    if not sentence:
        return text

    out = text.rstrip()
    if out and out[-1] not in ".!?":
        out += "."
    separator = "\n\n" if "\n\n" in out else " "
    return (out + separator + sentence).strip()


def _is_enforceable_required_change(item: str) -> bool:
    item_norm = _normalize_space(item).lower()
    has_marker = ("must_remove" in item_norm) or ("must_include" in item_norm) or item_norm.startswith(
        ("remove ", "remove:", "remove-", "add ", "add:", "add-")
    )
    if not has_marker:
        return False

    quoted = [_normalize_space(match) for match in QUOTED_PHRASE_RE.findall(item)]
    quoted = [value for value in quoted if value]
    return len(quoted) == 1


def validate_draft(draft: Any, *, max_chars: int) -> str:
    if not isinstance(draft, str) or not draft.strip():
        raise StopRun("invalid_draft:empty")
    out = draft.strip()
    if len(out) > max_chars:
        raise StopRun("invalid_draft:too_long")
    return out


def validate_critique(
    raw: Any,
    *,
    allowed_decisions_policy: set[str],
    allowed_risk_types_policy: set[str],
    max_risks: int,
    max_required_changes: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_critique:not_object")

    decision = raw.get("decision")
    if not isinstance(decision, str) or not decision.strip():
        raise StopRun("invalid_critique:decision")
    decision = decision.strip()
    if decision not in allowed_decisions_policy:
        raise StopRun(f"critique_decision_not_allowed_policy:{decision}")

    severity = raw.get("severity", "medium")
    if not isinstance(severity, str) or not severity.strip():
        raise StopRun("invalid_critique:severity")
    severity = severity.strip().lower()
    if severity not in ALLOWED_SEVERITY:
        raise StopRun("invalid_critique:severity")

    risks_raw = raw.get("risks", [])
    if not isinstance(risks_raw, list):
        raise StopRun("invalid_critique:risks")
    if len(risks_raw) > max_risks:
        raise StopRun("invalid_critique:too_many_risks")

    risks: list[dict[str, str]] = []
    for item in risks_raw:
        if not isinstance(item, dict):
            raise StopRun("invalid_critique:risk_item")

        risk_type = item.get("type")
        note = item.get("note")

        if not isinstance(risk_type, str) or not risk_type.strip():
            raise StopRun("invalid_critique:risk_type")
        risk_type = risk_type.strip()
        if risk_type not in allowed_risk_types_policy:
            raise StopRun(f"critique_risk_not_allowed_policy:{risk_type}")

        if not isinstance(note, str) or not note.strip():
            raise StopRun("invalid_critique:risk_note")

        risks.append({"type": risk_type, "note": note.strip()})

    required_changes_raw = raw.get("required_changes", [])
    if not isinstance(required_changes_raw, list):
        raise StopRun("invalid_critique:required_changes")
    if len(required_changes_raw) > max_required_changes:
        raise StopRun("invalid_critique:too_many_required_changes")

    required_changes: list[str] = []
    for item in required_changes_raw:
        if not isinstance(item, str) or not item.strip():
            raise StopRun("invalid_critique:required_change_item")
        required_changes.append(item.strip())

    reason = raw.get("reason", "")
    if reason is None:
        reason = ""
    if not isinstance(reason, str):
        raise StopRun("invalid_critique:reason")
    reason = reason.strip()

    high_risk = severity == "high" or any(_is_high_risk_risk_type(r["type"]) for r in risks)

    if decision == "approve":
        if required_changes:
            raise StopRun("invalid_critique:approve_with_required_changes")
        if high_risk:
            raise StopRun("invalid_critique:approve_with_high_risk")

    if decision == "revise":
        if not required_changes:
            raise StopRun("invalid_critique:revise_without_required_changes")
        if not all(_is_enforceable_required_change(item) for item in required_changes):
            raise StopRun("invalid_critique:required_changes_not_enforceable")
        if high_risk:
            raise StopRun("invalid_critique:high_risk_requires_escalate")

    if decision == "escalate":
        if not reason:
            raise StopRun("invalid_critique:escalate_reason_required")

    return {
        "decision": decision,
        "severity": severity,
        "risks": risks,
        "required_changes": required_changes,
        "reason": reason,
        "high_risk": high_risk,
    }


class SelfCritiqueGateway:
    def __init__(self, *, allow_execution_decisions: set[str], budget: Budget):
        self.allow_execution_decisions = set(allow_execution_decisions)
        self.budget = budget

    def enforce_execution_decision(self, decision: str) -> None:
        if decision not in self.allow_execution_decisions:
            raise StopRun(f"critique_decision_denied_execution:{decision}")

    def apply_required_changes_fallback(self, *, text: str, required_changes: list[str]) -> str:
        """
        Deterministic fallback for enforceable required changes:
        remove MUST_REMOVE/REMOVE phrases and append missing MUST_INCLUDE/ADD phrases.
        """
        candidate = (text or "").strip()
        if not candidate:
            return candidate

        phrase_rules = _extract_required_change_rules(required_changes)
        must_include = phrase_rules["must_include"]
        must_remove = phrase_rules["must_remove"]

        for phrase in must_remove:
            candidate = _remove_phrase_occurrences(text=candidate, phrase=phrase)

        for phrase in must_include:
            if not _contains_normalized_phrase(text=candidate, phrase=phrase):
                candidate = _append_phrase_sentence(text=candidate, phrase=phrase)

        return candidate.strip()

    def validate_revision(
        self,
        *,
        original: str,
        revised: str,
        context: dict[str, Any],
        required_changes: list[str],
    ) -> dict[str, Any]:
        if not isinstance(revised, str) or not revised.strip():
            raise StopRun("invalid_revised:empty")

        revised_clean = revised.strip()
        if len(revised_clean) > self.budget.max_answer_chars:
            raise StopRun("invalid_revised:too_long")

        normalized_original = _normalize_space(original)
        normalized_revised = _normalize_space(revised_clean)
        if normalized_original == normalized_revised:
            raise StopRun("invalid_revised:no_changes")

        similarity = SequenceMatcher(a=normalized_original, b=normalized_revised).ratio()
        if similarity < self.budget.min_patch_similarity:
            raise StopRun("patch_violation:too_large_edit")

        original_len = max(1, len(normalized_original))
        revised_len = len(normalized_revised)
        increase_pct = ((revised_len - original_len) / float(original_len)) * 100.0
        policy_hint_raw = (
            context.get("policy_hints", {}).get("max_length_increase_pct")
            if isinstance(context, dict)
            else None
        )
        policy_hint_cap = self.budget.max_length_increase_pct
        if isinstance(policy_hint_raw, (int, float)) and not isinstance(policy_hint_raw, bool):
            policy_hint_cap = float(policy_hint_raw)

        effective_length_cap = min(self.budget.max_length_increase_pct, policy_hint_cap)
        if increase_pct > effective_length_cap:
            raise StopRun("patch_violation:length_increase_limit")

        allowed_text_tokens = _stable_json(context) + " " + original
        allowed_text_claims = _normalize_space(_context_claim_text(context) + " " + original)

        if _extract_number_tokens(revised_clean) - _extract_number_tokens(allowed_text_tokens):
            raise StopRun("patch_violation:no_new_facts")

        if _extract_incident_ids(revised_clean) - _extract_incident_ids(allowed_text_tokens):
            raise StopRun("patch_violation:new_incident_id")

        if _extract_severity_labels(revised_clean) - _extract_severity_labels(allowed_text_tokens):
            raise StopRun("patch_violation:new_severity_label")

        if _extract_regions(revised_clean) - _extract_regions(allowed_text_tokens):
            raise StopRun("patch_violation:new_region")

        avoid_absolute_guarantees = bool(
            context.get("policy_hints", {}).get("avoid_absolute_guarantees")
            if isinstance(context, dict)
            else False
        )
        for claim_re in RESTRICTED_CLAIMS_RE:
            if avoid_absolute_guarantees:
                if claim_re.search(revised_clean):
                    raise StopRun("patch_violation:restricted_claims")
                continue
            if claim_re.search(revised_clean) and not claim_re.search(allowed_text_claims):
                raise StopRun("patch_violation:restricted_claims")

        phrase_rules = _extract_required_change_rules(required_changes)
        must_include = phrase_rules["must_include"]
        must_remove = phrase_rules["must_remove"]

        if must_include or must_remove:
            revised_lower = normalized_revised.lower()
            if [value for value in must_include if not _contains_normalized_phrase(text=revised_lower, phrase=value)]:
                raise StopRun("patch_violation:required_changes_not_applied")
            if [value for value in must_remove if _contains_normalized_phrase(text=revised_lower, phrase=value)]:
                raise StopRun("patch_violation:required_changes_not_applied")

        return {
            "answer": revised_clean,
            "patch_similarity": round(similarity, 3),
            "length_increase_pct": round(increase_pct, 2),
            "required_changes_total": len(required_changes),
            "required_changes_enforced": len(must_include) + len(must_remove),
            "required_changes_unenforced": len(required_changes)
            - (len(must_include) + len(must_remove)),
        }

    def validate_final(self, answer: str) -> str:
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_answer:empty")

        out = answer.strip()
        if len(out) > self.budget.max_answer_chars:
            raise StopRun("invalid_answer:too_long")
        return out

What matters most here (in plain terms)

  • The critique is not "arbitrary text": it goes through strict validation.
  • The revision must not exceed the factual context or the budget limits.
  • Enforceable required_changes use explicit ADD/REMOVE (or MUST_INCLUDE/MUST_REMOVE) markers.
  • If avoid_absolute_guarantees=True, restricted claims are blocked regardless of the draft (even if they already appeared in the original draft).
  • required_changes_total shows the total number of required_changes.
  • required_changes_enforced shows how many required_changes became enforced rules.
  • required_changes_unenforced shows how many required_changes the gateway cannot verify.
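The marker format can be exercised in isolation. This is a simplified sketch of the quoted-phrase extraction (the full logic, including uniqueness and the exactly-one-phrase rule, lives in `_extract_required_change_rules` and `_is_enforceable_required_change`):

```python
import re

QUOTED_RE = re.compile(r"['\"]([^'\"]{3,160})['\"]")

def extract_rules(required_changes: list[str]) -> dict[str, list[str]]:
    # Simplified: REMOVE/MUST_REMOVE -> must_remove, ADD/MUST_INCLUDE -> must_include.
    rules: dict[str, list[str]] = {"must_include": [], "must_remove": []}
    for item in required_changes:
        low = item.lower()
        quoted = QUOTED_RE.findall(item)
        if not quoted:
            continue  # no quoted phrase -> not enforceable, ignored here
        if "must_remove" in low or low.startswith("remove"):
            rules["must_remove"].extend(q.lower() for q in quoted)
        elif "must_include" in low or low.startswith("add"):
            rules["must_include"].extend(q.lower() for q in quoted)
    return rules

rules = extract_rules([
    'REMOVE "fully recovered"',
    'ADD "We will share the next update within 15 minutes"',
    "tighten the wording",  # no marker, no quoted phrase -> ignored
])
```

Items without a marker and a quoted phrase cannot be machine-checked; in the real example the critique validator rejects them instead of silently ignoring them.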

llm.py — draft/critique/revise LLM calls

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


DRAFT_SYSTEM_PROMPT = """
You are an operations communications writer.
Return exactly one JSON object:
{
  "draft": "customer-facing incident update"
}

Rules:
- Use only facts from provided incident_context.
- Include current status, customer impact, and next actions.
- Keep language clear and non-speculative.
- Avoid absolute guarantees.
- Do not output markdown or extra keys.
""".strip()

SHORTEN_DRAFT_SYSTEM_PROMPT = """
You are an operations editor.
Return exactly one JSON object:
{
  "draft": "shortened customer-facing incident update"
}

Rules:
- Rewrite the draft to be <= max_chars characters.
- Preserve original facts, numbers, and intent.
- Do not add new facts or speculative claims.
- Keep current status, customer impact, and next actions.
- Keep language clear and non-speculative.
- Avoid absolute guarantees.
- Do not output markdown or extra keys.
""".strip()

CRITIQUE_SYSTEM_PROMPT = """
You are a strict critique reviewer.
Return exactly one JSON object:
{
  "decision": "approve|revise|escalate",
  "severity": "low|medium|high",
  "risks": [{"type":"overconfidence","note":"..."}],
  "required_changes": ["concrete change"],
  "reason": "for escalate only"
}

Rules:
- Review exactly once.
- decision=approve: required_changes must be empty.
- decision=revise: provide 1-5 concrete required changes.
- decision=escalate: use only for high-risk content.
- Every required_changes item MUST start with ADD/REMOVE/MUST_INCLUDE/MUST_REMOVE.
- Every required_changes item MUST contain exactly one quoted phrase.
- If you cannot express required changes in enforceable ADD/REMOVE format, set decision=escalate and explain why in reason.
- Use explicit markers for enforceable phrase edits:
  - REMOVE "phrase to delete"
  - ADD "phrase to include"
  - MUST_REMOVE "phrase to delete" (equivalent)
  - MUST_INCLUDE "phrase to include" (equivalent)
- Do not add new facts in required_changes.
- Use only risk types from allowed_risk_types.
- Do not output markdown or extra keys.
""".strip()

REVISE_SYSTEM_PROMPT = """
You are an editor applying a constrained rewrite.
Return exactly one JSON object:
{
  "revised_answer": "updated answer"
}

Rules:
- Apply required_changes only.
- Keep original scope and customer intent.
- Do not add new facts or numbers.
- Keep the answer concise and actionable.
- Do not output markdown or extra keys.
""".strip()

REVISE_SYSTEM_PROMPT_STRICT = """
You are an editor applying a constrained rewrite.
Return exactly one JSON object:
{
  "revised_answer": "updated answer"
}

Rules:
- Apply required_changes only.
- Keep original scope and customer intent.
- Do not add new facts or numbers.
- Keep the answer concise and actionable.
- You MUST satisfy each required_changes item exactly.
- For ADD/MUST_INCLUDE: include the quoted phrase verbatim.
- For REMOVE/MUST_REMOVE: ensure the quoted phrase does not appear.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")
    return data


def generate_draft(*, goal: str, incident_context: dict[str, Any]) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
    }
    data = _chat_json(system_prompt=DRAFT_SYSTEM_PROMPT, payload=payload)

    draft = data.get("draft")
    if not isinstance(draft, str):
        raise LLMInvalid("llm_invalid_schema")

    draft = draft.strip()
    if not draft:
        raise LLMEmpty("llm_empty")
    return draft


def shorten_draft(*, draft: str, max_chars: int) -> str:
    payload = {
        "draft": draft,
        "max_chars": int(max_chars),
    }
    data = _chat_json(system_prompt=SHORTEN_DRAFT_SYSTEM_PROMPT, payload=payload)

    shortened = data.get("draft")
    if not isinstance(shortened, str):
        raise LLMInvalid("llm_invalid_schema")

    shortened = shortened.strip()
    if not shortened:
        raise LLMEmpty("llm_empty")
    return shortened


def critique_draft(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    allowed_risk_types: list[str],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "allowed_risk_types": allowed_risk_types,
    }
    return _chat_json(system_prompt=CRITIQUE_SYSTEM_PROMPT, payload=payload)


def revise_once(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    required_changes: list[str],
    strict_mode: bool = False,
) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "required_changes": required_changes,
    }
    system_prompt = REVISE_SYSTEM_PROMPT_STRICT if strict_mode else REVISE_SYSTEM_PROMPT
    data = _chat_json(system_prompt=system_prompt, payload=payload)

    revised = data.get("revised_answer")
    if not isinstance(revised, str):
        raise LLMInvalid("llm_invalid_schema")

    revised = revised.strip()
    if not revised:
        raise LLMEmpty("llm_empty")
    return revised

What matters most here (in plain terms)

  • The critique output has a stable JSON contract.
  • The critique prompt explicitly requires enforceable required_changes via ADD/REMOVE markers.
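The value of a stable JSON contract is that violations fail loudly. This is a simplified sketch of the contract check (the full validation, including risks and severity, lives in `validate_critique` in gateway.py; `parse_critique` is a name invented for this sketch):

```python
import json

ALLOWED_DECISIONS = {"approve", "revise", "escalate"}

def parse_critique(raw_text: str) -> dict:
    # Strict contract check: reject anything that is not the expected shape.
    data = json.loads(raw_text)
    if not isinstance(data, dict):
        raise ValueError("llm_invalid_json")
    if data.get("decision") not in ALLOWED_DECISIONS:
        raise ValueError("critique_decision_not_allowed")
    if not isinstance(data.get("required_changes", []), list):
        raise ValueError("invalid_required_changes")
    return data

ok = parse_critique('{"decision": "revise", "required_changes": ["REMOVE \\"resolved\\""]}')

try:
    parse_critique('{"decision": "rewrite_everything"}')
    rejected = False
except ValueError:
    rejected = True
```

A free-form decision like "rewrite_everything" never reaches the execution layer; it is stopped at parse time.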

audit.py — change audit log

PYTHON
from __future__ import annotations

import difflib
import hashlib
import re
from typing import Any


SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?])\s+")


def _hash_text(text: str) -> str:
    normalized = " ".join((text or "").split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:12]


def _split_for_diff(text: str) -> list[str]:
    lines = (text or "").splitlines()
    if len(lines) > 1:
        return lines

    normalized = (text or "").strip()
    if not normalized:
        return [""]

    sentences = [item.strip() for item in SENTENCE_SPLIT_RE.split(normalized) if item.strip()]
    if len(sentences) > 1:
        return sentences

    chunk_size = 80
    return [normalized[i : i + chunk_size] for i in range(0, len(normalized), chunk_size)]


def build_audit_log(
    *,
    before: str,
    after: str,
    risks: list[dict[str, Any]],
    required_changes: list[str],
) -> dict[str, Any]:
    before_text = (before or "").strip()
    after_text = (after or "").strip()

    before_chars = len(before_text)
    after_chars = len(after_text)
    delta_chars = after_chars - before_chars

    if before_chars <= 0:
        increase_pct = 0.0
    else:
        increase_pct = (delta_chars / float(before_chars)) * 100.0

    raw_diff = list(
        difflib.unified_diff(
            _split_for_diff(before_text),
            _split_for_diff(after_text),
            fromfile="before",
            tofile="after",
            lineterm="",
        )
    )

    diff_excerpt: list[str] = []
    for line in raw_diff:
        if line.startswith(("---", "+++", "@@")):
            continue
        if line.startswith(("+", "-")):
            diff_excerpt.append(line)
        if len(diff_excerpt) >= 6:
            break

    return {
        "changed": before_text != after_text,
        "before_hash": _hash_text(before_text),
        "after_hash": _hash_text(after_text),
        "before_chars": before_chars,
        "after_chars": after_chars,
        "delta_chars": delta_chars,
        "length_increase_pct": round(increase_pct, 2),
        "risks_count": len(risks),
        "required_changes_count": len(required_changes),
        "diff_excerpt": diff_excerpt,
    }

What matters most here (in plain terms)

  • After the revision, audit metadata is stored, not just the final text.
  • This makes it possible to debug "why this version passed the policy".
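The audit metadata can be reproduced with the standard library alone. A minimal sketch of the `build_audit_log` idea, using a sentence split instead of the full `_split_for_diff` heuristics:

```python
import difflib
import hashlib

def short_hash(text: str) -> str:
    # Whitespace-normalized hash, mirroring the _hash_text helper.
    normalized = " ".join(text.split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:12]

before = "Checkout is degraded. The incident is resolved."
after = "Checkout is degraded. We are monitoring payment failures."

# Keep only the +/- change lines, dropping diff headers and context.
diff = [
    line
    for line in difflib.unified_diff(before.split(". "), after.split(". "), lineterm="")
    if line.startswith(("+", "-")) and not line.startswith(("+++", "---"))
]

audit = {
    "changed": before != after,
    "before_hash": short_hash(before),
    "after_hash": short_hash(after),
    "delta_chars": len(after) - len(before),
    "diff_excerpt": diff,
}
```

The hashes let you verify later which exact text passed the policy, without storing or re-reading the full before/after bodies.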

main.py — orchestrating the full cycle

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from audit import build_audit_log
from context import build_incident_context
from gateway import Budget, SelfCritiqueGateway, StopRun, text_hash, validate_critique, validate_draft
from llm import LLMEmpty, LLMInvalid, LLMTimeout, critique_draft, generate_draft, revise_once, shorten_draft

GOAL = (
    "Draft a customer-facing payment incident update for US enterprise customers. "
    "Use precise language, avoid guarantees, and keep next actions concrete."
)
INCIDENT_CONTEXT = build_incident_context(report_date="2026-03-06", region="US")

BUDGET = Budget(
    max_seconds=120,
    max_draft_chars=900,
    max_risks=5,
    max_required_changes=5,
    max_answer_chars=980,
    max_length_increase_pct=20.0,
    min_patch_similarity=0.4,
)

ALLOWED_CRITIQUE_DECISIONS_POLICY = {"approve", "revise", "escalate"}
AUTO_REVISION_ENABLED = True
ALLOWED_CRITIQUE_DECISIONS_EXECUTION = (
    ALLOWED_CRITIQUE_DECISIONS_POLICY if AUTO_REVISION_ENABLED else {"approve", "escalate"}
)

ALLOWED_RISK_TYPES_POLICY = {
    "overconfidence",
    "missing_uncertainty",
    "contradiction",
    "scope_leak",
    "policy_violation",
    "legal_risk",
}


def run_self_critique_agent(*, goal: str, incident_context: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = SelfCritiqueGateway(
        allow_execution_decisions=ALLOWED_CRITIQUE_DECISIONS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    draft_attempts = 0
    draft_retried = False
    try:
        draft_attempts += 1
        draft_raw = generate_draft(goal=goal, incident_context=incident_context)
        try:
            draft = validate_draft(draft_raw, max_chars=BUDGET.max_draft_chars)
        except StopRun as exc:
            if exc.reason != "invalid_draft:too_long":
                raise
            # One bounded recovery attempt: shorten draft within policy budget.
            draft_attempts += 1
            draft_retried = True
            shortened_raw = shorten_draft(draft=draft_raw, max_chars=BUDGET.max_draft_chars)
            draft = validate_draft(shortened_raw, max_chars=BUDGET.max_draft_chars)
    except LLMTimeout:
        return stopped("llm_timeout", phase="draft")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="draft")
    except LLMEmpty:
        return stopped("llm_empty", phase="draft")
    except StopRun as exc:
        return stopped(exc.reason, phase="draft")

    trace.append(
        {
            "step": 1,
            "phase": "draft",
            "draft_hash": text_hash(draft),
            "chars": len(draft),
            "attempts_used": draft_attempts,
            "retried": draft_retried,
            "ok": True,
        }
    )
    history.append(
        {
            "step": 1,
            "action": "draft_once",
            "draft": draft,
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="critique")

    try:
        raw_critique = critique_draft(
            goal=goal,
            incident_context=incident_context,
            draft=draft,
            allowed_risk_types=sorted(ALLOWED_RISK_TYPES_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="critique")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="critique")

    try:
        critique = validate_critique(
            raw_critique,
            allowed_decisions_policy=ALLOWED_CRITIQUE_DECISIONS_POLICY,
            allowed_risk_types_policy=ALLOWED_RISK_TYPES_POLICY,
            max_risks=BUDGET.max_risks,
            max_required_changes=BUDGET.max_required_changes,
        )
        gateway.enforce_execution_decision(critique["decision"])
    except StopRun as exc:
        return stopped(exc.reason, phase="critique", raw_critique=raw_critique)

    trace.append(
        {
            "step": 2,
            "phase": "critique",
            "decision": critique["decision"],
            "severity": critique["severity"],
            "risks": len(critique["risks"]),
            "required_changes": len(critique["required_changes"]),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 2,
            "action": "critique_once",
            "critique": critique,
        }
    )

    if critique["decision"] == "escalate":
        escalation_reason = str(critique.get("reason", "")).strip()
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "policy_escalation",
            "escalation_reason": escalation_reason[:120],
            "phase": "critique",
            "critique": critique,
            "trace": trace,
            "history": history,
        }

    final_answer = draft
    revised = False

    if critique["decision"] == "revise":
        revise_attempts = 0
        revise_retried = False
        revised_payload: dict[str, Any] | None = None
        last_revised_candidate = draft
        for attempt in range(1, 4):
            if (time.monotonic() - started) > BUDGET.max_seconds:
                return stopped("max_seconds", phase="revise")

            revise_attempts = attempt
            strict_mode = attempt > 1
            try:
                revised_raw = revise_once(
                    goal=goal,
                    incident_context=incident_context,
                    draft=draft,
                    required_changes=critique["required_changes"],
                    strict_mode=strict_mode,
                )
                last_revised_candidate = revised_raw
                revised_payload = gateway.validate_revision(
                    original=draft,
                    revised=revised_raw,
                    context=incident_context,
                    required_changes=critique["required_changes"],
                )
                break
            except LLMTimeout:
                return stopped("llm_timeout", phase="revise")
            except LLMInvalid as exc:
                return stopped(exc.args[0], phase="revise")
            except LLMEmpty:
                return stopped("llm_empty", phase="revise")
            except StopRun as exc:
                if exc.reason == "patch_violation:required_changes_not_applied" and attempt < 3:
                    revise_retried = True
                    continue
                if exc.reason == "patch_violation:required_changes_not_applied":
                    # Final fallback: enforce required phrase edits deterministically.
                    try:
                        fallback_revised = gateway.apply_required_changes_fallback(
                            text=last_revised_candidate,
                            required_changes=critique["required_changes"],
                        )
                        revised_payload = gateway.validate_revision(
                            original=draft,
                            revised=fallback_revised,
                            context=incident_context,
                            required_changes=critique["required_changes"],
                        )
                        revise_attempts = attempt + 1
                        revise_retried = True
                        break
                    except StopRun as fallback_exc:
                        return stopped(fallback_exc.reason, phase="revise")
                return stopped(exc.reason, phase="revise")

        if revised_payload is None:
            return stopped("patch_violation:required_changes_not_applied", phase="revise")

        final_answer = revised_payload["answer"]
        revised = True

        trace.append(
            {
                "step": 3,
                "phase": "revise",
                "patch_similarity": revised_payload["patch_similarity"],
                "length_increase_pct": revised_payload["length_increase_pct"],
                "required_changes_total": revised_payload["required_changes_total"],
                "required_changes_enforced": revised_payload["required_changes_enforced"],
                "required_changes_unenforced": revised_payload["required_changes_unenforced"],
                "attempts_used": revise_attempts,
                "retried": revise_retried,
                "revised_hash": text_hash(final_answer),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "revise_once",
                "required_changes": critique["required_changes"],
                "revised_answer": final_answer,
            }
        )

    try:
        final_answer = gateway.validate_final(final_answer)
    except StopRun as exc:
        return stopped(exc.reason, phase="finalize")

    audit_log = build_audit_log(
        before=draft,
        after=final_answer,
        risks=critique["risks"],
        required_changes=critique["required_changes"],
    )

    trace.append(
        {
            "step": 4 if revised else 3,
            "phase": "audit_finalize",
            "final_hash": text_hash(final_answer),
            "changed": audit_log["changed"],
            "diff_lines": len(audit_log["diff_excerpt"]),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 4 if revised else 3,
            "action": "audit_finalize",
            "status": "final",
            "changed": audit_log["changed"],
        }
    )

    return {
        "run_id": run_id,
        "status": "ok",
        "stop_reason": "success",
        "outcome": "revised_once" if revised else "approved_direct",
        "answer": final_answer,
        "critique_decision": critique["decision"],
        "severity": critique["severity"],
        "risks": critique["risks"],
        "required_changes": critique["required_changes"],
        "audit": audit_log,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_self_critique_agent(goal=GOAL, incident_context=INCIDENT_CONTEXT)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (explained simply)

  • critique_decision drives the subsequent flow, but the runtime controls execution rights.
  • Critique and revision stay transparent via trace + history + audit.
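The split between the policy allowlist and the execution allowlist in main.py above can be isolated into a few lines. A sketch under the same assumptions as the example (the function name and return strings are illustrative; only the two stop_reason prefixes come from the example's contract):

```python
POLICY_ALLOWED = {"approve", "revise", "escalate"}


def execution_allowed(decision: str, *, auto_revision: bool) -> str:
    """Policy validates the contract; the runtime decides what may actually execute."""
    if decision not in POLICY_ALLOWED:
        # The contract itself is violated: this decision is never valid.
        return f"critique_decision_not_allowed_policy:{decision}"
    execution_set = POLICY_ALLOWED if auto_revision else {"approve", "escalate"}
    if decision not in execution_set:
        # Contractually valid, but the runtime has revoked the right to execute it.
        return f"critique_decision_denied_execution:{decision}"
    return "ok"


print(execution_allowed("revise", auto_revision=True))   # allowed
print(execution_allowed("revise", auto_revision=False))  # denied by the runtime
```

The point of keeping two sets is that turning off AUTO_REVISION_ENABLED does not make `revise` an invalid critique; it stays a well-formed decision that the runtime simply refuses to act on, which keeps the two stop_reason families distinguishable in monitoring.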

Example output

JSON
{
  "run_id": "721e4231-7b9a-4843-99e0-888616025b35",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "revised_once",
  "answer": "Current Status: We are experiencing a payment processing degradation affecting about 27% of US enterprise customer checkouts. The failed payment rate has risen to 3.4%, with 5 related chargeback alerts. Our engineering team is working to resolve the issue. We estimate recovery within approximately 45 minutes, though this timing may change as we learn more.\n\nCustomer Impact: Some customers may face difficulties completing payments, causing delayed order processing or the need to retry transactions. We are monitoring the situation closely to reduce disruption.\n\nNext Actions: We will publish updates on the status page every 15 minutes as we monitor payment failures. Our support team is preparing a workaround guide to assist affected customers. Please check the status page regularly and contact support if you need help.",
  "critique_decision": "revise",
  "severity": "medium",
  "risks": [
    {
      "type": "overconfidence",
      "note": "The phrase 'with an estimated recovery time of 45 minutes' may be interpreted as a guarantee rather than an estimate."
    }
  ],
  "required_changes": [
    "MUST_REMOVE \"with an estimated recovery time of 45 minutes\"",
    "MUST_INCLUDE \"We estimate recovery within approximately 45 minutes, though this timing may change as we learn more\"",
    "MUST_INCLUDE \"We will publish updates on the status page every 15 minutes as we monitor payment failures\"",
    "MUST_INCLUDE \"Our support team is preparing a workaround guide to assist affected customers\""
  ],
  "audit": {
    "changed": true,
    "before_hash": "7cdaa40c3fa4",
    "after_hash": "33b356380537",
    "before_chars": 770,
    "after_chars": 827,
    "delta_chars": 57,
    "length_increase_pct": 7.4,
    "risks_count": 1,
    "required_changes_count": 4,
    "diff_excerpt": [
      "-...",
      "+..."
    ]
  },
  "trace": [
    {
      "step": 1,
      "phase": "draft",
      "draft_hash": "9b944d7375bf",
      "chars": 770,
      "attempts_used": 2,
      "retried": true,
      "ok": true
    },
    {
      "step": 2,
      "phase": "critique",
      "decision": "revise",
      "severity": "medium",
      "risks": 1,
      "required_changes": 4,
      "ok": true
    },
    {
      "step": 3,
      "phase": "revise",
      "patch_similarity": 0.849,
      "length_increase_pct": 7.42,
      "required_changes_total": 4,
      "required_changes_enforced": 4,
      "required_changes_unenforced": 0,
      "attempts_used": 2,
      "retried": true,
      "revised_hash": "d92d90a2adbe",
      "ok": true
    },
    {
      "step": 4,
      "phase": "audit_finalize",
      "final_hash": "d92d90a2adbe",
      "changed": true,
      "diff_lines": 4,
      "ok": true
    }
  ],
  "history": [{...}]
}

Typical stop_reason values

  • success — the run completed correctly
  • llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty — empty LLM response during draft/revise
  • llm_invalid_json — the LLM returned invalid JSON
  • llm_invalid_schema — the JSON does not match the contract
  • invalid_draft:* — the draft failed basic validation
  • invalid_critique:* — the critique failed the policy-layer contract
  • invalid_critique:required_changes_not_enforceable — for decision=revise, required_changes must be in an enforceable format (ADD/REMOVE/MUST_* + 1 quoted phrase)
  • critique_decision_not_allowed_policy:* — the critique decision is outside the policy allowlist
  • critique_decision_denied_execution:* — the runtime rejected the execution decision
  • patch_violation:no_new_facts — the revision added new facts
  • patch_violation:length_increase_limit — the revision exceeded the length-growth limit
  • patch_violation:restricted_claims — the revision contains forbidden claims; with avoid_absolute_guarantees=true they are strictly blocked
  • patch_violation:required_changes_not_applied — the revision did not apply the required changes
  • patch_violation:too_large_edit — the revision exceeded the patch-only scope
  • policy_escalation — the critique escalated; details in escalation_reason
  • max_seconds — the total run time budget was exceeded
  • invalid_answer:* — the final answer failed validation
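Because these values share family prefixes (the part before the first `:`), a monitoring layer can bucket them without enumerating every variant. A minimal sketch; the bucket names are illustrative, not part of the example code:

```python
def stop_reason_bucket(stop_reason: str) -> str:
    """Map a stop_reason to a coarse monitoring bucket via its prefix family."""
    family = stop_reason.split(":", 1)[0]
    if family == "success":
        return "ok"
    if family in {"llm_timeout", "llm_empty", "llm_invalid_json", "llm_invalid_schema"}:
        return "llm_failure"
    if family in {"invalid_draft", "invalid_critique", "invalid_answer"}:
        return "contract_violation"
    if family == "patch_violation":
        return "revision_guardrail"
    # policy_escalation, critique_decision_* denials, max_seconds
    return "policy_or_budget"


print(stop_reason_bucket("patch_violation:no_new_facts"))
print(stop_reason_bucket("invalid_critique:required_changes_not_enforceable"))
```

Bucketing like this keeps dashboards stable when new `invalid_*:` or `patch_violation:` subtypes are added, while still letting you drill into the full stop_reason string for a specific run.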

What is NOT shown here

  • persisted audit storage (DB / object storage)
  • retry/backoff and a circuit breaker for the LLM
  • a human-review queue for policy_escalation
  • domain-specific, semantically aware checking of required_changes

What you can try next

  1. Disable AUTO_REVISION_ENABLED and check for critique_decision_denied_execution:revise.
  2. Add a "severity budget" (e.g. block medium and above for certain tenants).
  3. Ship audit.diff_excerpt to an external log sink (S3/ELK) for incident analysis.
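Idea 2, the severity budget, could start as a tenant-scoped gate checked right after the critique step. A sketch only — the tenant names, caps, and helper are hypothetical and not part of the example code:

```python
SEVERITY_ORDER = {"low": 0, "medium": 1, "high": 2}

# Hypothetical per-tenant caps: critiques AT or ABOVE the cap stop the run.
TENANT_SEVERITY_CAP = {"enterprise-us": "medium", "internal-sandbox": "high"}


def severity_allowed(tenant: str, severity: str) -> bool:
    """Return False when the critique severity reaches the tenant's cap."""
    cap = TENANT_SEVERITY_CAP.get(tenant, "medium")  # conservative default
    return SEVERITY_ORDER[severity] < SEVERITY_ORDER[cap]


print(severity_allowed("enterprise-us", "medium"))    # blocked: medium >= cap
print(severity_allowed("internal-sandbox", "medium"))  # allowed: cap is high
```

Wired into run_self_critique_agent, a False here would map naturally to a new stop_reason family (e.g. a `severity_budget:*` value) rather than reusing policy_escalation, so that dashboards can tell the two apart.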
⏱️ 21 min read · Updated March 2, 2026 · Difficulty: ★★☆
Integrated: Production Control — OnceOnly
Guardrails for tool-calling agents
Ship this pattern with governance:
  • Budgets (steps / spend caps)
  • Tool permissions (allowlist / blocklist)
  • Kill switch & incident stop
  • Idempotency & dedupe
  • Audit logs & traceability
Embedded note: OnceOnly is a control layer for production agent systems.

Author

Nick — engineer building infrastructure for AI agents in production.

Focus: agent patterns, failure modes, runtime control, and system reliability.

🔗 GitHub: https://github.com/mykolademyanov


Editorial note

This documentation is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The content is based on real failures, post-mortems, and operational incidents in AI agent systems running in production.