Self-Critique Agent in Python: Complete Example

A runnable, production-style Self-Critique agent example in Python, with Draft → Critique → Revise → Audit, a policy/execution boundary, and explicit stop reasons.
On this page
  1. Pattern essence (brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. How to run
  6. Task
  7. Solution
  8. Code
  9. context.py — deterministic context
  10. gateway.py — critique/revision policy boundary
  11. llm.py — draft/critique/revise LLM calls
  12. audit.py — change audit log
  13. main.py — full-cycle orchestration
  14. Example output
  15. Typical stop_reason values
  16. What is NOT shown here
  17. What to try next

Pattern essence (brief)

Self-Critique Agent is a pattern in which, after producing a draft, the agent generates a structured critique (risks + required_changes), performs a bounded revision, and records an audit of the changes.

The LLM proposes the draft and the critique; the gateway policy decides whether they may be executed and whether the revision stays within the contract's limits.


What this example demonstrates

  • production-like flow: Draft -> Critique -> Revise (optional) -> Audit
  • strictly structured critique artifact: decision, severity, risks, required_changes
  • policy vs execution separation for critique decisions
  • constrained revision: no_new_facts, length-increase cap, required-changes enforcement
  • change audit (before/after hash, delta_chars, diff_excerpt)
  • explicit stop_reason, trace, and history for production monitoring

Architecture

  1. The LLM generates a draft.
  2. The critic LLM returns structured critique JSON.
  3. The gateway validates the critique contract against policy.
  4. The execution layer enforces the runtime allowlist of decisions.
  5. If the decision is revise, a single guarded revision is executed.
  6. The change audit log and the final answer are produced.

Key contract: the agent cannot "freely rewrite" the text; changes are controlled by required_changes and runtime guardrails.
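The two-layer check in steps 3-4 can be condensed into a small sketch (names mirror the full gateway code below; simplified for illustration):

```python
class StopRun(Exception):
    """Stop the run with an explicit, machine-readable reason."""
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason

# Policy layer: decisions the critique contract may contain at all.
ALLOWED_DECISIONS_POLICY = {"approve", "revise", "escalate"}
# Execution layer: decisions this runtime is actually willing to act on.
# In this sketch auto-revision is disabled, so "revise" is denied at execution time.
ALLOWED_DECISIONS_EXECUTION = {"approve", "escalate"}

def enforce_decision(decision: str) -> str:
    if decision not in ALLOWED_DECISIONS_POLICY:
        raise StopRun(f"critique_decision_not_allowed_policy:{decision}")
    if decision not in ALLOWED_DECISIONS_EXECUTION:
        raise StopRun(f"critique_decision_denied_execution:{decision}")
    return decision
```

The point of the split: a decision can be valid per the contract (policy) yet still refused by this particular runtime (execution), and the stop_reason tells you which layer said no.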


Project structure

TEXT
agent-patterns/
└── self-critique-agent/
    └── python/
        ├── main.py          # Draft -> Critique -> Revise -> Audit
        ├── llm.py           # draft/critique/revise LLM calls
        ├── gateway.py       # contract + guardrails + policy/execution checks
        ├── context.py       # deterministic incident context
        ├── audit.py         # diff metadata for audit log
        ├── requirements.txt
        └── README.md

How to run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/self-critique-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option with export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option with .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows it is easier to set environment variables with set or, if you prefer, use python-dotenv.
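If you go the python-dotenv route, the core of what it does can be sketched in a few lines of stdlib Python (a simplified loader; the real python-dotenv handles quoting and interpolation much more carefully):

```python
import os

def load_env_text(text: str) -> dict[str, str]:
    # Parse simple KEY=VALUE lines; blanks and comments are skipped.
    loaded: dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        loaded[key.strip()] = value.strip().strip("'\"")
    return loaded

env = load_env_text("OPENAI_MODEL=gpt-4.1-mini\nOPENAI_TIMEOUT_SECONDS=60")
os.environ.update(env)
```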


Task

Imagine a production scenario:

"Prepare a customer-facing update about a payment incident, but do not change facts or give ETA guarantees."

The problem with a plain revision pass: the model can "improve" the text while accidentally changing its meaning.

Solution

In this example:

  • the critique is formalized: what is wrong and what must be fixed
  • the revision has strict limits
  • the audit shows what actually changed
  • high-risk cases stop with policy_escalation

Code

context.py — deterministic context

PYTHON
from __future__ import annotations

from typing import Any


def build_incident_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "report_date": report_date,
        "region": region,
        "incident": {
            "incident_id": "inc_payments_20260306",
            "severity": "P1",
            "status": "degraded",
            "affected_checkout_pct": 27,
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "eta_minutes": 45,
        },
        "policy_hints": {
            "avoid_absolute_guarantees": True,
            "max_length_increase_pct": 20,
            "required_sections": ["current_status", "customer_impact", "next_actions"],
        },
        "approved_actions": [
            "monitor payment failures every 15 minutes",
            "publish customer update via status page every 15 minutes",
            "prepare support macro with workaround guidance",
        ],
    }

What matters most here (in plain terms)

  • Every subsequent step is based on fixed facts.
  • This reduces drift between runs and simplifies auditing.
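One way to make the "no drift" property concrete is to fingerprint the context: because the facts are fixed, the fingerprint is identical on every run. A minimal sketch (not part of the example code):

```python
import hashlib
import json

def context_fingerprint(ctx: dict) -> str:
    # sort_keys gives a canonical serialization, so logically equal
    # contexts always hash to the same 12-hex-char fingerprint.
    raw = json.dumps(ctx, ensure_ascii=True, sort_keys=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]

a = context_fingerprint({"region": "US", "report_date": "2026-03-06"})
b = context_fingerprint({"report_date": "2026-03-06", "region": "US"})
# key order does not matter: a == b
```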

gateway.py — critique/revision policy boundary

PYTHON
from __future__ import annotations

import hashlib
import json
import re
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 120
    max_draft_chars: int = 900
    max_risks: int = 5
    max_required_changes: int = 5
    max_answer_chars: int = 980
    max_length_increase_pct: float = 20.0
    min_patch_similarity: float = 0.4


NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")
INCIDENT_ID_RE = re.compile(r"\binc_[a-z0-9_]+\b", re.IGNORECASE)
SEVERITY_RE = re.compile(r"\bp[0-5]\b", re.IGNORECASE)
REGION_RE = re.compile(r"\b(us|eu|uk|ua|apac|global|emea|latam)\b", re.IGNORECASE)
QUOTED_PHRASE_RE = re.compile(r"['\"]([^'\"]{3,160})['\"]")

RESTRICTED_CLAIMS_RE = [
    re.compile(r"\bresolved\b", re.IGNORECASE),
    re.compile(r"\bfully[-\s]+recovered\b", re.IGNORECASE),
    re.compile(r"\bincident\s+closed\b", re.IGNORECASE),
    re.compile(r"\ball payments (?:are|is)\s+stable\b", re.IGNORECASE),
]

ALLOWED_SEVERITY = {"low", "medium", "high"}


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(v) for v in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def _normalize_space(text: str) -> str:
    return " ".join((text or "").strip().split())


def text_hash(text: str) -> str:
    normalized = _normalize_space(text)
    raw = _stable_json(normalized)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def _extract_number_tokens(text: str) -> set[str]:
    return set(NUMBER_TOKEN_RE.findall(_normalize_space(text).lower()))


def _extract_incident_ids(text: str) -> set[str]:
    return set(INCIDENT_ID_RE.findall(_normalize_space(text).lower()))


def _extract_severity_labels(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {match.upper() for match in SEVERITY_RE.findall(normalized)}


def _extract_regions(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {value.upper() for value in REGION_RE.findall(normalized)}


def _context_claim_text(value: Any) -> str:
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (bool, int, float)):
        return str(value)
    if isinstance(value, list):
        return " ".join(_context_claim_text(item) for item in value)
    if isinstance(value, dict):
        parts: list[str] = []
        for key in sorted(value):
            item = value[key]
            parts.append(str(key))
            parts.append(_context_claim_text(item))
        return " ".join(parts)
    return str(value)


def _extract_required_change_rules(required_changes: list[str]) -> dict[str, list[str]]:
    must_include: list[str] = []
    must_remove: list[str] = []

    def _append_unique(target: list[str], value: str) -> None:
        if value and value not in target:
            target.append(value)

    for item in required_changes:
        item_norm = _normalize_space(item).lower()
        quoted = [_normalize_space(match).lower() for match in QUOTED_PHRASE_RE.findall(item)]
        quoted = [value for value in quoted if value]
        if not quoted:
            continue

        # Keep extraction deterministic and explicit:
        # - REMOVE/MUST_REMOVE => must_remove
        # - ADD/MUST_INCLUDE   => must_include
        # - anything else      => ignored (must be blocked in critique validation)
        is_remove_rule = ("must_remove" in item_norm) or item_norm.startswith(
            ("remove ", "remove:", "remove-")
        )
        is_add_rule = ("must_include" in item_norm) or item_norm.startswith(
            ("add ", "add:", "add-")
        )

        if is_remove_rule:
            for phrase in quoted:
                _append_unique(must_remove, phrase)
            continue
        if is_add_rule:
            for phrase in quoted:
                _append_unique(must_include, phrase)
            continue

    return {
        "must_include": must_include,
        "must_remove": must_remove,
    }


def _is_high_risk_risk_type(risk_type: str) -> bool:
    return risk_type in {"legal_risk", "policy_violation"}


def _contains_normalized_phrase(*, text: str, phrase: str) -> bool:
    # Compare using token-like normalization so punctuation differences
    # (e.g. trailing dots/commas) do not cause false negatives.
    normalized_text = re.sub(r"[^a-z0-9% ]+", " ", _normalize_space(text).lower())
    normalized_phrase = re.sub(r"[^a-z0-9% ]+", " ", _normalize_space(phrase).lower())
    normalized_text = " ".join(normalized_text.split())
    normalized_phrase = " ".join(normalized_phrase.split())
    return normalized_phrase in normalized_text


def _remove_phrase_occurrences(*, text: str, phrase: str) -> str:
    cleaned = text
    normalized_phrase = _normalize_space(phrase).strip()
    if not normalized_phrase:
        return cleaned

    variants = {normalized_phrase, normalized_phrase.rstrip(".!?")}
    for variant in variants:
        if not variant:
            continue
        cleaned = re.sub(re.escape(variant), "", cleaned, flags=re.IGNORECASE)

    cleaned = re.sub(r"\s+\.", ".", cleaned)
    cleaned = re.sub(r"[ \t]{2,}", " ", cleaned)
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    return cleaned.strip()


def _append_phrase_sentence(*, text: str, phrase: str) -> str:
    sentence = _normalize_space(phrase).strip()
    if not sentence:
        return text

    out = text.rstrip()
    if out and out[-1] not in ".!?":
        out += "."
    separator = "\n\n" if "\n\n" in out else " "
    return (out + separator + sentence).strip()


def _is_enforceable_required_change(item: str) -> bool:
    item_norm = _normalize_space(item).lower()
    has_marker = ("must_remove" in item_norm) or ("must_include" in item_norm) or item_norm.startswith(
        ("remove ", "remove:", "remove-", "add ", "add:", "add-")
    )
    if not has_marker:
        return False

    quoted = [_normalize_space(match) for match in QUOTED_PHRASE_RE.findall(item)]
    quoted = [value for value in quoted if value]
    return len(quoted) == 1


def validate_draft(draft: Any, *, max_chars: int) -> str:
    if not isinstance(draft, str) or not draft.strip():
        raise StopRun("invalid_draft:empty")
    out = draft.strip()
    if len(out) > max_chars:
        raise StopRun("invalid_draft:too_long")
    return out


def validate_critique(
    raw: Any,
    *,
    allowed_decisions_policy: set[str],
    allowed_risk_types_policy: set[str],
    max_risks: int,
    max_required_changes: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_critique:not_object")

    decision = raw.get("decision")
    if not isinstance(decision, str) or not decision.strip():
        raise StopRun("invalid_critique:decision")
    decision = decision.strip()
    if decision not in allowed_decisions_policy:
        raise StopRun(f"critique_decision_not_allowed_policy:{decision}")

    severity = raw.get("severity", "medium")
    if not isinstance(severity, str) or not severity.strip():
        raise StopRun("invalid_critique:severity")
    severity = severity.strip().lower()
    if severity not in ALLOWED_SEVERITY:
        raise StopRun("invalid_critique:severity")

    risks_raw = raw.get("risks", [])
    if not isinstance(risks_raw, list):
        raise StopRun("invalid_critique:risks")
    if len(risks_raw) > max_risks:
        raise StopRun("invalid_critique:too_many_risks")

    risks: list[dict[str, str]] = []
    for item in risks_raw:
        if not isinstance(item, dict):
            raise StopRun("invalid_critique:risk_item")

        risk_type = item.get("type")
        note = item.get("note")

        if not isinstance(risk_type, str) or not risk_type.strip():
            raise StopRun("invalid_critique:risk_type")
        risk_type = risk_type.strip()
        if risk_type not in allowed_risk_types_policy:
            raise StopRun(f"critique_risk_not_allowed_policy:{risk_type}")

        if not isinstance(note, str) or not note.strip():
            raise StopRun("invalid_critique:risk_note")

        risks.append({"type": risk_type, "note": note.strip()})

    required_changes_raw = raw.get("required_changes", [])
    if not isinstance(required_changes_raw, list):
        raise StopRun("invalid_critique:required_changes")
    if len(required_changes_raw) > max_required_changes:
        raise StopRun("invalid_critique:too_many_required_changes")

    required_changes: list[str] = []
    for item in required_changes_raw:
        if not isinstance(item, str) or not item.strip():
            raise StopRun("invalid_critique:required_change_item")
        required_changes.append(item.strip())

    reason = raw.get("reason", "")
    if reason is None:
        reason = ""
    if not isinstance(reason, str):
        raise StopRun("invalid_critique:reason")
    reason = reason.strip()

    high_risk = severity == "high" or any(_is_high_risk_risk_type(r["type"]) for r in risks)

    if decision == "approve":
        if required_changes:
            raise StopRun("invalid_critique:approve_with_required_changes")
        if high_risk:
            raise StopRun("invalid_critique:approve_with_high_risk")

    if decision == "revise":
        if not required_changes:
            raise StopRun("invalid_critique:revise_without_required_changes")
        if not all(_is_enforceable_required_change(item) for item in required_changes):
            raise StopRun("invalid_critique:required_changes_not_enforceable")
        if high_risk:
            raise StopRun("invalid_critique:high_risk_requires_escalate")

    if decision == "escalate":
        if not reason:
            raise StopRun("invalid_critique:escalate_reason_required")

    return {
        "decision": decision,
        "severity": severity,
        "risks": risks,
        "required_changes": required_changes,
        "reason": reason,
        "high_risk": high_risk,
    }


class SelfCritiqueGateway:
    def __init__(self, *, allow_execution_decisions: set[str], budget: Budget):
        self.allow_execution_decisions = set(allow_execution_decisions)
        self.budget = budget

    def enforce_execution_decision(self, decision: str) -> None:
        if decision not in self.allow_execution_decisions:
            raise StopRun(f"critique_decision_denied_execution:{decision}")

    def apply_required_changes_fallback(self, *, text: str, required_changes: list[str]) -> str:
        """
        Deterministic fallback for enforceable required changes:
        remove MUST_REMOVE/REMOVE phrases and append missing MUST_INCLUDE/ADD phrases.
        """
        candidate = (text or "").strip()
        if not candidate:
            return candidate

        phrase_rules = _extract_required_change_rules(required_changes)
        must_include = phrase_rules["must_include"]
        must_remove = phrase_rules["must_remove"]

        for phrase in must_remove:
            candidate = _remove_phrase_occurrences(text=candidate, phrase=phrase)

        for phrase in must_include:
            if not _contains_normalized_phrase(text=candidate, phrase=phrase):
                candidate = _append_phrase_sentence(text=candidate, phrase=phrase)

        return candidate.strip()

    def validate_revision(
        self,
        *,
        original: str,
        revised: str,
        context: dict[str, Any],
        required_changes: list[str],
    ) -> dict[str, Any]:
        if not isinstance(revised, str) or not revised.strip():
            raise StopRun("invalid_revised:empty")

        revised_clean = revised.strip()
        if len(revised_clean) > self.budget.max_answer_chars:
            raise StopRun("invalid_revised:too_long")

        normalized_original = _normalize_space(original)
        normalized_revised = _normalize_space(revised_clean)
        if normalized_original == normalized_revised:
            raise StopRun("invalid_revised:no_changes")

        similarity = SequenceMatcher(a=normalized_original, b=normalized_revised).ratio()
        if similarity < self.budget.min_patch_similarity:
            raise StopRun("patch_violation:too_large_edit")

        original_len = max(1, len(normalized_original))
        revised_len = len(normalized_revised)
        increase_pct = ((revised_len - original_len) / float(original_len)) * 100.0
        policy_hint_raw = (
            context.get("policy_hints", {}).get("max_length_increase_pct")
            if isinstance(context, dict)
            else None
        )
        policy_hint_cap = self.budget.max_length_increase_pct
        if isinstance(policy_hint_raw, (int, float)) and not isinstance(policy_hint_raw, bool):
            policy_hint_cap = float(policy_hint_raw)

        effective_length_cap = min(self.budget.max_length_increase_pct, policy_hint_cap)
        if increase_pct > effective_length_cap:
            raise StopRun("patch_violation:length_increase_limit")

        allowed_text_tokens = _stable_json(context) + " " + original
        allowed_text_claims = _normalize_space(_context_claim_text(context) + " " + original)

        if _extract_number_tokens(revised_clean) - _extract_number_tokens(allowed_text_tokens):
            raise StopRun("patch_violation:no_new_facts")

        if _extract_incident_ids(revised_clean) - _extract_incident_ids(allowed_text_tokens):
            raise StopRun("patch_violation:new_incident_id")

        if _extract_severity_labels(revised_clean) - _extract_severity_labels(allowed_text_tokens):
            raise StopRun("patch_violation:new_severity_label")

        if _extract_regions(revised_clean) - _extract_regions(allowed_text_tokens):
            raise StopRun("patch_violation:new_region")

        avoid_absolute_guarantees = bool(
            context.get("policy_hints", {}).get("avoid_absolute_guarantees")
            if isinstance(context, dict)
            else False
        )
        for claim_re in RESTRICTED_CLAIMS_RE:
            if avoid_absolute_guarantees:
                if claim_re.search(revised_clean):
                    raise StopRun("patch_violation:restricted_claims")
                continue
            if claim_re.search(revised_clean) and not claim_re.search(allowed_text_claims):
                raise StopRun("patch_violation:restricted_claims")

        phrase_rules = _extract_required_change_rules(required_changes)
        must_include = phrase_rules["must_include"]
        must_remove = phrase_rules["must_remove"]

        if must_include or must_remove:
            revised_lower = normalized_revised.lower()
            if [value for value in must_include if not _contains_normalized_phrase(text=revised_lower, phrase=value)]:
                raise StopRun("patch_violation:required_changes_not_applied")
            if [value for value in must_remove if _contains_normalized_phrase(text=revised_lower, phrase=value)]:
                raise StopRun("patch_violation:required_changes_not_applied")

        return {
            "answer": revised_clean,
            "patch_similarity": round(similarity, 3),
            "length_increase_pct": round(increase_pct, 2),
            "required_changes_total": len(required_changes),
            "required_changes_enforced": len(must_include) + len(must_remove),
            "required_changes_unenforced": len(required_changes)
            - (len(must_include) + len(must_remove)),
        }

    def validate_final(self, answer: str) -> str:
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_answer:empty")

        out = answer.strip()
        if len(out) > self.budget.max_answer_chars:
            raise StopRun("invalid_answer:too_long")
        return out

What matters most here (in plain terms)

  • The critique is not arbitrary text: it goes through strict validation.
  • The revision cannot step outside the factual context or the budget limits.
  • Enforceable required_changes use explicit ADD/REMOVE markers (or MUST_INCLUDE/MUST_REMOVE).
  • If avoid_absolute_guarantees=True, restricted claims are blocked regardless of the draft (even if they already appeared in the original draft).
  • required_changes_total is the total number of required_changes.
  • required_changes_enforced is how many required_changes were turned into enforced rules.
  • required_changes_unenforced is how many required_changes the gateway cannot verify.

llm.py — draft/critique/revise LLM calls

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


DRAFT_SYSTEM_PROMPT = """
You are an operations communications writer.
Return exactly one JSON object:
{
  "draft": "customer-facing incident update"
}

Rules:
- Use only facts from provided incident_context.
- Include current status, customer impact, and next actions.
- Keep language clear and non-speculative.
- Avoid absolute guarantees.
- Do not output markdown or extra keys.
""".strip()

SHORTEN_DRAFT_SYSTEM_PROMPT = """
You are an operations editor.
Return exactly one JSON object:
{
  "draft": "shortened customer-facing incident update"
}

Rules:
- Rewrite the draft to be <= max_chars characters.
- Preserve original facts, numbers, and intent.
- Do not add new facts or speculative claims.
- Keep current status, customer impact, and next actions.
- Keep language clear and non-speculative.
- Avoid absolute guarantees.
- Do not output markdown or extra keys.
""".strip()

CRITIQUE_SYSTEM_PROMPT = """
You are a strict critique reviewer.
Return exactly one JSON object:
{
  "decision": "approve|revise|escalate",
  "severity": "low|medium|high",
  "risks": [{"type":"overconfidence","note":"..."}],
  "required_changes": ["concrete change"],
  "reason": "for escalate only"
}

Rules:
- Review exactly once.
- decision=approve: required_changes must be empty.
- decision=revise: provide 1-5 concrete required changes.
- decision=escalate: use only for high-risk content.
- Every required_changes item MUST start with ADD/REMOVE/MUST_INCLUDE/MUST_REMOVE.
- Every required_changes item MUST contain exactly one quoted phrase.
- If you cannot express required changes in enforceable ADD/REMOVE format, set decision=escalate and explain why in reason.
- Use explicit markers for enforceable phrase edits:
  - REMOVE "phrase to delete"
  - ADD "phrase to include"
  - MUST_REMOVE "phrase to delete" (equivalent)
  - MUST_INCLUDE "phrase to include" (equivalent)
- Do not add new facts in required_changes.
- Use only risk types from allowed_risk_types.
- Do not output markdown or extra keys.
""".strip()

REVISE_SYSTEM_PROMPT = """
You are an editor applying a constrained rewrite.
Return exactly one JSON object:
{
  "revised_answer": "updated answer"
}

Rules:
- Apply required_changes only.
- Keep original scope and customer intent.
- Do not add new facts or numbers.
- Keep the answer concise and actionable.
- Do not output markdown or extra keys.
""".strip()

REVISE_SYSTEM_PROMPT_STRICT = """
You are an editor applying a constrained rewrite.
Return exactly one JSON object:
{
  "revised_answer": "updated answer"
}

Rules:
- Apply required_changes only.
- Keep original scope and customer intent.
- Do not add new facts or numbers.
- Keep the answer concise and actionable.
- You MUST satisfy each required_changes item exactly.
- For ADD/MUST_INCLUDE: include the quoted phrase verbatim.
- For REMOVE/MUST_REMOVE: ensure the quoted phrase does not appear.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")
    return data


def generate_draft(*, goal: str, incident_context: dict[str, Any]) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
    }
    data = _chat_json(system_prompt=DRAFT_SYSTEM_PROMPT, payload=payload)

    draft = data.get("draft")
    if not isinstance(draft, str):
        raise LLMInvalid("llm_invalid_schema")

    draft = draft.strip()
    if not draft:
        raise LLMEmpty("llm_empty")
    return draft


def shorten_draft(*, draft: str, max_chars: int) -> str:
    payload = {
        "draft": draft,
        "max_chars": int(max_chars),
    }
    data = _chat_json(system_prompt=SHORTEN_DRAFT_SYSTEM_PROMPT, payload=payload)

    shortened = data.get("draft")
    if not isinstance(shortened, str):
        raise LLMInvalid("llm_invalid_schema")

    shortened = shortened.strip()
    if not shortened:
        raise LLMEmpty("llm_empty")
    return shortened


def critique_draft(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    allowed_risk_types: list[str],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "allowed_risk_types": allowed_risk_types,
    }
    return _chat_json(system_prompt=CRITIQUE_SYSTEM_PROMPT, payload=payload)


def revise_once(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    required_changes: list[str],
    strict_mode: bool = False,
) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "required_changes": required_changes,
    }
    system_prompt = REVISE_SYSTEM_PROMPT_STRICT if strict_mode else REVISE_SYSTEM_PROMPT
    data = _chat_json(system_prompt=system_prompt, payload=payload)

    revised = data.get("revised_answer")
    if not isinstance(revised, str):
        raise LLMInvalid("llm_invalid_schema")

    revised = revised.strip()
    if not revised:
        raise LLMEmpty("llm_empty")
    return revised

What matters most here (in plain terms)

  • The critique output follows a stable JSON contract.
  • The critique prompt explicitly requires enforceable required_changes with ADD/REMOVE markers.
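For reference, this is the shape of a critique object that satisfies both the prompt contract and gateway.validate_critique (sample values are illustrative):

```python
sample_critique = {
    "decision": "revise",
    "severity": "medium",
    "risks": [
        {"type": "overconfidence", "note": "Implies the incident is nearly over."}
    ],
    # Each item starts with an enforceable marker and contains exactly one quoted phrase.
    "required_changes": [
        'REMOVE "fully recovered"',
        'ADD "we will publish updates via the status page"',
    ],
    "reason": "",  # required only when decision=escalate
}
```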

audit.py — change audit log

PYTHON
from __future__ import annotations

import difflib
import hashlib
import re
from typing import Any


SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?])\s+")


def _hash_text(text: str) -> str:
    normalized = " ".join((text or "").split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:12]


def _split_for_diff(text: str) -> list[str]:
    lines = (text or "").splitlines()
    if len(lines) > 1:
        return lines

    normalized = (text or "").strip()
    if not normalized:
        return [""]

    sentences = [item.strip() for item in SENTENCE_SPLIT_RE.split(normalized) if item.strip()]
    if len(sentences) > 1:
        return sentences

    chunk_size = 80
    return [normalized[i : i + chunk_size] for i in range(0, len(normalized), chunk_size)]


def build_audit_log(
    *,
    before: str,
    after: str,
    risks: list[dict[str, Any]],
    required_changes: list[str],
) -> dict[str, Any]:
    before_text = (before or "").strip()
    after_text = (after or "").strip()

    before_chars = len(before_text)
    after_chars = len(after_text)
    delta_chars = after_chars - before_chars

    if before_chars <= 0:
        increase_pct = 0.0
    else:
        increase_pct = (delta_chars / float(before_chars)) * 100.0

    raw_diff = list(
        difflib.unified_diff(
            _split_for_diff(before_text),
            _split_for_diff(after_text),
            fromfile="before",
            tofile="after",
            lineterm="",
        )
    )

    diff_excerpt: list[str] = []
    for line in raw_diff:
        if line.startswith(("---", "+++", "@@")):
            continue
        if line.startswith(("+", "-")):
            diff_excerpt.append(line)
        if len(diff_excerpt) >= 6:
            break

    return {
        "changed": before_text != after_text,
        "before_hash": _hash_text(before_text),
        "after_hash": _hash_text(after_text),
        "before_chars": before_chars,
        "after_chars": after_chars,
        "delta_chars": delta_chars,
        "length_increase_pct": round(increase_pct, 2),
        "risks_count": len(risks),
        "required_changes_count": len(required_changes),
        "diff_excerpt": diff_excerpt,
    }

What matters most here (in plain terms)

  • After the revision, audit metadata is stored, not just the final text.
  • This makes it possible to debug "why this version passed policy".
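The audit metadata boils down to comparing whitespace-normalized hashes and lengths. A condensed sketch of what build_audit_log records:

```python
import hashlib

def audit_fingerprint(before: str, after: str) -> dict:
    # Same hashing scheme as build_audit_log:
    # whitespace-normalized sha256, truncated to 12 hex chars.
    def _hash(text: str) -> str:
        normalized = " ".join(text.split())
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:12]

    before_text, after_text = before.strip(), after.strip()
    return {
        "changed": before_text != after_text,
        "before_hash": _hash(before_text),
        "after_hash": _hash(after_text),
        "delta_chars": len(after_text) - len(before_text),
    }
```

Storing hashes instead of (or alongside) full texts lets you prove later which exact version passed policy without diffing raw strings by eye.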

main.py — full-cycle orchestration

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from audit import build_audit_log
from context import build_incident_context
from gateway import Budget, SelfCritiqueGateway, StopRun, text_hash, validate_critique, validate_draft
from llm import LLMEmpty, LLMInvalid, LLMTimeout, critique_draft, generate_draft, revise_once, shorten_draft

GOAL = (
    "Draft a customer-facing payment incident update for US enterprise customers. "
    "Use precise language, avoid guarantees, and keep next actions concrete."
)
INCIDENT_CONTEXT = build_incident_context(report_date="2026-03-06", region="US")

BUDGET = Budget(
    max_seconds=120,
    max_draft_chars=900,
    max_risks=5,
    max_required_changes=5,
    max_answer_chars=980,
    max_length_increase_pct=20.0,
    min_patch_similarity=0.4,
)

ALLOWED_CRITIQUE_DECISIONS_POLICY = {"approve", "revise", "escalate"}
AUTO_REVISION_ENABLED = True
ALLOWED_CRITIQUE_DECISIONS_EXECUTION = (
    ALLOWED_CRITIQUE_DECISIONS_POLICY if AUTO_REVISION_ENABLED else {"approve", "escalate"}
)

ALLOWED_RISK_TYPES_POLICY = {
    "overconfidence",
    "missing_uncertainty",
    "contradiction",
    "scope_leak",
    "policy_violation",
    "legal_risk",
}


def run_self_critique_agent(*, goal: str, incident_context: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = SelfCritiqueGateway(
        allow_execution_decisions=ALLOWED_CRITIQUE_DECISIONS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    draft_attempts = 0
    draft_retried = False
    try:
        draft_attempts += 1
        draft_raw = generate_draft(goal=goal, incident_context=incident_context)
        try:
            draft = validate_draft(draft_raw, max_chars=BUDGET.max_draft_chars)
        except StopRun as exc:
            if exc.reason != "invalid_draft:too_long":
                raise
            # One bounded recovery attempt: shorten draft within policy budget.
            draft_attempts += 1
            draft_retried = True
            shortened_raw = shorten_draft(draft=draft_raw, max_chars=BUDGET.max_draft_chars)
            draft = validate_draft(shortened_raw, max_chars=BUDGET.max_draft_chars)
    except LLMTimeout:
        return stopped("llm_timeout", phase="draft")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="draft")
    except LLMEmpty:
        return stopped("llm_empty", phase="draft")
    except StopRun as exc:
        return stopped(exc.reason, phase="draft")

    trace.append(
        {
            "step": 1,
            "phase": "draft",
            "draft_hash": text_hash(draft),
            "chars": len(draft),
            "attempts_used": draft_attempts,
            "retried": draft_retried,
            "ok": True,
        }
    )
    history.append(
        {
            "step": 1,
            "action": "draft_once",
            "draft": draft,
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="critique")

    try:
        raw_critique = critique_draft(
            goal=goal,
            incident_context=incident_context,
            draft=draft,
            allowed_risk_types=sorted(ALLOWED_RISK_TYPES_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="critique")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="critique")

    try:
        critique = validate_critique(
            raw_critique,
            allowed_decisions_policy=ALLOWED_CRITIQUE_DECISIONS_POLICY,
            allowed_risk_types_policy=ALLOWED_RISK_TYPES_POLICY,
            max_risks=BUDGET.max_risks,
            max_required_changes=BUDGET.max_required_changes,
        )
        gateway.enforce_execution_decision(critique["decision"])
    except StopRun as exc:
        return stopped(exc.reason, phase="critique", raw_critique=raw_critique)

    trace.append(
        {
            "step": 2,
            "phase": "critique",
            "decision": critique["decision"],
            "severity": critique["severity"],
            "risks": len(critique["risks"]),
            "required_changes": len(critique["required_changes"]),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 2,
            "action": "critique_once",
            "critique": critique,
        }
    )

    if critique["decision"] == "escalate":
        escalation_reason = str(critique.get("reason", "")).strip()
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "policy_escalation",
            "escalation_reason": escalation_reason[:120],
            "phase": "critique",
            "critique": critique,
            "trace": trace,
            "history": history,
        }

    final_answer = draft
    revised = False

    if critique["decision"] == "revise":
        revise_attempts = 0
        revise_retried = False
        revised_payload: dict[str, Any] | None = None
        last_revised_candidate = draft
        for attempt in range(1, 4):
            if (time.monotonic() - started) > BUDGET.max_seconds:
                return stopped("max_seconds", phase="revise")

            revise_attempts = attempt
            strict_mode = attempt > 1
            try:
                revised_raw = revise_once(
                    goal=goal,
                    incident_context=incident_context,
                    draft=draft,
                    required_changes=critique["required_changes"],
                    strict_mode=strict_mode,
                )
                last_revised_candidate = revised_raw
                revised_payload = gateway.validate_revision(
                    original=draft,
                    revised=revised_raw,
                    context=incident_context,
                    required_changes=critique["required_changes"],
                )
                break
            except LLMTimeout:
                return stopped("llm_timeout", phase="revise")
            except LLMInvalid as exc:
                return stopped(exc.args[0], phase="revise")
            except LLMEmpty:
                return stopped("llm_empty", phase="revise")
            except StopRun as exc:
                if exc.reason == "patch_violation:required_changes_not_applied" and attempt < 3:
                    revise_retried = True
                    continue
                if exc.reason == "patch_violation:required_changes_not_applied":
                    # Final fallback: enforce required phrase edits deterministically.
                    try:
                        fallback_revised = gateway.apply_required_changes_fallback(
                            text=last_revised_candidate,
                            required_changes=critique["required_changes"],
                        )
                        revised_payload = gateway.validate_revision(
                            original=draft,
                            revised=fallback_revised,
                            context=incident_context,
                            required_changes=critique["required_changes"],
                        )
                        revise_attempts = attempt + 1
                        revise_retried = True
                        break
                    except StopRun as fallback_exc:
                        return stopped(fallback_exc.reason, phase="revise")
                return stopped(exc.reason, phase="revise")

        if revised_payload is None:
            return stopped("patch_violation:required_changes_not_applied", phase="revise")

        final_answer = revised_payload["answer"]
        revised = True

        trace.append(
            {
                "step": 3,
                "phase": "revise",
                "patch_similarity": revised_payload["patch_similarity"],
                "length_increase_pct": revised_payload["length_increase_pct"],
                "required_changes_total": revised_payload["required_changes_total"],
                "required_changes_enforced": revised_payload["required_changes_enforced"],
                "required_changes_unenforced": revised_payload["required_changes_unenforced"],
                "attempts_used": revise_attempts,
                "retried": revise_retried,
                "revised_hash": text_hash(final_answer),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "revise_once",
                "required_changes": critique["required_changes"],
                "revised_answer": final_answer,
            }
        )

    try:
        final_answer = gateway.validate_final(final_answer)
    except StopRun as exc:
        return stopped(exc.reason, phase="finalize")

    audit_log = build_audit_log(
        before=draft,
        after=final_answer,
        risks=critique["risks"],
        required_changes=critique["required_changes"],
    )

    trace.append(
        {
            "step": 4 if revised else 3,
            "phase": "audit_finalize",
            "final_hash": text_hash(final_answer),
            "changed": audit_log["changed"],
            "diff_lines": len(audit_log["diff_excerpt"]),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 4 if revised else 3,
            "action": "audit_finalize",
            "status": "final",
            "changed": audit_log["changed"],
        }
    )

    return {
        "run_id": run_id,
        "status": "ok",
        "stop_reason": "success",
        "outcome": "revised_once" if revised else "approved_direct",
        "answer": final_answer,
        "critique_decision": critique["decision"],
        "severity": critique["severity"],
        "risks": critique["risks"],
        "required_changes": critique["required_changes"],
        "audit": audit_log,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_self_critique_agent(goal=GOAL, incident_context=INCIDENT_CONTEXT)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

The key points here (in plain terms)

  • critique_decision governs the downstream flow, but execution permissions are controlled by the runtime.
  • The critique and the revision stay transparent via trace + history + audit.
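The policy/execution split can be sketched in isolation (a simplified version; the real check lives in `SelfCritiqueGateway.enforce_execution_decision` in `gateway.py`):

```python
POLICY_DECISIONS = {"approve", "revise", "escalate"}  # contract: what a critique may say
AUTO_REVISION_ENABLED = False                         # runtime switch for auto-revise

# The runtime allowlist shrinks when auto-revision is disabled.
EXECUTION_DECISIONS = (
    POLICY_DECISIONS if AUTO_REVISION_ENABLED else {"approve", "escalate"}
)


class StopRun(Exception):
    def __init__(self, reason: str) -> None:
        super().__init__(reason)
        self.reason = reason


def enforce_execution_decision(decision: str) -> None:
    # Policy validates the contract; the runtime decides what may actually execute.
    if decision not in POLICY_DECISIONS:
        raise StopRun(f"critique_decision_not_allowed_policy:{decision}")
    if decision not in EXECUTION_DECISIONS:
        raise StopRun(f"critique_decision_denied_execution:{decision}")


try:
    enforce_execution_decision("revise")
except StopRun as exc:
    print(exc.reason)  # critique_decision_denied_execution:revise
```

The same decision can be valid under policy yet denied at execution time, which is exactly what the two distinct stop reasons distinguish.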

Example output

JSON
{
  "run_id": "721e4231-7b9a-4843-99e0-888616025b35",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "revised_once",
  "answer": "Current Status: We are experiencing a payment processing degradation affecting about 27% of US enterprise customer checkouts. The failed payment rate has risen to 3.4%, with 5 related chargeback alerts. Our engineering team is working to resolve the issue. We estimate recovery within approximately 45 minutes, though this timing may change as we learn more.\n\nCustomer Impact: Some customers may face difficulties completing payments, causing delayed order processing or the need to retry transactions. We are monitoring the situation closely to reduce disruption.\n\nNext Actions: We will publish updates on the status page every 15 minutes as we monitor payment failures. Our support team is preparing a workaround guide to assist affected customers. Please check the status page regularly and contact support if you need help.",
  "critique_decision": "revise",
  "severity": "medium",
  "risks": [
    {
      "type": "overconfidence",
      "note": "The phrase 'with an estimated recovery time of 45 minutes' may be interpreted as a guarantee rather than an estimate."
    }
  ],
  "required_changes": [
    "MUST_REMOVE \"with an estimated recovery time of 45 minutes\"",
    "MUST_INCLUDE \"We estimate recovery within approximately 45 minutes, though this timing may change as we learn more\"",
    "MUST_INCLUDE \"We will publish updates on the status page every 15 minutes as we monitor payment failures\"",
    "MUST_INCLUDE \"Our support team is preparing a workaround guide to assist affected customers\""
  ],
  "audit": {
    "changed": true,
    "before_hash": "7cdaa40c3fa4",
    "after_hash": "33b356380537",
    "before_chars": 770,
    "after_chars": 827,
    "delta_chars": 57,
    "length_increase_pct": 7.4,
    "risks_count": 1,
    "required_changes_count": 4,
    "diff_excerpt": [
      "-...",
      "+..."
    ]
  },
  "trace": [
    {
      "step": 1,
      "phase": "draft",
      "draft_hash": "9b944d7375bf",
      "chars": 770,
      "attempts_used": 2,
      "retried": true,
      "ok": true
    },
    {
      "step": 2,
      "phase": "critique",
      "decision": "revise",
      "severity": "medium",
      "risks": 1,
      "required_changes": 4,
      "ok": true
    },
    {
      "step": 3,
      "phase": "revise",
      "patch_similarity": 0.849,
      "length_increase_pct": 7.42,
      "required_changes_total": 4,
      "required_changes_enforced": 4,
      "required_changes_unenforced": 0,
      "attempts_used": 2,
      "retried": true,
      "revised_hash": "d92d90a2adbe",
      "ok": true
    },
    {
      "step": 4,
      "phase": "audit_finalize",
      "final_hash": "d92d90a2adbe",
      "changed": true,
      "diff_lines": 4,
      "ok": true
    }
  ],
  "history": [{...}]
}

Typical stop_reason values

  • success — the run finished correctly
  • llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty — empty LLM response during draft/revise
  • llm_invalid_json — the LLM returned invalid JSON
  • llm_invalid_schema — the JSON does not satisfy the contract
  • invalid_draft:* — the draft failed basic validation
  • invalid_critique:* — the critique failed the policy-layer contract
  • invalid_critique:required_changes_not_enforceable — for decision=revise, required_changes must be in an enforceable format (ADD/REMOVE/MUST_* plus one quoted phrase)
  • critique_decision_not_allowed_policy:* — critique decision outside the policy allowlist
  • critique_decision_denied_execution:* — the runtime denied the execution decision
  • patch_violation:no_new_facts — the revision added new facts
  • patch_violation:length_increase_limit — the revision exceeded the length-increase cap
  • patch_violation:restricted_claims — the revision contains restricted claims; with avoid_absolute_guarantees=true they are blocked strictly
  • patch_violation:required_changes_not_applied — the revision did not apply the required changes
  • patch_violation:too_large_edit — the revision exceeded the patch-only limit
  • policy_escalation — the critique returned an escalation; details in escalation_reason
  • max_seconds — the run's total time budget was exceeded
  • invalid_answer:* — the final answer failed validation
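For monitoring, these stop reasons bucket naturally by their prefix; a sketch of such a classifier (the bucket names are assumptions, not part of the example code):

```python
def classify_stop_reason(stop_reason: str) -> str:
    # Map a stop_reason to a coarse alerting bucket by its prefix.
    prefix = stop_reason.split(":", 1)[0]
    buckets = {
        "success": "ok",
        "llm_timeout": "provider",
        "llm_empty": "provider",
        "llm_invalid_json": "provider",
        "llm_invalid_schema": "provider",
        "invalid_draft": "contract",
        "invalid_critique": "contract",
        "invalid_answer": "contract",
        "critique_decision_not_allowed_policy": "policy",
        "critique_decision_denied_execution": "policy",
        "patch_violation": "guardrail",
        "policy_escalation": "human_review",
        "max_seconds": "budget",
    }
    return buckets.get(prefix, "unknown")


print(classify_stop_reason("patch_violation:no_new_facts"))  # guardrail
print(classify_stop_reason("llm_timeout"))                   # provider
```

Provider-bucket spikes usually point at the LLM vendor, while guardrail-bucket spikes point at drift in model behavior against your contract.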

What is NOT shown here

  • persisted audit storage (DB / object storage)
  • retry/backoff and a circuit breaker for the LLM
  • a human review queue for policy_escalation
  • domain-level semantic validation of required_changes

What to try next

  1. Disable AUTO_REVISION_ENABLED and verify critique_decision_denied_execution:revise.
  2. Add a "severity budget" (for example, block medium+ for specific tenants).
  3. Ship audit.diff_excerpt to an external log sink (S3/ELK) for incident analysis.
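Item 2 could start from something like this sketch (tenant names and the escalation rule are hypothetical):

```python
SEVERITY_ORDER = ["low", "medium", "high", "critical"]

# Hypothetical per-tenant severity budget: critiques at or above the
# tenant's threshold are escalated to a human instead of auto-revised.
TENANT_SEVERITY_BUDGET = {
    "enterprise-us": "medium",
    "default": "high",
}


def exceeds_severity_budget(tenant: str, severity: str) -> bool:
    threshold = TENANT_SEVERITY_BUDGET.get(tenant, TENANT_SEVERITY_BUDGET["default"])
    return SEVERITY_ORDER.index(severity) >= SEVERITY_ORDER.index(threshold)


print(exceeds_severity_budget("enterprise-us", "medium"))  # True  -> escalate
print(exceeds_severity_budget("default", "medium"))        # False -> auto-revise ok
```

This check would slot in right after `validate_critique`, turning a `revise` decision into a `policy_escalation` stop when the tenant's budget is exceeded.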
⏱️ 22 min read · Updated March 2, 2026 · Difficulty: ★★☆
Built-in: production control — OnceOnly

Guardrails for tool-calling agents

Take this pattern to production with governance:
  • Budgets (step caps / spend caps)
  • Tool permissions (allowlist / blocklist)
  • Kill switch and incident stop
  • Idempotency and dedupe
  • Audit logs and traceability

Embedded mention: OnceOnly is a control layer for production agent systems.

Author

Nick — an engineer building infrastructure for production AI agents.

Focus: agent patterns, failure modes, runtime control, and system reliability.

🔗 GitHub: https://github.com/mykolademyanov


Editorial note

This documentation is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The content is based on real failures, post-mortems, and operational incidents in deployed AI agent systems.