Self-Critique Agent in Python: Full Example

A production-style, runnable Self-Critique agent example in Python with a Draft → Critique → Revise → Audit loop, a policy/execution boundary, and explicit stop reasons.
On this page
  1. Pattern Essence (Brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. How to run
  6. Task
  7. Solution
  8. Code
  9. context.py — deterministic context
  10. gateway.py — critique/revision policy boundary
  11. llm.py — draft/critique/revise calls
  12. audit.py — change journal
  13. main.py — full-cycle orchestration
  14. Example output
  15. Typical stop_reason values
  16. What is NOT shown here
  17. What to try next

Pattern Essence (Brief)

Self-Critique Agent is a pattern in which, after producing a draft answer, the agent generates a structured critique (risks + required_changes), performs at most one bounded revision, and records an audit of the changes.

The LLM proposes the draft and the critique, while the gateway policy decides whether the critique can be executed and whether the revision stays within contract boundaries.


What this example demonstrates

  • production-like flow: Draft -> Critique -> Revise (optional) -> Audit
  • strictly structured critique artifact: decision, severity, risks, required_changes
  • policy vs execution separation for critique decisions
  • constrained revision: no_new_facts, length increase cap, required changes enforcement
  • change audit (before/after hash, delta_chars, diff_excerpt)
  • explicit stop_reason, trace, history for production monitoring

Architecture

  1. LLM generates a draft (draft).
  2. LLM critic returns structured critique JSON.
  3. Gateway validates critique contract by policy.
  4. Execution layer enforces runtime decision allowlist.
  5. If revise, one revision with guardrails is executed.
  6. Change audit log and final answer are produced.

Key contract: the agent cannot "freely rewrite" text; changes are controlled by required_changes and runtime guardrails.
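The six steps above can be condensed into a runnable orchestration sketch. The stand-in functions (`draft_fn`, `critique_fn`, and so on) are hypothetical placeholders for the real modules shown below:

```python
# Hypothetical condensed orchestration of the six steps above.
# draft_fn / critique_fn / revise_fn / validate_fn stand in for llm.py and gateway.py.
def run(draft_fn, critique_fn, revise_fn, validate_fn):
    draft = draft_fn()                          # 1. LLM drafts
    critique = critique_fn(draft)               # 2-3. structured critique, already validated
    if critique["decision"] == "escalate":      # 4. execution allowlist
        return {"status": "stopped", "stop_reason": "policy_escalation"}
    answer = draft
    if critique["decision"] == "revise":        # 5. one bounded, guardrail-checked revision
        answer = validate_fn(draft, revise_fn(draft, critique["required_changes"]))
    return {"status": "ok", "answer": answer}   # 6. audited final answer

result = run(
    lambda: "Checkout is degraded in US.",
    lambda d: {"decision": "approve", "required_changes": []},
    lambda d, changes: d,
    lambda before, after: after,
)
escalated = run(
    lambda: "Checkout is degraded in US.",
    lambda d: {"decision": "escalate", "required_changes": []},
    lambda d, changes: d,
    lambda before, after: after,
)
```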


Project structure

TEXT
agent-patterns/
└── self-critique-agent/
    └── python/
        ├── main.py          # Draft -> Critique -> Revise -> Audit
        ├── llm.py           # draft/critique/revise LLM calls
        ├── gateway.py       # contract + guardrails + policy/execution checks
        ├── context.py       # deterministic incident context
        ├── audit.py         # diff metadata for audit log
        ├── requirements.txt
        └── README.md

How to run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/self-critique-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option 1: via export

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option 2: via .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

These are the shell variants (macOS/Linux). On Windows, set the variables with the set command in cmd (or $env:NAME in PowerShell), or load them with python-dotenv.


Task

Imagine a production case:

"Prepare a customer-facing update about a payment incident, but do not change facts and do not provide ETA guarantees."

The problem with an unconstrained revision: the model can "improve" the wording while accidentally changing the meaning.

Solution

In this example:

  • critique is formalized: what is wrong and what must be fixed
  • revision has strict boundaries
  • audit shows what actually changed
  • high-risk cases are stopped via policy_escalation
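As a minimal sketch of one such boundary, the no_new_facts guardrail can be reduced to a set difference over numeric tokens (the pattern matches gateway.py's NUMBER_TOKEN_RE; the helper name is made up for illustration):

```python
import re

# Same pattern as gateway.py's NUMBER_TOKEN_RE.
NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")

def new_number_tokens(revised: str, allowed_text: str) -> set[str]:
    # Hypothetical helper: any number in the revision that never appeared
    # in the context or the original draft is treated as a new fact.
    return set(NUMBER_TOKEN_RE.findall(revised.lower())) - set(
        NUMBER_TOKEN_RE.findall(allowed_text.lower())
    )

allowed = "affected_checkout_pct 27 eta_minutes 45"
ok = "About 27 percent of checkouts are affected; ETA is 45 minutes."
bad = "Failure rate is 0.05 now."
```

With this check, `ok` passes (27 and 45 already exist in the context), while `bad` introduces 0.05 and would trigger patch_violation:no_new_facts.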

Code

context.py — deterministic context

PYTHON
from __future__ import annotations

from typing import Any


def build_incident_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "report_date": report_date,
        "region": region,
        "incident": {
            "incident_id": "inc_payments_20260306",
            "severity": "P1",
            "status": "degraded",
            "affected_checkout_pct": 27,
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "eta_minutes": 45,
        },
        "policy_hints": {
            "avoid_absolute_guarantees": True,
            "max_length_increase_pct": 20,
            "required_sections": ["current_status", "customer_impact", "next_actions"],
        },
        "approved_actions": [
            "monitor payment failures every 15 minutes",
            "publish customer update via status page every 15 minutes",
            "prepare support macro with workaround guidance",
        ],
    }

What matters most here (in plain words)

  • All subsequent steps are based on fixed facts.
  • This reduces run-to-run "drift" and simplifies audit.
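A quick way to see the determinism property (using an abridged copy of build_incident_context for illustration):

```python
import hashlib
import json

def build_incident_context(*, report_date: str, region: str) -> dict:
    # Abridged copy of context.py for illustration.
    return {
        "report_date": report_date,
        "region": region,
        "incident": {"incident_id": "inc_payments_20260306", "severity": "P1"},
    }

a = build_incident_context(report_date="2026-03-06", region="US")
b = build_incident_context(report_date="2026-03-06", region="US")

# Same inputs -> identical context -> stable hashes across runs.
digest = hashlib.sha256(json.dumps(a, sort_keys=True).encode("utf-8")).hexdigest()[:12]
```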

gateway.py — critique/revision policy boundary

PYTHON
from __future__ import annotations

import hashlib
import json
import re
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 120
    max_draft_chars: int = 900
    max_risks: int = 5
    max_required_changes: int = 5
    max_answer_chars: int = 980
    max_length_increase_pct: float = 20.0
    min_patch_similarity: float = 0.4


NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")
INCIDENT_ID_RE = re.compile(r"\binc_[a-z0-9_]+\b", re.IGNORECASE)
SEVERITY_RE = re.compile(r"\bp[0-5]\b", re.IGNORECASE)
REGION_RE = re.compile(r"\b(us|eu|uk|ua|apac|global|emea|latam)\b", re.IGNORECASE)
QUOTED_PHRASE_RE = re.compile(r"['\"]([^'\"]{3,160})['\"]")

RESTRICTED_CLAIMS_RE = [
    re.compile(r"\bresolved\b", re.IGNORECASE),
    re.compile(r"\bfully[-\s]+recovered\b", re.IGNORECASE),
    re.compile(r"\bincident\s+closed\b", re.IGNORECASE),
    re.compile(r"\ball payments (?:are|is)\s+stable\b", re.IGNORECASE),
]

ALLOWED_SEVERITY = {"low", "medium", "high"}


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(v) for v in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def _normalize_space(text: str) -> str:
    return " ".join((text or "").strip().split())


def text_hash(text: str) -> str:
    normalized = _normalize_space(text)
    raw = _stable_json(normalized)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def _extract_number_tokens(text: str) -> set[str]:
    return set(NUMBER_TOKEN_RE.findall(_normalize_space(text).lower()))


def _extract_incident_ids(text: str) -> set[str]:
    return set(INCIDENT_ID_RE.findall(_normalize_space(text).lower()))


def _extract_severity_labels(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {match.upper() for match in SEVERITY_RE.findall(normalized)}


def _extract_regions(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {value.upper() for value in REGION_RE.findall(normalized)}


def _context_claim_text(value: Any) -> str:
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (bool, int, float)):
        return str(value)
    if isinstance(value, list):
        return " ".join(_context_claim_text(item) for item in value)
    if isinstance(value, dict):
        parts: list[str] = []
        for key in sorted(value):
            item = value[key]
            parts.append(str(key))
            parts.append(_context_claim_text(item))
        return " ".join(parts)
    return str(value)


def _extract_required_change_rules(required_changes: list[str]) -> dict[str, list[str]]:
    must_include: list[str] = []
    must_remove: list[str] = []

    def _append_unique(target: list[str], value: str) -> None:
        if value and value not in target:
            target.append(value)

    for item in required_changes:
        item_norm = _normalize_space(item).lower()
        quoted = [_normalize_space(match).lower() for match in QUOTED_PHRASE_RE.findall(item)]
        quoted = [value for value in quoted if value]
        if not quoted:
            continue

        # Keep extraction deterministic and explicit:
        # - REMOVE/MUST_REMOVE => must_remove
        # - ADD/MUST_INCLUDE   => must_include
        # - anything else      => ignored (must be blocked in critique validation)
        is_remove_rule = ("must_remove" in item_norm) or item_norm.startswith(
            ("remove ", "remove:", "remove-")
        )
        is_add_rule = ("must_include" in item_norm) or item_norm.startswith(
            ("add ", "add:", "add-")
        )

        if is_remove_rule:
            for phrase in quoted:
                _append_unique(must_remove, phrase)
            continue
        if is_add_rule:
            for phrase in quoted:
                _append_unique(must_include, phrase)
            continue

    return {
        "must_include": must_include,
        "must_remove": must_remove,
    }


def _is_high_risk_risk_type(risk_type: str) -> bool:
    return risk_type in {"legal_risk", "policy_violation"}


def _contains_normalized_phrase(*, text: str, phrase: str) -> bool:
    # Compare using token-like normalization so punctuation differences
    # (e.g. trailing dots/commas) do not cause false negatives.
    normalized_text = re.sub(r"[^a-z0-9% ]+", " ", _normalize_space(text).lower())
    normalized_phrase = re.sub(r"[^a-z0-9% ]+", " ", _normalize_space(phrase).lower())
    normalized_text = " ".join(normalized_text.split())
    normalized_phrase = " ".join(normalized_phrase.split())
    return normalized_phrase in normalized_text


def _remove_phrase_occurrences(*, text: str, phrase: str) -> str:
    cleaned = text
    normalized_phrase = _normalize_space(phrase).strip()
    if not normalized_phrase:
        return cleaned

    variants = {normalized_phrase, normalized_phrase.rstrip(".!?")}
    for variant in variants:
        if not variant:
            continue
        cleaned = re.sub(re.escape(variant), "", cleaned, flags=re.IGNORECASE)

    cleaned = re.sub(r"\s+\.", ".", cleaned)
    cleaned = re.sub(r"[ \t]{2,}", " ", cleaned)
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    return cleaned.strip()


def _append_phrase_sentence(*, text: str, phrase: str) -> str:
    sentence = _normalize_space(phrase).strip()
    if not sentence:
        return text

    out = text.rstrip()
    if out and out[-1] not in ".!?":
        out += "."
    separator = "\n\n" if "\n\n" in out else " "
    return (out + separator + sentence).strip()


def _is_enforceable_required_change(item: str) -> bool:
    item_norm = _normalize_space(item).lower()
    has_marker = ("must_remove" in item_norm) or ("must_include" in item_norm) or item_norm.startswith(
        ("remove ", "remove:", "remove-", "add ", "add:", "add-")
    )
    if not has_marker:
        return False

    quoted = [_normalize_space(match) for match in QUOTED_PHRASE_RE.findall(item)]
    quoted = [value for value in quoted if value]
    return len(quoted) == 1


def validate_draft(draft: Any, *, max_chars: int) -> str:
    if not isinstance(draft, str) or not draft.strip():
        raise StopRun("invalid_draft:empty")
    out = draft.strip()
    if len(out) > max_chars:
        raise StopRun("invalid_draft:too_long")
    return out


def validate_critique(
    raw: Any,
    *,
    allowed_decisions_policy: set[str],
    allowed_risk_types_policy: set[str],
    max_risks: int,
    max_required_changes: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_critique:not_object")

    decision = raw.get("decision")
    if not isinstance(decision, str) or not decision.strip():
        raise StopRun("invalid_critique:decision")
    decision = decision.strip()
    if decision not in allowed_decisions_policy:
        raise StopRun(f"critique_decision_not_allowed_policy:{decision}")

    severity = raw.get("severity", "medium")
    if not isinstance(severity, str) or not severity.strip():
        raise StopRun("invalid_critique:severity")
    severity = severity.strip().lower()
    if severity not in ALLOWED_SEVERITY:
        raise StopRun("invalid_critique:severity")

    risks_raw = raw.get("risks", [])
    if not isinstance(risks_raw, list):
        raise StopRun("invalid_critique:risks")
    if len(risks_raw) > max_risks:
        raise StopRun("invalid_critique:too_many_risks")

    risks: list[dict[str, str]] = []
    for item in risks_raw:
        if not isinstance(item, dict):
            raise StopRun("invalid_critique:risk_item")

        risk_type = item.get("type")
        note = item.get("note")

        if not isinstance(risk_type, str) or not risk_type.strip():
            raise StopRun("invalid_critique:risk_type")
        risk_type = risk_type.strip()
        if risk_type not in allowed_risk_types_policy:
            raise StopRun(f"critique_risk_not_allowed_policy:{risk_type}")

        if not isinstance(note, str) or not note.strip():
            raise StopRun("invalid_critique:risk_note")

        risks.append({"type": risk_type, "note": note.strip()})

    required_changes_raw = raw.get("required_changes", [])
    if not isinstance(required_changes_raw, list):
        raise StopRun("invalid_critique:required_changes")
    if len(required_changes_raw) > max_required_changes:
        raise StopRun("invalid_critique:too_many_required_changes")

    required_changes: list[str] = []
    for item in required_changes_raw:
        if not isinstance(item, str) or not item.strip():
            raise StopRun("invalid_critique:required_change_item")
        required_changes.append(item.strip())

    reason = raw.get("reason", "")
    if reason is None:
        reason = ""
    if not isinstance(reason, str):
        raise StopRun("invalid_critique:reason")
    reason = reason.strip()

    high_risk = severity == "high" or any(_is_high_risk_risk_type(r["type"]) for r in risks)

    if decision == "approve":
        if required_changes:
            raise StopRun("invalid_critique:approve_with_required_changes")
        if high_risk:
            raise StopRun("invalid_critique:approve_with_high_risk")

    if decision == "revise":
        if not required_changes:
            raise StopRun("invalid_critique:revise_without_required_changes")
        if not all(_is_enforceable_required_change(item) for item in required_changes):
            raise StopRun("invalid_critique:required_changes_not_enforceable")
        if high_risk:
            raise StopRun("invalid_critique:high_risk_requires_escalate")

    if decision == "escalate":
        if not reason:
            raise StopRun("invalid_critique:escalate_reason_required")

    return {
        "decision": decision,
        "severity": severity,
        "risks": risks,
        "required_changes": required_changes,
        "reason": reason,
        "high_risk": high_risk,
    }


class SelfCritiqueGateway:
    def __init__(self, *, allow_execution_decisions: set[str], budget: Budget):
        self.allow_execution_decisions = set(allow_execution_decisions)
        self.budget = budget

    def enforce_execution_decision(self, decision: str) -> None:
        if decision not in self.allow_execution_decisions:
            raise StopRun(f"critique_decision_denied_execution:{decision}")

    def apply_required_changes_fallback(self, *, text: str, required_changes: list[str]) -> str:
        """
        Deterministic fallback for enforceable required changes:
        remove MUST_REMOVE/REMOVE phrases and append missing MUST_INCLUDE/ADD phrases.
        """
        candidate = (text or "").strip()
        if not candidate:
            return candidate

        phrase_rules = _extract_required_change_rules(required_changes)
        must_include = phrase_rules["must_include"]
        must_remove = phrase_rules["must_remove"]

        for phrase in must_remove:
            candidate = _remove_phrase_occurrences(text=candidate, phrase=phrase)

        for phrase in must_include:
            if not _contains_normalized_phrase(text=candidate, phrase=phrase):
                candidate = _append_phrase_sentence(text=candidate, phrase=phrase)

        return candidate.strip()

    def validate_revision(
        self,
        *,
        original: str,
        revised: str,
        context: dict[str, Any],
        required_changes: list[str],
    ) -> dict[str, Any]:
        if not isinstance(revised, str) or not revised.strip():
            raise StopRun("invalid_revised:empty")

        revised_clean = revised.strip()
        if len(revised_clean) > self.budget.max_answer_chars:
            raise StopRun("invalid_revised:too_long")

        normalized_original = _normalize_space(original)
        normalized_revised = _normalize_space(revised_clean)
        if normalized_original == normalized_revised:
            raise StopRun("invalid_revised:no_changes")

        similarity = SequenceMatcher(a=normalized_original, b=normalized_revised).ratio()
        if similarity < self.budget.min_patch_similarity:
            raise StopRun("patch_violation:too_large_edit")

        original_len = max(1, len(normalized_original))
        revised_len = len(normalized_revised)
        increase_pct = ((revised_len - original_len) / float(original_len)) * 100.0
        policy_hint_raw = (
            context.get("policy_hints", {}).get("max_length_increase_pct")
            if isinstance(context, dict)
            else None
        )
        policy_hint_cap = self.budget.max_length_increase_pct
        if isinstance(policy_hint_raw, (int, float)) and not isinstance(policy_hint_raw, bool):
            policy_hint_cap = float(policy_hint_raw)

        effective_length_cap = min(self.budget.max_length_increase_pct, policy_hint_cap)
        if increase_pct > effective_length_cap:
            raise StopRun("patch_violation:length_increase_limit")

        allowed_text_tokens = _stable_json(context) + " " + original
        allowed_text_claims = _normalize_space(_context_claim_text(context) + " " + original)

        if _extract_number_tokens(revised_clean) - _extract_number_tokens(allowed_text_tokens):
            raise StopRun("patch_violation:no_new_facts")

        if _extract_incident_ids(revised_clean) - _extract_incident_ids(allowed_text_tokens):
            raise StopRun("patch_violation:new_incident_id")

        if _extract_severity_labels(revised_clean) - _extract_severity_labels(allowed_text_tokens):
            raise StopRun("patch_violation:new_severity_label")

        if _extract_regions(revised_clean) - _extract_regions(allowed_text_tokens):
            raise StopRun("patch_violation:new_region")

        avoid_absolute_guarantees = bool(
            context.get("policy_hints", {}).get("avoid_absolute_guarantees")
            if isinstance(context, dict)
            else False
        )
        for claim_re in RESTRICTED_CLAIMS_RE:
            if avoid_absolute_guarantees:
                if claim_re.search(revised_clean):
                    raise StopRun("patch_violation:restricted_claims")
                continue
            if claim_re.search(revised_clean) and not claim_re.search(allowed_text_claims):
                raise StopRun("patch_violation:restricted_claims")

        phrase_rules = _extract_required_change_rules(required_changes)
        must_include = phrase_rules["must_include"]
        must_remove = phrase_rules["must_remove"]

        if must_include or must_remove:
            revised_lower = normalized_revised.lower()
            if [value for value in must_include if not _contains_normalized_phrase(text=revised_lower, phrase=value)]:
                raise StopRun("patch_violation:required_changes_not_applied")
            if [value for value in must_remove if _contains_normalized_phrase(text=revised_lower, phrase=value)]:
                raise StopRun("patch_violation:required_changes_not_applied")

        return {
            "answer": revised_clean,
            "patch_similarity": round(similarity, 3),
            "length_increase_pct": round(increase_pct, 2),
            "required_changes_total": len(required_changes),
            "required_changes_enforced": len(must_include) + len(must_remove),
            "required_changes_unenforced": len(required_changes)
            - (len(must_include) + len(must_remove)),
        }

    def validate_final(self, answer: str) -> str:
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_answer:empty")

        out = answer.strip()
        if len(out) > self.budget.max_answer_chars:
            raise StopRun("invalid_answer:too_long")
        return out

What matters most here (in plain words)

  • Critique is not "arbitrary text": it passes strict validation.
  • Revision cannot go beyond factual context and budget limits.
  • Enforceable required_changes use explicit ADD/REMOVE markers (or MUST_INCLUDE/MUST_REMOVE).
  • If avoid_absolute_guarantees=True, restricted claims are blocked regardless of draft content (even if they were already in the original draft).
  • required_changes_total is the total number of required_changes items.
  • required_changes_enforced counts the items that were converted into enforceable phrase rules.
  • required_changes_unenforced counts the items the gateway cannot verify mechanically.
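A sketch of the enforceability rule, mirroring _is_enforceable_required_change in simplified form (the sample change strings are hypothetical):

```python
import re

QUOTED_PHRASE_RE = re.compile(r"['\"]([^'\"]{3,160})['\"]")  # same as gateway.py

def is_enforceable(item: str) -> bool:
    # Simplified mirror of gateway._is_enforceable_required_change:
    # an explicit ADD/REMOVE marker plus exactly one quoted phrase.
    low = " ".join(item.strip().split()).lower()
    has_marker = ("must_remove" in low or "must_include" in low
                  or low.startswith(("remove ", "remove:", "remove-",
                                     "add ", "add:", "add-")))
    quoted = [q for q in QUOTED_PHRASE_RE.findall(item) if q.strip()]
    return has_marker and len(quoted) == 1
```

For example, REMOVE "we guarantee resolution within 45 minutes" is enforceable, while "improve the overall tone" is not (no marker, no quoted phrase) and would have to be escalated instead.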

llm.py — draft/critique/revise calls

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


DRAFT_SYSTEM_PROMPT = """
You are an operations communications writer.
Return exactly one JSON object:
{
  "draft": "customer-facing incident update"
}

Rules:
- Use only facts from provided incident_context.
- Include current status, customer impact, and next actions.
- Keep language clear and non-speculative.
- Avoid absolute guarantees.
- Do not output markdown or extra keys.
""".strip()

SHORTEN_DRAFT_SYSTEM_PROMPT = """
You are an operations editor.
Return exactly one JSON object:
{
  "draft": "shortened customer-facing incident update"
}

Rules:
- Rewrite the draft to be <= max_chars characters.
- Preserve original facts, numbers, and intent.
- Do not add new facts or speculative claims.
- Keep current status, customer impact, and next actions.
- Keep language clear and non-speculative.
- Avoid absolute guarantees.
- Do not output markdown or extra keys.
""".strip()

CRITIQUE_SYSTEM_PROMPT = """
You are a strict critique reviewer.
Return exactly one JSON object:
{
  "decision": "approve|revise|escalate",
  "severity": "low|medium|high",
  "risks": [{"type":"overconfidence","note":"..."}],
  "required_changes": ["concrete change"],
  "reason": "for escalate only"
}

Rules:
- Review exactly once.
- decision=approve: required_changes must be empty.
- decision=revise: provide 1-5 concrete required changes.
- decision=escalate: use only for high-risk content.
- Every required_changes item MUST start with ADD/REMOVE/MUST_INCLUDE/MUST_REMOVE.
- Every required_changes item MUST contain exactly one quoted phrase.
- If you cannot express required changes in enforceable ADD/REMOVE format, set decision=escalate and explain why in reason.
- Use explicit markers for enforceable phrase edits:
  - REMOVE "phrase to delete"
  - ADD "phrase to include"
  - MUST_REMOVE "phrase to delete" (equivalent)
  - MUST_INCLUDE "phrase to include" (equivalent)
- Do not add new facts in required_changes.
- Use only risk types from allowed_risk_types.
- Do not output markdown or extra keys.
""".strip()

REVISE_SYSTEM_PROMPT = """
You are an editor applying a constrained rewrite.
Return exactly one JSON object:
{
  "revised_answer": "updated answer"
}

Rules:
- Apply required_changes only.
- Keep original scope and customer intent.
- Do not add new facts or numbers.
- Keep the answer concise and actionable.
- Do not output markdown or extra keys.
""".strip()

REVISE_SYSTEM_PROMPT_STRICT = """
You are an editor applying a constrained rewrite.
Return exactly one JSON object:
{
  "revised_answer": "updated answer"
}

Rules:
- Apply required_changes only.
- Keep original scope and customer intent.
- Do not add new facts or numbers.
- Keep the answer concise and actionable.
- You MUST satisfy each required_changes item exactly.
- For ADD/MUST_INCLUDE: include the quoted phrase verbatim.
- For REMOVE/MUST_REMOVE: ensure the quoted phrase does not appear.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")
    return data


def generate_draft(*, goal: str, incident_context: dict[str, Any]) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
    }
    data = _chat_json(system_prompt=DRAFT_SYSTEM_PROMPT, payload=payload)

    draft = data.get("draft")
    if not isinstance(draft, str):
        raise LLMInvalid("llm_invalid_schema")

    draft = draft.strip()
    if not draft:
        raise LLMEmpty("llm_empty")
    return draft


def shorten_draft(*, draft: str, max_chars: int) -> str:
    payload = {
        "draft": draft,
        "max_chars": int(max_chars),
    }
    data = _chat_json(system_prompt=SHORTEN_DRAFT_SYSTEM_PROMPT, payload=payload)

    shortened = data.get("draft")
    if not isinstance(shortened, str):
        raise LLMInvalid("llm_invalid_schema")

    shortened = shortened.strip()
    if not shortened:
        raise LLMEmpty("llm_empty")
    return shortened


def critique_draft(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    allowed_risk_types: list[str],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "allowed_risk_types": allowed_risk_types,
    }
    return _chat_json(system_prompt=CRITIQUE_SYSTEM_PROMPT, payload=payload)


def revise_once(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    required_changes: list[str],
    strict_mode: bool = False,
) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "required_changes": required_changes,
    }
    system_prompt = REVISE_SYSTEM_PROMPT_STRICT if strict_mode else REVISE_SYSTEM_PROMPT
    data = _chat_json(system_prompt=system_prompt, payload=payload)

    revised = data.get("revised_answer")
    if not isinstance(revised, str):
        raise LLMInvalid("llm_invalid_schema")

    revised = revised.strip()
    if not revised:
        raise LLMEmpty("llm_empty")
    return revised

What matters most here (in plain words)

  • Critique output has a stable JSON contract.
  • Critique prompt explicitly requires enforceable required_changes via explicit ADD/REMOVE markers.
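For reference, a critique payload that satisfies this contract might look like the following (the values are hypothetical; the real validation lives in gateway.validate_critique):

```python
import json

# Hypothetical critique payload matching the CRITIQUE_SYSTEM_PROMPT contract.
raw = json.loads("""
{
  "decision": "revise",
  "severity": "medium",
  "risks": [{"type": "overconfidence", "note": "draft implies full recovery"}],
  "required_changes": ["REMOVE \\"fully recovered\\""],
  "reason": ""
}
""")
```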

audit.py — change journal

PYTHON
from __future__ import annotations

import difflib
import hashlib
import re
from typing import Any


SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?])\s+")


def _hash_text(text: str) -> str:
    normalized = " ".join((text or "").split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:12]


def _split_for_diff(text: str) -> list[str]:
    lines = (text or "").splitlines()
    if len(lines) > 1:
        return lines

    normalized = (text or "").strip()
    if not normalized:
        return [""]

    sentences = [item.strip() for item in SENTENCE_SPLIT_RE.split(normalized) if item.strip()]
    if len(sentences) > 1:
        return sentences

    chunk_size = 80
    return [normalized[i : i + chunk_size] for i in range(0, len(normalized), chunk_size)]


def build_audit_log(
    *,
    before: str,
    after: str,
    risks: list[dict[str, Any]],
    required_changes: list[str],
) -> dict[str, Any]:
    before_text = (before or "").strip()
    after_text = (after or "").strip()

    before_chars = len(before_text)
    after_chars = len(after_text)
    delta_chars = after_chars - before_chars

    if before_chars <= 0:
        increase_pct = 0.0
    else:
        increase_pct = (delta_chars / float(before_chars)) * 100.0

    raw_diff = list(
        difflib.unified_diff(
            _split_for_diff(before_text),
            _split_for_diff(after_text),
            fromfile="before",
            tofile="after",
            lineterm="",
        )
    )

    diff_excerpt: list[str] = []
    for line in raw_diff:
        if line.startswith(("---", "+++", "@@")):
            continue
        if line.startswith(("+", "-")):
            diff_excerpt.append(line)
        if len(diff_excerpt) >= 6:
            break

    return {
        "changed": before_text != after_text,
        "before_hash": _hash_text(before_text),
        "after_hash": _hash_text(after_text),
        "before_chars": before_chars,
        "after_chars": after_chars,
        "delta_chars": delta_chars,
        "length_increase_pct": round(increase_pct, 2),
        "risks_count": len(risks),
        "required_changes_count": len(required_changes),
        "diff_excerpt": diff_excerpt,
    }

What matters most here (in plain words)

  • After revision, audit metadata is stored, not just final text.
  • This enables debugging of "why this version passed policy".

main.py — full-cycle orchestration

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from audit import build_audit_log
from context import build_incident_context
from gateway import Budget, SelfCritiqueGateway, StopRun, text_hash, validate_critique, validate_draft
from llm import LLMEmpty, LLMInvalid, LLMTimeout, critique_draft, generate_draft, revise_once, shorten_draft

GOAL = (
    "Draft a customer-facing payment incident update for US enterprise customers. "
    "Use precise language, avoid guarantees, and keep next actions concrete."
)
INCIDENT_CONTEXT = build_incident_context(report_date="2026-03-06", region="US")

BUDGET = Budget(
    max_seconds=120,
    max_draft_chars=900,
    max_risks=5,
    max_required_changes=5,
    max_answer_chars=980,
    max_length_increase_pct=20.0,
    min_patch_similarity=0.4,
)

ALLOWED_CRITIQUE_DECISIONS_POLICY = {"approve", "revise", "escalate"}
AUTO_REVISION_ENABLED = True
ALLOWED_CRITIQUE_DECISIONS_EXECUTION = (
    ALLOWED_CRITIQUE_DECISIONS_POLICY if AUTO_REVISION_ENABLED else {"approve", "escalate"}
)

ALLOWED_RISK_TYPES_POLICY = {
    "overconfidence",
    "missing_uncertainty",
    "contradiction",
    "scope_leak",
    "policy_violation",
    "legal_risk",
}


def run_self_critique_agent(*, goal: str, incident_context: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = SelfCritiqueGateway(
        allow_execution_decisions=ALLOWED_CRITIQUE_DECISIONS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    draft_attempts = 0
    draft_retried = False
    try:
        draft_attempts += 1
        draft_raw = generate_draft(goal=goal, incident_context=incident_context)
        try:
            draft = validate_draft(draft_raw, max_chars=BUDGET.max_draft_chars)
        except StopRun as exc:
            if exc.reason != "invalid_draft:too_long":
                raise
            # One bounded recovery attempt: shorten draft within policy budget.
            draft_attempts += 1
            draft_retried = True
            shortened_raw = shorten_draft(draft=draft_raw, max_chars=BUDGET.max_draft_chars)
            draft = validate_draft(shortened_raw, max_chars=BUDGET.max_draft_chars)
    except LLMTimeout:
        return stopped("llm_timeout", phase="draft")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="draft")
    except LLMEmpty:
        return stopped("llm_empty", phase="draft")
    except StopRun as exc:
        return stopped(exc.reason, phase="draft")

    trace.append(
        {
            "step": 1,
            "phase": "draft",
            "draft_hash": text_hash(draft),
            "chars": len(draft),
            "attempts_used": draft_attempts,
            "retried": draft_retried,
            "ok": True,
        }
    )
    history.append(
        {
            "step": 1,
            "action": "draft_once",
            "draft": draft,
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="critique")

    try:
        raw_critique = critique_draft(
            goal=goal,
            incident_context=incident_context,
            draft=draft,
            allowed_risk_types=sorted(ALLOWED_RISK_TYPES_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="critique")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="critique")

    try:
        critique = validate_critique(
            raw_critique,
            allowed_decisions_policy=ALLOWED_CRITIQUE_DECISIONS_POLICY,
            allowed_risk_types_policy=ALLOWED_RISK_TYPES_POLICY,
            max_risks=BUDGET.max_risks,
            max_required_changes=BUDGET.max_required_changes,
        )
        gateway.enforce_execution_decision(critique["decision"])
    except StopRun as exc:
        return stopped(exc.reason, phase="critique", raw_critique=raw_critique)

    trace.append(
        {
            "step": 2,
            "phase": "critique",
            "decision": critique["decision"],
            "severity": critique["severity"],
            "risks": len(critique["risks"]),
            "required_changes": len(critique["required_changes"]),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 2,
            "action": "critique_once",
            "critique": critique,
        }
    )

    if critique["decision"] == "escalate":
        escalation_reason = str(critique.get("reason", "")).strip()
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "policy_escalation",
            "escalation_reason": escalation_reason[:120],
            "phase": "critique",
            "critique": critique,
            "trace": trace,
            "history": history,
        }

    final_answer = draft
    revised = False

    if critique["decision"] == "revise":
        revise_attempts = 0
        revise_retried = False
        revised_payload: dict[str, Any] | None = None
        last_revised_candidate = draft
        for attempt in range(1, 4):
            if (time.monotonic() - started) > BUDGET.max_seconds:
                return stopped("max_seconds", phase="revise")

            revise_attempts = attempt
            strict_mode = attempt > 1
            try:
                revised_raw = revise_once(
                    goal=goal,
                    incident_context=incident_context,
                    draft=draft,
                    required_changes=critique["required_changes"],
                    strict_mode=strict_mode,
                )
                last_revised_candidate = revised_raw
                revised_payload = gateway.validate_revision(
                    original=draft,
                    revised=revised_raw,
                    context=incident_context,
                    required_changes=critique["required_changes"],
                )
                break
            except LLMTimeout:
                return stopped("llm_timeout", phase="revise")
            except LLMInvalid as exc:
                return stopped(exc.args[0], phase="revise")
            except LLMEmpty:
                return stopped("llm_empty", phase="revise")
            except StopRun as exc:
                if exc.reason == "patch_violation:required_changes_not_applied" and attempt < 3:
                    revise_retried = True
                    continue
                if exc.reason == "patch_violation:required_changes_not_applied":
                    # Final fallback: enforce required phrase edits deterministically.
                    try:
                        fallback_revised = gateway.apply_required_changes_fallback(
                            text=last_revised_candidate,
                            required_changes=critique["required_changes"],
                        )
                        revised_payload = gateway.validate_revision(
                            original=draft,
                            revised=fallback_revised,
                            context=incident_context,
                            required_changes=critique["required_changes"],
                        )
                        revise_attempts = attempt + 1
                        revise_retried = True
                        break
                    except StopRun as fallback_exc:
                        return stopped(fallback_exc.reason, phase="revise")
                return stopped(exc.reason, phase="revise")

        if revised_payload is None:
            return stopped("patch_violation:required_changes_not_applied", phase="revise")

        final_answer = revised_payload["answer"]
        revised = True

        trace.append(
            {
                "step": 3,
                "phase": "revise",
                "patch_similarity": revised_payload["patch_similarity"],
                "length_increase_pct": revised_payload["length_increase_pct"],
                "required_changes_total": revised_payload["required_changes_total"],
                "required_changes_enforced": revised_payload["required_changes_enforced"],
                "required_changes_unenforced": revised_payload["required_changes_unenforced"],
                "attempts_used": revise_attempts,
                "retried": revise_retried,
                "revised_hash": text_hash(final_answer),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "revise_once",
                "required_changes": critique["required_changes"],
                "revised_answer": final_answer,
            }
        )

    try:
        final_answer = gateway.validate_final(final_answer)
    except StopRun as exc:
        return stopped(exc.reason, phase="finalize")

    audit_log = build_audit_log(
        before=draft,
        after=final_answer,
        risks=critique["risks"],
        required_changes=critique["required_changes"],
    )

    trace.append(
        {
            "step": 4 if revised else 3,
            "phase": "audit_finalize",
            "final_hash": text_hash(final_answer),
            "changed": audit_log["changed"],
            "diff_lines": len(audit_log["diff_excerpt"]),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 4 if revised else 3,
            "action": "audit_finalize",
            "status": "final",
            "changed": audit_log["changed"],
        }
    )

    return {
        "run_id": run_id,
        "status": "ok",
        "stop_reason": "success",
        "outcome": "revised_once" if revised else "approved_direct",
        "answer": final_answer,
        "critique_decision": critique["decision"],
        "severity": critique["severity"],
        "risks": critique["risks"],
        "required_changes": critique["required_changes"],
        "audit": audit_log,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_self_critique_agent(goal=GOAL, incident_context=INCIDENT_CONTEXT)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain words)

  • critique_decision drives the flow, but the runtime controls execution permissions.
  • Critique and revision stay transparent through trace + history + audit.
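The policy/execution split for critique decisions can be isolated into a few lines. This is a minimal standalone sketch of the gating logic implied by `ALLOWED_CRITIQUE_DECISIONS_POLICY` and `ALLOWED_CRITIQUE_DECISIONS_EXECUTION` in main.py; the `StopRun` class here is a stand-in for the one in gateway.py.

```python
# Policy recognizes a decision; runtime separately decides whether to execute it.
class StopRun(Exception):
    def __init__(self, reason: str) -> None:
        super().__init__(reason)
        self.reason = reason


POLICY_DECISIONS = {"approve", "revise", "escalate"}


def execution_allowlist(auto_revision_enabled: bool) -> set[str]:
    # "revise" is always a valid critique outcome at the policy layer,
    # but runtime may refuse to act on it.
    return POLICY_DECISIONS if auto_revision_enabled else {"approve", "escalate"}


def enforce(decision: str, *, auto_revision_enabled: bool) -> None:
    if decision not in POLICY_DECISIONS:
        raise StopRun(f"critique_decision_not_allowed_policy:{decision}")
    if decision not in execution_allowlist(auto_revision_enabled):
        raise StopRun(f"critique_decision_denied_execution:{decision}")


enforce("revise", auto_revision_enabled=True)  # passes silently
try:
    enforce("revise", auto_revision_enabled=False)
except StopRun as exc:
    print(exc.reason)  # critique_decision_denied_execution:revise
```

This mirrors the stop_reason naming used throughout the example: a policy rejection and a runtime denial are distinguishable in monitoring.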

Example output

JSON
{
  "run_id": "721e4231-7b9a-4843-99e0-888616025b35",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "revised_once",
  "answer": "Current Status: We are experiencing a payment processing degradation affecting about 27% of US enterprise customer checkouts. The failed payment rate has risen to 3.4%, with 5 related chargeback alerts. Our engineering team is working to resolve the issue. We estimate recovery within approximately 45 minutes, though this timing may change as we learn more.\n\nCustomer Impact: Some customers may face difficulties completing payments, causing delayed order processing or the need to retry transactions. We are monitoring the situation closely to reduce disruption.\n\nNext Actions: We will publish updates on the status page every 15 minutes as we monitor payment failures. Our support team is preparing a workaround guide to assist affected customers. Please check the status page regularly and contact support if you need help.",
  "critique_decision": "revise",
  "severity": "medium",
  "risks": [
    {
      "type": "overconfidence",
      "note": "The phrase 'with an estimated recovery time of 45 minutes' may be interpreted as a guarantee rather than an estimate."
    }
  ],
  "required_changes": [
    "MUST_REMOVE \"with an estimated recovery time of 45 minutes\"",
    "MUST_INCLUDE \"We estimate recovery within approximately 45 minutes, though this timing may change as we learn more\"",
    "MUST_INCLUDE \"We will publish updates on the status page every 15 minutes as we monitor payment failures\"",
    "MUST_INCLUDE \"Our support team is preparing a workaround guide to assist affected customers\""
  ],
  "audit": {
    "changed": true,
    "before_hash": "7cdaa40c3fa4",
    "after_hash": "33b356380537",
    "before_chars": 770,
    "after_chars": 827,
    "delta_chars": 57,
    "length_increase_pct": 7.4,
    "risks_count": 1,
    "required_changes_count": 4,
    "diff_excerpt": [
      "-...",
      "+..."
    ]
  },
  "trace": [
    {
      "step": 1,
      "phase": "draft",
      "draft_hash": "9b944d7375bf",
      "chars": 770,
      "attempts_used": 2,
      "retried": true,
      "ok": true
    },
    {
      "step": 2,
      "phase": "critique",
      "decision": "revise",
      "severity": "medium",
      "risks": 1,
      "required_changes": 4,
      "ok": true
    },
    {
      "step": 3,
      "phase": "revise",
      "patch_similarity": 0.849,
      "length_increase_pct": 7.42,
      "required_changes_total": 4,
      "required_changes_enforced": 4,
      "required_changes_unenforced": 0,
      "attempts_used": 2,
      "retried": true,
      "revised_hash": "d92d90a2adbe",
      "ok": true
    },
    {
      "step": 4,
      "phase": "audit_finalize",
      "final_hash": "d92d90a2adbe",
      "changed": true,
      "diff_lines": 4,
      "ok": true
    }
  ],
  "history": [{...}]
}

Typical stop_reason values

  • success — run completed correctly
  • llm_timeout — LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty — empty response from LLM in draft/revise
  • llm_invalid_json — LLM returned invalid JSON
  • llm_invalid_schema — JSON does not match contract
  • invalid_draft:* — draft failed basic validation
  • invalid_critique:* — critique failed policy-layer contract
  • invalid_critique:required_changes_not_enforceable — for decision=revise, required_changes must be in enforceable format (ADD/REMOVE/MUST_* + 1 quoted phrase)
  • critique_decision_not_allowed_policy:* — critique decision is outside policy allowlist
  • critique_decision_denied_execution:* — runtime denied execution decision
  • patch_violation:no_new_facts — revision introduced new facts
  • patch_violation:length_increase_limit — revision exceeded length increase limit
  • patch_violation:restricted_claims — revision contains restricted claims; with avoid_absolute_guarantees=true they are blocked strictly
  • patch_violation:required_changes_not_applied — revision did not apply required changes
  • patch_violation:too_large_edit — revision exceeded patch-only boundaries
  • policy_escalation — critique returned escalation; details in escalation_reason
  • max_seconds — total run time budget exceeded
  • invalid_answer:* — final answer failed validation
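For production monitoring, these stop_reason values group naturally by prefix. The sketch below (not part of the example code) buckets them into alerting classes so dashboards can distinguish provider failures from contract, policy, guardrail, and budget stops.

```python
# Hypothetical monitoring helper: map a stop_reason to an alerting bucket
# by its prefix (the part before the first ":").
def stop_reason_bucket(stop_reason: str) -> str:
    prefix = stop_reason.split(":", 1)[0]
    buckets = {
        "success": "ok",
        "llm_timeout": "provider",
        "llm_empty": "provider",
        "llm_invalid_json": "provider",
        "llm_invalid_schema": "provider",
        "invalid_draft": "contract",
        "invalid_critique": "contract",
        "invalid_answer": "contract",
        "critique_decision_not_allowed_policy": "policy",
        "critique_decision_denied_execution": "policy",
        "policy_escalation": "policy",
        "patch_violation": "guardrail",
        "max_seconds": "budget",
    }
    return buckets.get(prefix, "unknown")


print(stop_reason_bucket("patch_violation:no_new_facts"))  # guardrail
print(stop_reason_bucket("llm_timeout"))                   # provider
```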

What is NOT shown here

  • persisted audit storage (DB / object storage)
  • retry/backoff and circuit breaker for LLM
  • human review queue for policy_escalation
  • domain-aware semantic validation of required_changes
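One of these gaps, retry/backoff for LLM calls, can be sketched in a few lines. This is a minimal illustration, not the production pattern in full (no jitter, no circuit breaker); `LLMTimeout` is a stand-in for the exception the example's llm module raises.

```python
import time


class LLMTimeout(Exception):
    pass


def with_backoff(call, *, attempts: int = 3, base_delay: float = 0.5):
    """Retry a callable on timeout with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return call()
        except LLMTimeout:
            if attempt == attempts:
                raise  # budget exhausted; surface as llm_timeout upstream
            # Exponential backoff: base_delay, 2x, 4x, ...
            time.sleep(base_delay * 2 ** (attempt - 1))


# Demo: a call that times out twice, then succeeds.
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise LLMTimeout()
    return "draft"


print(with_backoff(flaky, base_delay=0.05))  # draft
```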

What to try next

  1. Disable AUTO_REVISION_ENABLED and verify critique_decision_denied_execution:revise.
  2. Add a "severity budget" (for example, block medium+ for specific tenants).
  3. Store audit.diff_excerpt in an external log sink (S3/ELK) for incident analysis.
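Idea 2 above, a severity budget, can be prototyped with a per-tenant cap. The names below (`SEVERITY_ORDER`, `TENANT_SEVERITY_CAP`, `severity_allowed`) are hypothetical, not part of the example code; the gateway would call such a check right after critique validation.

```python
# Hypothetical per-tenant severity budget: block runs whose critique
# severity exceeds what the tenant tolerates for auto-revision.
SEVERITY_ORDER = ["low", "medium", "high"]
TENANT_SEVERITY_CAP = {"enterprise-us": "low", "internal": "high"}


def severity_allowed(tenant: str, severity: str, default_cap: str = "medium") -> bool:
    cap = TENANT_SEVERITY_CAP.get(tenant, default_cap)
    return SEVERITY_ORDER.index(severity) <= SEVERITY_ORDER.index(cap)


print(severity_allowed("enterprise-us", "medium"))  # False: medium+ is blocked
print(severity_allowed("internal", "high"))         # True
```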
⏱️ 21 min read · Updated March 2, 2026 · Difficulty: ★★☆
Integrated production control: OnceOnly
Add guardrails to tool-calling agents
Ship this pattern with governance:
  • Budgets (steps / spend caps)
  • Tool permissions (allowlist / blocklist)
  • Kill switch & incident stop
  • Idempotency & dedupe
  • Audit logs & traceability
Integrated mention: OnceOnly is a control layer for production agent systems.

Author

Nick — engineer building infrastructure for production AI agents.

Focus: agent patterns, failure modes, runtime control, and system reliability.

🔗 GitHub: https://github.com/mykolademyanov


Editorial note

This documentation is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

Content is grounded in real-world failures, post-mortems, and operational incidents in deployed AI agent systems.