Self-Critique Agent in Python: Complete Example

A production-style, runnable example of a Self-Critique agent in Python with Draft → Critique → Revise → Audit, a policy/execution boundary, and explicit stop reasons.
On this page
  1. The pattern in a nutshell
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. How to run
  6. The task
  7. The solution
  8. Code
  9. context.py — deterministic context
  10. gateway.py — critique/revision policy boundary
  11. llm.py — draft/critique/revise calls
  12. audit.py — change audit log
  13. main.py — full-cycle orchestration
  14. Example output
  15. Typical stop_reason values
  16. What is NOT shown here
  17. What to try next

The pattern in a nutshell

A Self-Critique Agent is a pattern in which the agent, after producing a draft, forms a structured critique (risks + required_changes), performs one bounded revision, and records an audit of the changes.

The LLM proposes the draft and the critique, while the gateway policy decides whether it may be executed and whether the revision stays within the contract.


What this example demonstrates

  • production-like flow: Draft -> Critique -> Revise (optional) -> Audit
  • a strictly structured critique artifact: decision, severity, risks, required_changes
  • policy vs execution separation for critique decisions
  • constrained revision: no_new_facts, a length-increase cap, required-changes enforcement
  • an audit of changes (before/after hash, delta_chars, diff_excerpt)
  • explicit stop_reason, trace, and history for production monitoring

Architecture

  1. The LLM generates a draft.
  2. An LLM critic returns a structured critique JSON.
  3. The gateway validates the critique contract against policy.
  4. The execution layer enforces a runtime allowlist of decisions.
  5. If the decision is revise, a single revision is performed under guardrails.
  6. An audit log of the changes and the final answer are produced.

The key contract: the agent cannot "freely rewrite" the text; changes are controlled by required_changes and runtime guardrails.
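
A minimal sketch of this control flow (the function names here are illustrative placeholders, not the actual API; the real implementation lives in main.py below):

PYTHON
# Illustrative pseudocode of the Draft -> Critique -> Revise -> Audit loop.
def self_critique_flow(goal, context):
    draft = generate_draft(goal, context)              # LLM proposes a draft
    critique = validate_critique(llm_critique(draft))  # policy validates the contract
    enforce_execution_decision(critique["decision"])   # runtime allowlist
    if critique["decision"] == "escalate":
        return stopped("policy_escalation")
    answer = draft
    if critique["decision"] == "revise":
        revised = llm_revise(draft, critique["required_changes"])
        answer = validate_revision(draft, revised)     # guardrails: facts, length, claims
    return finalize(answer, audit=build_audit_log(draft, answer))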


Project structure

TEXT
agent-patterns/
└── self-critique-agent/
    └── python/
        ├── main.py          # Draft -> Critique -> Revise -> Audit
        ├── llm.py           # draft/critique/revise LLM calls
        ├── gateway.py       # contract + guardrails + policy/execution checks
        ├── context.py       # deterministic incident context
        ├── audit.py         # diff metadata for audit log
        ├── requirements.txt
        └── README.md

How to run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/self-critique-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option via .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows it is simpler to set the variables with set or, if you prefer, to use python-dotenv.
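
For the python-dotenv route, a minimal sketch (assumes pip install python-dotenv, which is not guaranteed to be in requirements.txt):

PYTHON
# Optional alternative to sourcing .env in the shell; also works on Windows.
# Assumes: pip install python-dotenv
from dotenv import load_dotenv

# llm.py reads OPENAI_MODEL / OPENAI_TIMEOUT_SECONDS at import time,
# so load the .env file before importing it.
load_dotenv()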


The task

Imagine a production case:

"Prepare a customer-facing update about a payment incident, but do not change the facts and do not give ETA guarantees."

The problem with a plain revision: the model may "improve" the text but accidentally change its meaning.

The solution

In this example:

  • the critique is formalized: what is wrong and what must be fixed
  • the revision has hard boundaries
  • the audit shows what actually changed
  • high-risk cases are stopped via policy_escalation

Code

context.py — deterministic context

PYTHON
from __future__ import annotations

from typing import Any


def build_incident_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "report_date": report_date,
        "region": region,
        "incident": {
            "incident_id": "inc_payments_20260306",
            "severity": "P1",
            "status": "degraded",
            "affected_checkout_pct": 27,
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "eta_minutes": 45,
        },
        "policy_hints": {
            "avoid_absolute_guarantees": True,
            "max_length_increase_pct": 20,
            "required_sections": ["current_status", "customer_impact", "next_actions"],
        },
        "approved_actions": [
            "monitor payment failures every 15 minutes",
            "publish customer update via status page every 15 minutes",
            "prepare support macro with workaround guidance",
        ],
    }

What matters most here (in plain words)

  • All subsequent steps are built on fixed facts.
  • This reduces "drift" between runs and simplifies auditing.
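
A quick illustrative check of that determinism:

PYTHON
# Two calls with the same arguments produce an identical context,
# so downstream hashes and audit entries are reproducible.
import json

from context import build_incident_context

a = build_incident_context(report_date="2026-03-06", region="US")
b = build_incident_context(report_date="2026-03-06", region="US")
assert json.dumps(a, sort_keys=True) == json.dumps(b, sort_keys=True)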

gateway.py — critique/revision policy boundary

PYTHON
from __future__ import annotations

import hashlib
import json
import re
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 120
    max_draft_chars: int = 900
    max_risks: int = 5
    max_required_changes: int = 5
    max_answer_chars: int = 980
    max_length_increase_pct: float = 20.0
    min_patch_similarity: float = 0.4


NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")
INCIDENT_ID_RE = re.compile(r"\binc_[a-z0-9_]+\b", re.IGNORECASE)
SEVERITY_RE = re.compile(r"\bp[0-5]\b", re.IGNORECASE)
REGION_RE = re.compile(r"\b(us|eu|uk|ua|apac|global|emea|latam)\b", re.IGNORECASE)
QUOTED_PHRASE_RE = re.compile(r"['\"]([^'\"]{3,160})['\"]")

RESTRICTED_CLAIMS_RE = [
    re.compile(r"\bresolved\b", re.IGNORECASE),
    re.compile(r"\bfully[-\s]+recovered\b", re.IGNORECASE),
    re.compile(r"\bincident\s+closed\b", re.IGNORECASE),
    re.compile(r"\ball payments (?:are|is)\s+stable\b", re.IGNORECASE),
]

ALLOWED_SEVERITY = {"low", "medium", "high"}


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(v) for v in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def _normalize_space(text: str) -> str:
    return " ".join((text or "").strip().split())


def text_hash(text: str) -> str:
    normalized = _normalize_space(text)
    raw = _stable_json(normalized)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def _extract_number_tokens(text: str) -> set[str]:
    return set(NUMBER_TOKEN_RE.findall(_normalize_space(text).lower()))


def _extract_incident_ids(text: str) -> set[str]:
    return set(INCIDENT_ID_RE.findall(_normalize_space(text).lower()))


def _extract_severity_labels(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {match.upper() for match in SEVERITY_RE.findall(normalized)}


def _extract_regions(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {value.upper() for value in REGION_RE.findall(normalized)}


def _context_claim_text(value: Any) -> str:
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (bool, int, float)):
        return str(value)
    if isinstance(value, list):
        return " ".join(_context_claim_text(item) for item in value)
    if isinstance(value, dict):
        parts: list[str] = []
        for key in sorted(value):
            item = value[key]
            parts.append(str(key))
            parts.append(_context_claim_text(item))
        return " ".join(parts)
    return str(value)


def _extract_required_change_rules(required_changes: list[str]) -> dict[str, list[str]]:
    must_include: list[str] = []
    must_remove: list[str] = []

    def _append_unique(target: list[str], value: str) -> None:
        if value and value not in target:
            target.append(value)

    for item in required_changes:
        item_norm = _normalize_space(item).lower()
        quoted = [_normalize_space(match).lower() for match in QUOTED_PHRASE_RE.findall(item)]
        quoted = [value for value in quoted if value]
        if not quoted:
            continue

        # Keep extraction deterministic and explicit:
        # - REMOVE/MUST_REMOVE => must_remove
        # - ADD/MUST_INCLUDE   => must_include
        # - anything else      => ignored (must be blocked in critique validation)
        is_remove_rule = ("must_remove" in item_norm) or item_norm.startswith(
            ("remove ", "remove:", "remove-")
        )
        is_add_rule = ("must_include" in item_norm) or item_norm.startswith(
            ("add ", "add:", "add-")
        )

        if is_remove_rule:
            for phrase in quoted:
                _append_unique(must_remove, phrase)
            continue
        if is_add_rule:
            for phrase in quoted:
                _append_unique(must_include, phrase)
            continue

    return {
        "must_include": must_include,
        "must_remove": must_remove,
    }


def _is_high_risk_risk_type(risk_type: str) -> bool:
    return risk_type in {"legal_risk", "policy_violation"}


def _contains_normalized_phrase(*, text: str, phrase: str) -> bool:
    # Compare using token-like normalization so punctuation differences
    # (e.g. trailing dots/commas) do not cause false negatives.
    normalized_text = re.sub(r"[^a-z0-9% ]+", " ", _normalize_space(text).lower())
    normalized_phrase = re.sub(r"[^a-z0-9% ]+", " ", _normalize_space(phrase).lower())
    normalized_text = " ".join(normalized_text.split())
    normalized_phrase = " ".join(normalized_phrase.split())
    return normalized_phrase in normalized_text


def _remove_phrase_occurrences(*, text: str, phrase: str) -> str:
    cleaned = text
    normalized_phrase = _normalize_space(phrase).strip()
    if not normalized_phrase:
        return cleaned

    variants = {normalized_phrase, normalized_phrase.rstrip(".!?")}
    for variant in variants:
        if not variant:
            continue
        cleaned = re.sub(re.escape(variant), "", cleaned, flags=re.IGNORECASE)

    cleaned = re.sub(r"\s+\.", ".", cleaned)
    cleaned = re.sub(r"[ \t]{2,}", " ", cleaned)
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    return cleaned.strip()


def _append_phrase_sentence(*, text: str, phrase: str) -> str:
    sentence = _normalize_space(phrase).strip()
    if not sentence:
        return text

    out = text.rstrip()
    if out and out[-1] not in ".!?":
        out += "."
    separator = "\n\n" if "\n\n" in out else " "
    return (out + separator + sentence).strip()


def _is_enforceable_required_change(item: str) -> bool:
    item_norm = _normalize_space(item).lower()
    has_marker = ("must_remove" in item_norm) or ("must_include" in item_norm) or item_norm.startswith(
        ("remove ", "remove:", "remove-", "add ", "add:", "add-")
    )
    if not has_marker:
        return False

    quoted = [_normalize_space(match) for match in QUOTED_PHRASE_RE.findall(item)]
    quoted = [value for value in quoted if value]
    return len(quoted) == 1


def validate_draft(draft: Any, *, max_chars: int) -> str:
    if not isinstance(draft, str) or not draft.strip():
        raise StopRun("invalid_draft:empty")
    out = draft.strip()
    if len(out) > max_chars:
        raise StopRun("invalid_draft:too_long")
    return out


def validate_critique(
    raw: Any,
    *,
    allowed_decisions_policy: set[str],
    allowed_risk_types_policy: set[str],
    max_risks: int,
    max_required_changes: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_critique:not_object")

    decision = raw.get("decision")
    if not isinstance(decision, str) or not decision.strip():
        raise StopRun("invalid_critique:decision")
    decision = decision.strip()
    if decision not in allowed_decisions_policy:
        raise StopRun(f"critique_decision_not_allowed_policy:{decision}")

    severity = raw.get("severity", "medium")
    if not isinstance(severity, str) or not severity.strip():
        raise StopRun("invalid_critique:severity")
    severity = severity.strip().lower()
    if severity not in ALLOWED_SEVERITY:
        raise StopRun("invalid_critique:severity")

    risks_raw = raw.get("risks", [])
    if not isinstance(risks_raw, list):
        raise StopRun("invalid_critique:risks")
    if len(risks_raw) > max_risks:
        raise StopRun("invalid_critique:too_many_risks")

    risks: list[dict[str, str]] = []
    for item in risks_raw:
        if not isinstance(item, dict):
            raise StopRun("invalid_critique:risk_item")

        risk_type = item.get("type")
        note = item.get("note")

        if not isinstance(risk_type, str) or not risk_type.strip():
            raise StopRun("invalid_critique:risk_type")
        risk_type = risk_type.strip()
        if risk_type not in allowed_risk_types_policy:
            raise StopRun(f"critique_risk_not_allowed_policy:{risk_type}")

        if not isinstance(note, str) or not note.strip():
            raise StopRun("invalid_critique:risk_note")

        risks.append({"type": risk_type, "note": note.strip()})

    required_changes_raw = raw.get("required_changes", [])
    if not isinstance(required_changes_raw, list):
        raise StopRun("invalid_critique:required_changes")
    if len(required_changes_raw) > max_required_changes:
        raise StopRun("invalid_critique:too_many_required_changes")

    required_changes: list[str] = []
    for item in required_changes_raw:
        if not isinstance(item, str) or not item.strip():
            raise StopRun("invalid_critique:required_change_item")
        required_changes.append(item.strip())

    reason = raw.get("reason", "")
    if reason is None:
        reason = ""
    if not isinstance(reason, str):
        raise StopRun("invalid_critique:reason")
    reason = reason.strip()

    high_risk = severity == "high" or any(_is_high_risk_risk_type(r["type"]) for r in risks)

    if decision == "approve":
        if required_changes:
            raise StopRun("invalid_critique:approve_with_required_changes")
        if high_risk:
            raise StopRun("invalid_critique:approve_with_high_risk")

    if decision == "revise":
        if not required_changes:
            raise StopRun("invalid_critique:revise_without_required_changes")
        if not all(_is_enforceable_required_change(item) for item in required_changes):
            raise StopRun("invalid_critique:required_changes_not_enforceable")
        if high_risk:
            raise StopRun("invalid_critique:high_risk_requires_escalate")

    if decision == "escalate":
        if not reason:
            raise StopRun("invalid_critique:escalate_reason_required")

    return {
        "decision": decision,
        "severity": severity,
        "risks": risks,
        "required_changes": required_changes,
        "reason": reason,
        "high_risk": high_risk,
    }


class SelfCritiqueGateway:
    def __init__(self, *, allow_execution_decisions: set[str], budget: Budget):
        self.allow_execution_decisions = set(allow_execution_decisions)
        self.budget = budget

    def enforce_execution_decision(self, decision: str) -> None:
        if decision not in self.allow_execution_decisions:
            raise StopRun(f"critique_decision_denied_execution:{decision}")

    def apply_required_changes_fallback(self, *, text: str, required_changes: list[str]) -> str:
        """
        Deterministic fallback for enforceable required changes:
        remove MUST_REMOVE/REMOVE phrases and append missing MUST_INCLUDE/ADD phrases.
        """
        candidate = (text or "").strip()
        if not candidate:
            return candidate

        phrase_rules = _extract_required_change_rules(required_changes)
        must_include = phrase_rules["must_include"]
        must_remove = phrase_rules["must_remove"]

        for phrase in must_remove:
            candidate = _remove_phrase_occurrences(text=candidate, phrase=phrase)

        for phrase in must_include:
            if not _contains_normalized_phrase(text=candidate, phrase=phrase):
                candidate = _append_phrase_sentence(text=candidate, phrase=phrase)

        return candidate.strip()

    def validate_revision(
        self,
        *,
        original: str,
        revised: str,
        context: dict[str, Any],
        required_changes: list[str],
    ) -> dict[str, Any]:
        if not isinstance(revised, str) or not revised.strip():
            raise StopRun("invalid_revised:empty")

        revised_clean = revised.strip()
        if len(revised_clean) > self.budget.max_answer_chars:
            raise StopRun("invalid_revised:too_long")

        normalized_original = _normalize_space(original)
        normalized_revised = _normalize_space(revised_clean)
        if normalized_original == normalized_revised:
            raise StopRun("invalid_revised:no_changes")

        similarity = SequenceMatcher(a=normalized_original, b=normalized_revised).ratio()
        if similarity < self.budget.min_patch_similarity:
            raise StopRun("patch_violation:too_large_edit")

        original_len = max(1, len(normalized_original))
        revised_len = len(normalized_revised)
        increase_pct = ((revised_len - original_len) / float(original_len)) * 100.0
        policy_hint_raw = (
            context.get("policy_hints", {}).get("max_length_increase_pct")
            if isinstance(context, dict)
            else None
        )
        policy_hint_cap = self.budget.max_length_increase_pct
        if isinstance(policy_hint_raw, (int, float)) and not isinstance(policy_hint_raw, bool):
            policy_hint_cap = float(policy_hint_raw)

        effective_length_cap = min(self.budget.max_length_increase_pct, policy_hint_cap)
        if increase_pct > effective_length_cap:
            raise StopRun("patch_violation:length_increase_limit")

        allowed_text_tokens = _stable_json(context) + " " + original
        allowed_text_claims = _normalize_space(_context_claim_text(context) + " " + original)

        if _extract_number_tokens(revised_clean) - _extract_number_tokens(allowed_text_tokens):
            raise StopRun("patch_violation:no_new_facts")

        if _extract_incident_ids(revised_clean) - _extract_incident_ids(allowed_text_tokens):
            raise StopRun("patch_violation:new_incident_id")

        if _extract_severity_labels(revised_clean) - _extract_severity_labels(allowed_text_tokens):
            raise StopRun("patch_violation:new_severity_label")

        if _extract_regions(revised_clean) - _extract_regions(allowed_text_tokens):
            raise StopRun("patch_violation:new_region")

        avoid_absolute_guarantees = bool(
            context.get("policy_hints", {}).get("avoid_absolute_guarantees")
            if isinstance(context, dict)
            else False
        )
        for claim_re in RESTRICTED_CLAIMS_RE:
            if avoid_absolute_guarantees:
                if claim_re.search(revised_clean):
                    raise StopRun("patch_violation:restricted_claims")
                continue
            if claim_re.search(revised_clean) and not claim_re.search(allowed_text_claims):
                raise StopRun("patch_violation:restricted_claims")

        phrase_rules = _extract_required_change_rules(required_changes)
        must_include = phrase_rules["must_include"]
        must_remove = phrase_rules["must_remove"]

        if must_include or must_remove:
            revised_lower = normalized_revised.lower()
            if any(
                not _contains_normalized_phrase(text=revised_lower, phrase=value)
                for value in must_include
            ):
                raise StopRun("patch_violation:required_changes_not_applied")
            if any(
                _contains_normalized_phrase(text=revised_lower, phrase=value)
                for value in must_remove
            ):
                raise StopRun("patch_violation:required_changes_not_applied")

        return {
            "answer": revised_clean,
            "patch_similarity": round(similarity, 3),
            "length_increase_pct": round(increase_pct, 2),
            "required_changes_total": len(required_changes),
            "required_changes_enforced": len(must_include) + len(must_remove),
            "required_changes_unenforced": len(required_changes)
            - (len(must_include) + len(must_remove)),
        }

    def validate_final(self, answer: str) -> str:
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_answer:empty")

        out = answer.strip()
        if len(out) > self.budget.max_answer_chars:
            raise StopRun("invalid_answer:too_long")
        return out

What matters most here (in plain words)

  • The critique is not "free-form text": it passes strict validation.
  • The revision cannot step outside the factual context or the budget limits.
  • Enforceable required_changes use explicit ADD/REMOVE markers (or MUST_INCLUDE/MUST_REMOVE).
  • If avoid_absolute_guarantees=True, restricted claims are blocked regardless of the draft (even if they were already present in the original draft).
  • required_changes_total reports the total number of required_changes.
  • required_changes_enforced reports how many required_changes became enforced rules.
  • required_changes_unenforced reports how many required_changes the gateway cannot verify.
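
To make the marker format concrete, a small illustrative run of the rule extraction (_extract_required_change_rules is module-private, and extracted phrases are normalized to lowercase):

PYTHON
# Illustrative only: how marker-based required_changes map to enforce rules.
from gateway import _extract_required_change_rules

rules = _extract_required_change_rules([
    'REMOVE "with an estimated recovery time of 45 minutes"',
    'MUST_INCLUDE "We estimate recovery within approximately 45 minutes"',
    "Improve the overall tone",  # no marker/quoted phrase: skipped here, blocked by validate_critique for revise
])
assert rules["must_remove"] == ["with an estimated recovery time of 45 minutes"]
assert rules["must_include"] == ["we estimate recovery within approximately 45 minutes"]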

llm.py — draft/critique/revise calls

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


DRAFT_SYSTEM_PROMPT = """
You are an operations communications writer.
Return exactly one JSON object:
{
  "draft": "customer-facing incident update"
}

Rules:
- Use only facts from provided incident_context.
- Include current status, customer impact, and next actions.
- Keep language clear and non-speculative.
- Avoid absolute guarantees.
- Do not output markdown or extra keys.
""".strip()

SHORTEN_DRAFT_SYSTEM_PROMPT = """
You are an operations editor.
Return exactly one JSON object:
{
  "draft": "shortened customer-facing incident update"
}

Rules:
- Rewrite the draft to be <= max_chars characters.
- Preserve original facts, numbers, and intent.
- Do not add new facts or speculative claims.
- Keep current status, customer impact, and next actions.
- Keep language clear and non-speculative.
- Avoid absolute guarantees.
- Do not output markdown or extra keys.
""".strip()

CRITIQUE_SYSTEM_PROMPT = """
You are a strict critique reviewer.
Return exactly one JSON object:
{
  "decision": "approve|revise|escalate",
  "severity": "low|medium|high",
  "risks": [{"type":"overconfidence","note":"..."}],
  "required_changes": ["concrete change"],
  "reason": "for escalate only"
}

Rules:
- Review exactly once.
- decision=approve: required_changes must be empty.
- decision=revise: provide 1-5 concrete required changes.
- decision=escalate: use only for high-risk content.
- Every required_changes item MUST start with ADD/REMOVE/MUST_INCLUDE/MUST_REMOVE.
- Every required_changes item MUST contain exactly one quoted phrase.
- If you cannot express required changes in enforceable ADD/REMOVE format, set decision=escalate and explain why in reason.
- Use explicit markers for enforceable phrase edits:
  - REMOVE "phrase to delete"
  - ADD "phrase to include"
  - MUST_REMOVE "phrase to delete" (equivalent)
  - MUST_INCLUDE "phrase to include" (equivalent)
- Do not add new facts in required_changes.
- Use only risk types from allowed_risk_types.
- Do not output markdown or extra keys.
""".strip()

REVISE_SYSTEM_PROMPT = """
You are an editor applying a constrained rewrite.
Return exactly one JSON object:
{
  "revised_answer": "updated answer"
}

Rules:
- Apply required_changes only.
- Keep original scope and customer intent.
- Do not add new facts or numbers.
- Keep the answer concise and actionable.
- Do not output markdown or extra keys.
""".strip()

REVISE_SYSTEM_PROMPT_STRICT = """
You are an editor applying a constrained rewrite.
Return exactly one JSON object:
{
  "revised_answer": "updated answer"
}

Rules:
- Apply required_changes only.
- Keep original scope and customer intent.
- Do not add new facts or numbers.
- Keep the answer concise and actionable.
- You MUST satisfy each required_changes item exactly.
- For ADD/MUST_INCLUDE: include the quoted phrase verbatim.
- For REMOVE/MUST_REMOVE: ensure the quoted phrase does not appear.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")
    return data


def generate_draft(*, goal: str, incident_context: dict[str, Any]) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
    }
    data = _chat_json(system_prompt=DRAFT_SYSTEM_PROMPT, payload=payload)

    draft = data.get("draft")
    if not isinstance(draft, str):
        raise LLMInvalid("llm_invalid_schema")

    draft = draft.strip()
    if not draft:
        raise LLMEmpty("llm_empty")
    return draft


def shorten_draft(*, draft: str, max_chars: int) -> str:
    payload = {
        "draft": draft,
        "max_chars": int(max_chars),
    }
    data = _chat_json(system_prompt=SHORTEN_DRAFT_SYSTEM_PROMPT, payload=payload)

    shortened = data.get("draft")
    if not isinstance(shortened, str):
        raise LLMInvalid("llm_invalid_schema")

    shortened = shortened.strip()
    if not shortened:
        raise LLMEmpty("llm_empty")
    return shortened


def critique_draft(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    allowed_risk_types: list[str],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "allowed_risk_types": allowed_risk_types,
    }
    return _chat_json(system_prompt=CRITIQUE_SYSTEM_PROMPT, payload=payload)


def revise_once(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    required_changes: list[str],
    strict_mode: bool = False,
) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "required_changes": required_changes,
    }
    system_prompt = REVISE_SYSTEM_PROMPT_STRICT if strict_mode else REVISE_SYSTEM_PROMPT
    data = _chat_json(system_prompt=system_prompt, payload=payload)

    revised = data.get("revised_answer")
    if not isinstance(revised, str):
        raise LLMInvalid("llm_invalid_schema")

    revised = revised.strip()
    if not revised:
        raise LLMEmpty("llm_empty")
    return revised

What matters most here (in plain words)

  • The critique output follows a stable JSON contract.
  • The critique prompt explicitly demands enforceable required_changes via explicit ADD/REMOVE markers.
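
A small illustrative example of a critique object that passes the gateway.py contract (the policy sets mirror main.py below):

PYTHON
# Illustrative: a hand-written critique validated against the policy contract.
from gateway import validate_critique

critique = validate_critique(
    {
        "decision": "revise",
        "severity": "medium",
        "risks": [{"type": "overconfidence", "note": "ETA reads like a guarantee."}],
        "required_changes": ['REMOVE "guaranteed within 45 minutes"'],
        "reason": "",
    },
    allowed_decisions_policy={"approve", "revise", "escalate"},
    allowed_risk_types_policy={"overconfidence"},
    max_risks=5,
    max_required_changes=5,
)
assert critique["decision"] == "revise" and critique["high_risk"] is False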

audit.py — change audit log

PYTHON
from __future__ import annotations

import difflib
import hashlib
import re
from typing import Any


SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?])\s+")


def _hash_text(text: str) -> str:
    normalized = " ".join((text or "").split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:12]


def _split_for_diff(text: str) -> list[str]:
    lines = (text or "").splitlines()
    if len(lines) > 1:
        return lines

    normalized = (text or "").strip()
    if not normalized:
        return [""]

    sentences = [item.strip() for item in SENTENCE_SPLIT_RE.split(normalized) if item.strip()]
    if len(sentences) > 1:
        return sentences

    chunk_size = 80
    return [normalized[i : i + chunk_size] for i in range(0, len(normalized), chunk_size)]


def build_audit_log(
    *,
    before: str,
    after: str,
    risks: list[dict[str, Any]],
    required_changes: list[str],
) -> dict[str, Any]:
    before_text = (before or "").strip()
    after_text = (after or "").strip()

    before_chars = len(before_text)
    after_chars = len(after_text)
    delta_chars = after_chars - before_chars

    if before_chars <= 0:
        increase_pct = 0.0
    else:
        increase_pct = (delta_chars / float(before_chars)) * 100.0

    raw_diff = list(
        difflib.unified_diff(
            _split_for_diff(before_text),
            _split_for_diff(after_text),
            fromfile="before",
            tofile="after",
            lineterm="",
        )
    )

    diff_excerpt: list[str] = []
    for line in raw_diff:
        if line.startswith(("---", "+++", "@@")):
            continue
        if line.startswith(("+", "-")):
            diff_excerpt.append(line)
        if len(diff_excerpt) >= 6:
            break

    return {
        "changed": before_text != after_text,
        "before_hash": _hash_text(before_text),
        "after_hash": _hash_text(after_text),
        "before_chars": before_chars,
        "after_chars": after_chars,
        "delta_chars": delta_chars,
        "length_increase_pct": round(increase_pct, 2),
        "risks_count": len(risks),
        "required_changes_count": len(required_changes),
        "diff_excerpt": diff_excerpt,
    }

What matters most here (in plain words)

  • After a revision, audit metadata is stored, not just the final text.
  • This makes it possible to debug "why this version passed policy".
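
A tiny usage sketch with made-up texts:

PYTHON
# Illustrative: audit metadata for a small one-sentence edit.
from audit import build_audit_log

log = build_audit_log(
    before="Payments are degraded. ETA 45 minutes guaranteed.",
    after="Payments are degraded. We estimate recovery in about 45 minutes.",
    risks=[{"type": "overconfidence", "note": "reads like a guarantee"}],
    required_changes=['REMOVE "guaranteed"'],
)
# changed=True; hashes, delta_chars, and a short +/- diff_excerpt record
# what the revision actually changed.
print(log["changed"], log["delta_chars"])
print(log["diff_excerpt"])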

main.py — full-cycle orchestration

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from audit import build_audit_log
from context import build_incident_context
from gateway import Budget, SelfCritiqueGateway, StopRun, text_hash, validate_critique, validate_draft
from llm import LLMEmpty, LLMInvalid, LLMTimeout, critique_draft, generate_draft, revise_once, shorten_draft

GOAL = (
    "Draft a customer-facing payment incident update for US enterprise customers. "
    "Use precise language, avoid guarantees, and keep next actions concrete."
)
INCIDENT_CONTEXT = build_incident_context(report_date="2026-03-06", region="US")

BUDGET = Budget(
    max_seconds=120,
    max_draft_chars=900,
    max_risks=5,
    max_required_changes=5,
    max_answer_chars=980,
    max_length_increase_pct=20.0,
    min_patch_similarity=0.4,
)

ALLOWED_CRITIQUE_DECISIONS_POLICY = {"approve", "revise", "escalate"}
AUTO_REVISION_ENABLED = True
ALLOWED_CRITIQUE_DECISIONS_EXECUTION = (
    ALLOWED_CRITIQUE_DECISIONS_POLICY if AUTO_REVISION_ENABLED else {"approve", "escalate"}
)

ALLOWED_RISK_TYPES_POLICY = {
    "overconfidence",
    "missing_uncertainty",
    "contradiction",
    "scope_leak",
    "policy_violation",
    "legal_risk",
}


def run_self_critique_agent(*, goal: str, incident_context: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = SelfCritiqueGateway(
        allow_execution_decisions=ALLOWED_CRITIQUE_DECISIONS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    draft_attempts = 0
    draft_retried = False
    try:
        draft_attempts += 1
        draft_raw = generate_draft(goal=goal, incident_context=incident_context)
        try:
            draft = validate_draft(draft_raw, max_chars=BUDGET.max_draft_chars)
        except StopRun as exc:
            if exc.reason != "invalid_draft:too_long":
                raise
            # One bounded recovery attempt: shorten draft within policy budget.
            draft_attempts += 1
            draft_retried = True
            shortened_raw = shorten_draft(draft=draft_raw, max_chars=BUDGET.max_draft_chars)
            draft = validate_draft(shortened_raw, max_chars=BUDGET.max_draft_chars)
    except LLMTimeout:
        return stopped("llm_timeout", phase="draft")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="draft")
    except LLMEmpty:
        return stopped("llm_empty", phase="draft")
    except StopRun as exc:
        return stopped(exc.reason, phase="draft")

    trace.append(
        {
            "step": 1,
            "phase": "draft",
            "draft_hash": text_hash(draft),
            "chars": len(draft),
            "attempts_used": draft_attempts,
            "retried": draft_retried,
            "ok": True,
        }
    )
    history.append(
        {
            "step": 1,
            "action": "draft_once",
            "draft": draft,
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="critique")

    try:
        raw_critique = critique_draft(
            goal=goal,
            incident_context=incident_context,
            draft=draft,
            allowed_risk_types=sorted(ALLOWED_RISK_TYPES_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="critique")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="critique")

    try:
        critique = validate_critique(
            raw_critique,
            allowed_decisions_policy=ALLOWED_CRITIQUE_DECISIONS_POLICY,
            allowed_risk_types_policy=ALLOWED_RISK_TYPES_POLICY,
            max_risks=BUDGET.max_risks,
            max_required_changes=BUDGET.max_required_changes,
        )
        gateway.enforce_execution_decision(critique["decision"])
    except StopRun as exc:
        return stopped(exc.reason, phase="critique", raw_critique=raw_critique)

    trace.append(
        {
            "step": 2,
            "phase": "critique",
            "decision": critique["decision"],
            "severity": critique["severity"],
            "risks": len(critique["risks"]),
            "required_changes": len(critique["required_changes"]),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 2,
            "action": "critique_once",
            "critique": critique,
        }
    )

    if critique["decision"] == "escalate":
        escalation_reason = str(critique.get("reason", "")).strip()
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "policy_escalation",
            "escalation_reason": escalation_reason[:120],
            "phase": "critique",
            "critique": critique,
            "trace": trace,
            "history": history,
        }

    final_answer = draft
    revised = False

    if critique["decision"] == "revise":
        revise_attempts = 0
        revise_retried = False
        revised_payload: dict[str, Any] | None = None
        last_revised_candidate = draft
        for attempt in range(1, 4):
            if (time.monotonic() - started) > BUDGET.max_seconds:
                return stopped("max_seconds", phase="revise")

            revise_attempts = attempt
            strict_mode = attempt > 1
            try:
                revised_raw = revise_once(
                    goal=goal,
                    incident_context=incident_context,
                    draft=draft,
                    required_changes=critique["required_changes"],
                    strict_mode=strict_mode,
                )
                last_revised_candidate = revised_raw
                revised_payload = gateway.validate_revision(
                    original=draft,
                    revised=revised_raw,
                    context=incident_context,
                    required_changes=critique["required_changes"],
                )
                break
            except LLMTimeout:
                return stopped("llm_timeout", phase="revise")
            except LLMInvalid as exc:
                return stopped(exc.args[0], phase="revise")
            except LLMEmpty:
                return stopped("llm_empty", phase="revise")
            except StopRun as exc:
                if exc.reason == "patch_violation:required_changes_not_applied" and attempt < 3:
                    revise_retried = True
                    continue
                if exc.reason == "patch_violation:required_changes_not_applied":
                    # Final fallback: enforce required phrase edits deterministically.
                    try:
                        fallback_revised = gateway.apply_required_changes_fallback(
                            text=last_revised_candidate,
                            required_changes=critique["required_changes"],
                        )
                        revised_payload = gateway.validate_revision(
                            original=draft,
                            revised=fallback_revised,
                            context=incident_context,
                            required_changes=critique["required_changes"],
                        )
                        revise_attempts = attempt + 1
                        revise_retried = True
                        break
                    except StopRun as fallback_exc:
                        return stopped(fallback_exc.reason, phase="revise")
                return stopped(exc.reason, phase="revise")

        if revised_payload is None:
            return stopped("patch_violation:required_changes_not_applied", phase="revise")

        final_answer = revised_payload["answer"]
        revised = True

        trace.append(
            {
                "step": 3,
                "phase": "revise",
                "patch_similarity": revised_payload["patch_similarity"],
                "length_increase_pct": revised_payload["length_increase_pct"],
                "required_changes_total": revised_payload["required_changes_total"],
                "required_changes_enforced": revised_payload["required_changes_enforced"],
                "required_changes_unenforced": revised_payload["required_changes_unenforced"],
                "attempts_used": revise_attempts,
                "retried": revise_retried,
                "revised_hash": text_hash(final_answer),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "revise_once",
                "required_changes": critique["required_changes"],
                "revised_answer": final_answer,
            }
        )

    try:
        final_answer = gateway.validate_final(final_answer)
    except StopRun as exc:
        return stopped(exc.reason, phase="finalize")

    audit_log = build_audit_log(
        before=draft,
        after=final_answer,
        risks=critique["risks"],
        required_changes=critique["required_changes"],
    )

    trace.append(
        {
            "step": 4 if revised else 3,
            "phase": "audit_finalize",
            "final_hash": text_hash(final_answer),
            "changed": audit_log["changed"],
            "diff_lines": len(audit_log["diff_excerpt"]),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 4 if revised else 3,
            "action": "audit_finalize",
            "status": "final",
            "changed": audit_log["changed"],
        }
    )

    return {
        "run_id": run_id,
        "status": "ok",
        "stop_reason": "success",
        "outcome": "revised_once" if revised else "approved_direct",
        "answer": final_answer,
        "critique_decision": critique["decision"],
        "severity": critique["severity"],
        "risks": critique["risks"],
        "required_changes": critique["required_changes"],
        "audit": audit_log,
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_self_critique_agent(goal=GOAL, incident_context=INCIDENT_CONTEXT)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain words)

  • critique_decision drives the downstream flow, but execution permissions are enforced by the runtime.
  • The critique and the revision remain transparent via trace + history + audit.

Example output

JSON
{
  "run_id": "721e4231-7b9a-4843-99e0-888616025b35",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "revised_once",
  "answer": "Current Status: We are experiencing a payment processing degradation affecting about 27% of US enterprise customer checkouts. The failed payment rate has risen to 3.4%, with 5 related chargeback alerts. Our engineering team is working to resolve the issue. We estimate recovery within approximately 45 minutes, though this timing may change as we learn more.\n\nCustomer Impact: Some customers may face difficulties completing payments, causing delayed order processing or the need to retry transactions. We are monitoring the situation closely to reduce disruption.\n\nNext Actions: We will publish updates on the status page every 15 minutes as we monitor payment failures. Our support team is preparing a workaround guide to assist affected customers. Please check the status page regularly and contact support if you need help.",
  "critique_decision": "revise",
  "severity": "medium",
  "risks": [
    {
      "type": "overconfidence",
      "note": "The phrase 'with an estimated recovery time of 45 minutes' may be interpreted as a guarantee rather than an estimate."
    }
  ],
  "required_changes": [
    "MUST_REMOVE \"with an estimated recovery time of 45 minutes\"",
    "MUST_INCLUDE \"We estimate recovery within approximately 45 minutes, though this timing may change as we learn more\"",
    "MUST_INCLUDE \"We will publish updates on the status page every 15 minutes as we monitor payment failures\"",
    "MUST_INCLUDE \"Our support team is preparing a workaround guide to assist affected customers\""
  ],
  "audit": {
    "changed": true,
    "before_hash": "7cdaa40c3fa4",
    "after_hash": "33b356380537",
    "before_chars": 770,
    "after_chars": 827,
    "delta_chars": 57,
    "length_increase_pct": 7.4,
    "risks_count": 1,
    "required_changes_count": 4,
    "diff_excerpt": [
      "-...",
      "+..."
    ]
  },
  "trace": [
    {
      "step": 1,
      "phase": "draft",
      "draft_hash": "9b944d7375bf",
      "chars": 770,
      "attempts_used": 2,
      "retried": true,
      "ok": true
    },
    {
      "step": 2,
      "phase": "critique",
      "decision": "revise",
      "severity": "medium",
      "risks": 1,
      "required_changes": 4,
      "ok": true
    },
    {
      "step": 3,
      "phase": "revise",
      "patch_similarity": 0.849,
      "length_increase_pct": 7.42,
      "required_changes_total": 4,
      "required_changes_enforced": 4,
      "required_changes_unenforced": 0,
      "attempts_used": 2,
      "retried": true,
      "revised_hash": "d92d90a2adbe",
      "ok": true
    },
    {
      "step": 4,
      "phase": "audit_finalize",
      "final_hash": "d92d90a2adbe",
      "changed": true,
      "diff_lines": 4,
      "ok": true
    }
  ],
  "history": [{...}]
}

Typical stop_reason values

  • success — the run completed correctly
  • llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty — an empty LLM response during draft/revise
  • llm_invalid_json — the LLM returned invalid JSON
  • llm_invalid_schema — the JSON does not match the contract
  • invalid_draft:* — the draft failed basic validation
  • invalid_critique:* — the critique failed the policy-layer contract
  • invalid_critique:required_changes_not_enforceable — for decision=revise, required_changes must be in an enforceable format (ADD/REMOVE/MUST_* plus exactly one quoted phrase)
  • critique_decision_not_allowed_policy:* — the critique decision is outside the policy allowlist
  • critique_decision_denied_execution:* — the runtime denied execution of the decision
  • patch_violation:no_new_facts — the revision added new facts
  • patch_violation:length_increase_limit — the revision exceeded the length-increase limit
  • patch_violation:restricted_claims — the revision contains restricted claims; with avoid_absolute_guarantees=true they are blocked unconditionally
  • patch_violation:required_changes_not_applied — the revision did not apply the mandatory changes
  • patch_violation:too_large_edit — the revision went beyond patch-only bounds
  • policy_escalation — the critique returned an escalation; details in escalation_reason
  • max_seconds — the run's overall time budget was exceeded
  • invalid_answer:* — the final answer failed validation
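
These reasons are namespaced (prefix:detail), which makes routing in a monitor straightforward. An illustrative sketch (notify_human and log_guardrail_hit are hypothetical hooks):

PYTHON
# Illustrative monitoring glue around the runner from main.py.
from main import GOAL, INCIDENT_CONTEXT, run_self_critique_agent

result = run_self_critique_agent(goal=GOAL, incident_context=INCIDENT_CONTEXT)
if result["status"] == "stopped":
    family = result["stop_reason"].split(":", 1)[0]  # e.g. "patch_violation"
    if result["stop_reason"] == "policy_escalation":
        notify_human(result)       # hypothetical: route to a human review queue
    elif family == "patch_violation":
        log_guardrail_hit(result)  # hypothetical: count guardrail hits per tenant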

What is NOT shown here

  • persisted audit storage (DB / object storage)
  • retry/backoff and a circuit breaker for the LLM
  • a human review queue for policy_escalation
  • domain-specific, semantically aware verification of required_changes

What to try next

  1. Disable AUTO_REVISION_ENABLED and observe critique_decision_denied_execution:revise (see the sketch after this list).
  2. Add a "severity budget" (for example, block medium+ for certain tenants).
  3. Store audit.diff_excerpt in an external log sink (S3/ELK) for incident analysis.
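
For item 1, the runtime denial can be observed directly (a hedged sketch built on gateway.py):

PYTHON
# Illustrative: with auto-revision disabled, "revise" is denied at runtime.
from gateway import Budget, SelfCritiqueGateway, StopRun

gateway = SelfCritiqueGateway(
    allow_execution_decisions={"approve", "escalate"},  # AUTO_REVISION_ENABLED = False
    budget=Budget(),
)
try:
    gateway.enforce_execution_decision("revise")
except StopRun as exc:
    print(exc.reason)  # critique_decision_denied_execution:revise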

Integrated: production control by OnceOnly
Add guardrails to agents with tool-calling
Ship this pattern with governance:
  • Budgets (step / spend limits)
  • Tool permissions (allowlist / blocklist)
  • Kill switch and emergency stop
  • Idempotency and dedupe
  • Audit logs and tracing
Integrated mention: OnceOnly — a control layer for production agent systems.

Author

Mykola is an engineer who builds infrastructure for production AI agents.

Focus: agent patterns, failure modes, runtime control, and system reliability.

🔗 GitHub: https://github.com/mykolademyanov


Editorial note

This documentation was prepared with AI assistance, with human editorial responsibility for accuracy, clarity, and production relevance.

The content is based on real failures, postmortems, and operational incidents from deployed AI agent systems.