The pattern in a nutshell
Self-Critique Agent is a pattern in which the agent, after producing a draft, forms a structured critique (risks + required_changes), performs a single bounded revision, and records an audit of the changes.
The LLM proposes the draft and the critique, while the gateway policy decides whether it may be executed and whether the revision stays within the contract.
What this example demonstrates
- production-like flow: Draft -> Critique -> Revise (optional) -> Audit
- strictly structured critique artifact: `decision`, `severity`, `risks`, `required_changes`
- policy vs execution separation for critique decisions
- constrained revision: no_new_facts, length increase cap, required-changes enforcement
- audit of changes (`before_hash`/`after_hash`, `delta_chars`, `diff_excerpt`)
- explicit `stop_reason`, `trace`, `history` for production monitoring
Architecture
- The LLM generates a draft (`draft`).
- An LLM critic returns structured critique JSON.
- The gateway validates the critique contract against policy.
- The execution layer enforces a runtime allowlist of decisions.
- On `revise`, a single revision is performed under guardrails.
- An audit log of the changes and the final answer are produced.
Key contract: the agent cannot "freely rewrite" the text; changes are controlled by `required_changes` and runtime guardrails.
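The enforceable-change contract above can be illustrated with a simplified, self-contained sketch. The marker format (`ADD`/`REMOVE` plus exactly one quoted phrase) mirrors what the gateway in this repo expects, but the function below is an illustration, not the repo module; the repo version normalizes punctuation and whitespace more carefully.

```python
import re

QUOTED = re.compile(r"['\"]([^'\"]{3,160})['\"]")

def apply_enforceable_changes(text: str, required_changes: list[str]) -> str:
    # Each change must start with ADD/REMOVE and carry exactly one quoted phrase.
    for item in required_changes:
        phrases = QUOTED.findall(item)
        if len(phrases) != 1:
            raise ValueError(f"not enforceable: {item!r}")
        phrase = phrases[0]
        low = item.lower()
        if low.startswith("remove"):
            text = re.sub(re.escape(phrase), "", text, flags=re.IGNORECASE).strip()
        elif low.startswith("add"):
            if phrase.lower() not in text.lower():
                text = (text.rstrip(".") + ". " + phrase).strip()
        else:
            raise ValueError(f"unknown marker: {item!r}")
    return re.sub(r"\s{2,}", " ", text)

draft = "Payments are degraded. The incident is resolved."
changes = ['REMOVE "The incident is resolved."', 'ADD "We will post updates every 15 minutes"']
print(apply_enforceable_changes(draft, changes))
# -> Payments are degraded. We will post updates every 15 minutes
```

Anything that cannot be expressed in this phrase-edit form stays non-enforceable and must be rejected or escalated at critique time.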
Project structure
agent-patterns/
└── self-critique-agent/
└── python/
├── main.py # Draft -> Critique -> Revise -> Audit
├── llm.py # draft/critique/revise LLM calls
├── gateway.py # contract + guardrails + policy/execution checks
├── context.py # deterministic incident context
├── audit.py # diff metadata for audit log
├── requirements.txt
└── README.md
How to run
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd agent-patterns/self-critique-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Requires Python 3.11+.
Option 1 — via export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option 2 — via .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows it is easier to set the variables with `set` or, optionally, use python-dotenv.
The task
Imagine a production case:
"Prepare a customer-facing update about a payment incident, but do not change the facts and do not promise an ETA."
The problem with an ordinary revision: the model may "improve" the text but accidentally change its meaning.
The solution
In this example:
- the critique is formalized: what is wrong and what must be fixed
- the revision has hard boundaries
- the audit shows what actually changed
- high-risk cases are stopped via `policy_escalation`
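The policy vs execution split mentioned above can be sketched in a few lines. The set names below are illustrative (the repo uses `ALLOWED_CRITIQUE_DECISIONS_POLICY` and an execution allowlist on the gateway), but the two-layer idea is the same: a decision can be valid per the contract yet still denied by the deployment's runtime allowlist.

```python
# Policy layer: which decisions are valid at all under the critique contract.
POLICY_DECISIONS = {"approve", "revise", "escalate"}
# Execution layer: which decisions this deployment actually runs.
# With auto-revision disabled, "revise" is valid per policy but denied at runtime.
EXECUTION_DECISIONS = {"approve", "escalate"}

def route_decision(decision: str) -> str:
    if decision not in POLICY_DECISIONS:
        return f"stopped:critique_decision_not_allowed_policy:{decision}"
    if decision not in EXECUTION_DECISIONS:
        return f"stopped:critique_decision_denied_execution:{decision}"
    return f"run:{decision}"

print(route_decision("revise"))   # stopped:critique_decision_denied_execution:revise
print(route_decision("approve"))  # run:approve
```

Keeping the two checks separate gives distinct stop reasons in monitoring: contract violations vs deployment restrictions.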
Code
context.py — deterministic context
from __future__ import annotations
from typing import Any
def build_incident_context(*, report_date: str, region: str) -> dict[str, Any]:
return {
"report_date": report_date,
"region": region,
"incident": {
"incident_id": "inc_payments_20260306",
"severity": "P1",
"status": "degraded",
"affected_checkout_pct": 27,
"failed_payment_rate": 0.034,
"chargeback_alerts": 5,
"eta_minutes": 45,
},
"policy_hints": {
"avoid_absolute_guarantees": True,
"max_length_increase_pct": 20,
"required_sections": ["current_status", "customer_impact", "next_actions"],
},
"approved_actions": [
"monitor payment failures every 15 minutes",
"publish customer update via status page every 15 minutes",
"prepare support macro with workaround guidance",
],
}
What matters most here (in plain terms)
- All subsequent steps are grounded in fixed facts.
- This reduces "drift" between runs and simplifies auditing.
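Determinism here is auditable: the same inputs always serialize to the same canonical JSON, so a run's context can be hashed and compared across runs. A minimal sketch (the trimmed `build_incident_context` below is an illustration, not the full module):

```python
import hashlib
import json

def build_incident_context(report_date: str, region: str) -> dict:
    # Trimmed sketch of context.py: fixed facts, no randomness, no timestamps.
    return {"report_date": report_date, "region": region,
            "incident": {"incident_id": "inc_payments_20260306", "severity": "P1"}}

def ctx_hash(ctx: dict) -> str:
    # sort_keys gives a canonical serialization, so equal dicts hash equally
    raw = json.dumps(ctx, sort_keys=True, ensure_ascii=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]

a = ctx_hash(build_incident_context("2026-03-06", "US"))
b = ctx_hash(build_incident_context("2026-03-06", "US"))
assert a == b  # deterministic: identical inputs -> identical audit hash
```

The gateway's `text_hash` follows the same idea for draft/answer text.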
gateway.py — critique/revision policy boundary
from __future__ import annotations
import hashlib
import json
import re
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Any
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_seconds: int = 120
max_draft_chars: int = 900
max_risks: int = 5
max_required_changes: int = 5
max_answer_chars: int = 980
max_length_increase_pct: float = 20.0
min_patch_similarity: float = 0.4
NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")
INCIDENT_ID_RE = re.compile(r"\binc_[a-z0-9_]+\b", re.IGNORECASE)
SEVERITY_RE = re.compile(r"\bp[0-5]\b", re.IGNORECASE)
REGION_RE = re.compile(r"\b(us|eu|uk|ua|apac|global|emea|latam)\b", re.IGNORECASE)
QUOTED_PHRASE_RE = re.compile(r"['\"]([^'\"]{3,160})['\"]")
RESTRICTED_CLAIMS_RE = [
re.compile(r"\bresolved\b", re.IGNORECASE),
re.compile(r"\bfully[-\s]+recovered\b", re.IGNORECASE),
re.compile(r"\bincident\s+closed\b", re.IGNORECASE),
re.compile(r"\ball payments (?:are|is)\s+stable\b", re.IGNORECASE),
]
ALLOWED_SEVERITY = {"low", "medium", "high"}
def _stable_json(value: Any) -> str:
if value is None or isinstance(value, (bool, int, float, str)):
return json.dumps(value, ensure_ascii=True, sort_keys=True)
if isinstance(value, list):
return "[" + ",".join(_stable_json(v) for v in value) + "]"
if isinstance(value, dict):
parts = []
for key in sorted(value):
parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
return "{" + ",".join(parts) + "}"
return json.dumps(str(value), ensure_ascii=True)
def _normalize_space(text: str) -> str:
return " ".join((text or "").strip().split())
def text_hash(text: str) -> str:
normalized = _normalize_space(text)
raw = _stable_json(normalized)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]
def _extract_number_tokens(text: str) -> set[str]:
return set(NUMBER_TOKEN_RE.findall(_normalize_space(text).lower()))
def _extract_incident_ids(text: str) -> set[str]:
return set(INCIDENT_ID_RE.findall(_normalize_space(text).lower()))
def _extract_severity_labels(text: str) -> set[str]:
normalized = _normalize_space(text).upper()
return {match.upper() for match in SEVERITY_RE.findall(normalized)}
def _extract_regions(text: str) -> set[str]:
normalized = _normalize_space(text).upper()
return {value.upper() for value in REGION_RE.findall(normalized)}
def _context_claim_text(value: Any) -> str:
if value is None:
return ""
if isinstance(value, str):
return value
if isinstance(value, (bool, int, float)):
return str(value)
if isinstance(value, list):
return " ".join(_context_claim_text(item) for item in value)
if isinstance(value, dict):
parts: list[str] = []
for key in sorted(value):
item = value[key]
parts.append(str(key))
parts.append(_context_claim_text(item))
return " ".join(parts)
return str(value)
def _extract_required_change_rules(required_changes: list[str]) -> dict[str, list[str]]:
must_include: list[str] = []
must_remove: list[str] = []
def _append_unique(target: list[str], value: str) -> None:
if value and value not in target:
target.append(value)
for item in required_changes:
item_norm = _normalize_space(item).lower()
quoted = [_normalize_space(match).lower() for match in QUOTED_PHRASE_RE.findall(item)]
quoted = [value for value in quoted if value]
if not quoted:
continue
# Keep extraction deterministic and explicit:
# - REMOVE/MUST_REMOVE => must_remove
# - ADD/MUST_INCLUDE => must_include
# - anything else => ignored (must be blocked in critique validation)
is_remove_rule = ("must_remove" in item_norm) or item_norm.startswith(
("remove ", "remove:", "remove-")
)
is_add_rule = ("must_include" in item_norm) or item_norm.startswith(
("add ", "add:", "add-")
)
if is_remove_rule:
for phrase in quoted:
_append_unique(must_remove, phrase)
continue
if is_add_rule:
for phrase in quoted:
_append_unique(must_include, phrase)
continue
return {
"must_include": must_include,
"must_remove": must_remove,
}
def _is_high_risk_risk_type(risk_type: str) -> bool:
return risk_type in {"legal_risk", "policy_violation"}
def _contains_normalized_phrase(*, text: str, phrase: str) -> bool:
# Compare using token-like normalization so punctuation differences
# (e.g. trailing dots/commas) do not cause false negatives.
normalized_text = re.sub(r"[^a-z0-9% ]+", " ", _normalize_space(text).lower())
normalized_phrase = re.sub(r"[^a-z0-9% ]+", " ", _normalize_space(phrase).lower())
normalized_text = " ".join(normalized_text.split())
normalized_phrase = " ".join(normalized_phrase.split())
return normalized_phrase in normalized_text
def _remove_phrase_occurrences(*, text: str, phrase: str) -> str:
cleaned = text
normalized_phrase = _normalize_space(phrase).strip()
if not normalized_phrase:
return cleaned
variants = {normalized_phrase, normalized_phrase.rstrip(".!?")}
for variant in variants:
if not variant:
continue
cleaned = re.sub(re.escape(variant), "", cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r"\s+\.", ".", cleaned)
cleaned = re.sub(r"[ \t]{2,}", " ", cleaned)
cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
return cleaned.strip()
def _append_phrase_sentence(*, text: str, phrase: str) -> str:
sentence = _normalize_space(phrase).strip()
if not sentence:
return text
out = text.rstrip()
if out and out[-1] not in ".!?":
out += "."
separator = "\n\n" if "\n\n" in out else " "
return (out + separator + sentence).strip()
def _is_enforceable_required_change(item: str) -> bool:
item_norm = _normalize_space(item).lower()
has_marker = ("must_remove" in item_norm) or ("must_include" in item_norm) or item_norm.startswith(
("remove ", "remove:", "remove-", "add ", "add:", "add-")
)
if not has_marker:
return False
quoted = [_normalize_space(match) for match in QUOTED_PHRASE_RE.findall(item)]
quoted = [value for value in quoted if value]
return len(quoted) == 1
def validate_draft(draft: Any, *, max_chars: int) -> str:
if not isinstance(draft, str) or not draft.strip():
raise StopRun("invalid_draft:empty")
out = draft.strip()
if len(out) > max_chars:
raise StopRun("invalid_draft:too_long")
return out
def validate_critique(
raw: Any,
*,
allowed_decisions_policy: set[str],
allowed_risk_types_policy: set[str],
max_risks: int,
max_required_changes: int,
) -> dict[str, Any]:
if not isinstance(raw, dict):
raise StopRun("invalid_critique:not_object")
decision = raw.get("decision")
if not isinstance(decision, str) or not decision.strip():
raise StopRun("invalid_critique:decision")
decision = decision.strip()
if decision not in allowed_decisions_policy:
raise StopRun(f"critique_decision_not_allowed_policy:{decision}")
severity = raw.get("severity", "medium")
if not isinstance(severity, str) or not severity.strip():
raise StopRun("invalid_critique:severity")
severity = severity.strip().lower()
if severity not in ALLOWED_SEVERITY:
raise StopRun("invalid_critique:severity")
risks_raw = raw.get("risks", [])
if not isinstance(risks_raw, list):
raise StopRun("invalid_critique:risks")
if len(risks_raw) > max_risks:
raise StopRun("invalid_critique:too_many_risks")
risks: list[dict[str, str]] = []
for item in risks_raw:
if not isinstance(item, dict):
raise StopRun("invalid_critique:risk_item")
risk_type = item.get("type")
note = item.get("note")
if not isinstance(risk_type, str) or not risk_type.strip():
raise StopRun("invalid_critique:risk_type")
risk_type = risk_type.strip()
if risk_type not in allowed_risk_types_policy:
raise StopRun(f"critique_risk_not_allowed_policy:{risk_type}")
if not isinstance(note, str) or not note.strip():
raise StopRun("invalid_critique:risk_note")
risks.append({"type": risk_type, "note": note.strip()})
required_changes_raw = raw.get("required_changes", [])
if not isinstance(required_changes_raw, list):
raise StopRun("invalid_critique:required_changes")
if len(required_changes_raw) > max_required_changes:
raise StopRun("invalid_critique:too_many_required_changes")
required_changes: list[str] = []
for item in required_changes_raw:
if not isinstance(item, str) or not item.strip():
raise StopRun("invalid_critique:required_change_item")
required_changes.append(item.strip())
reason = raw.get("reason", "")
if reason is None:
reason = ""
if not isinstance(reason, str):
raise StopRun("invalid_critique:reason")
reason = reason.strip()
high_risk = severity == "high" or any(_is_high_risk_risk_type(r["type"]) for r in risks)
if decision == "approve":
if required_changes:
raise StopRun("invalid_critique:approve_with_required_changes")
if high_risk:
raise StopRun("invalid_critique:approve_with_high_risk")
if decision == "revise":
if not required_changes:
raise StopRun("invalid_critique:revise_without_required_changes")
if not all(_is_enforceable_required_change(item) for item in required_changes):
raise StopRun("invalid_critique:required_changes_not_enforceable")
if high_risk:
raise StopRun("invalid_critique:high_risk_requires_escalate")
if decision == "escalate":
if not reason:
raise StopRun("invalid_critique:escalate_reason_required")
return {
"decision": decision,
"severity": severity,
"risks": risks,
"required_changes": required_changes,
"reason": reason,
"high_risk": high_risk,
}
class SelfCritiqueGateway:
def __init__(self, *, allow_execution_decisions: set[str], budget: Budget):
self.allow_execution_decisions = set(allow_execution_decisions)
self.budget = budget
def enforce_execution_decision(self, decision: str) -> None:
if decision not in self.allow_execution_decisions:
raise StopRun(f"critique_decision_denied_execution:{decision}")
def apply_required_changes_fallback(self, *, text: str, required_changes: list[str]) -> str:
"""
Deterministic fallback for enforceable required changes:
remove MUST_REMOVE/REMOVE phrases and append missing MUST_INCLUDE/ADD phrases.
"""
candidate = (text or "").strip()
if not candidate:
return candidate
phrase_rules = _extract_required_change_rules(required_changes)
must_include = phrase_rules["must_include"]
must_remove = phrase_rules["must_remove"]
for phrase in must_remove:
candidate = _remove_phrase_occurrences(text=candidate, phrase=phrase)
for phrase in must_include:
if not _contains_normalized_phrase(text=candidate, phrase=phrase):
candidate = _append_phrase_sentence(text=candidate, phrase=phrase)
return candidate.strip()
def validate_revision(
self,
*,
original: str,
revised: str,
context: dict[str, Any],
required_changes: list[str],
) -> dict[str, Any]:
if not isinstance(revised, str) or not revised.strip():
raise StopRun("invalid_revised:empty")
revised_clean = revised.strip()
if len(revised_clean) > self.budget.max_answer_chars:
raise StopRun("invalid_revised:too_long")
normalized_original = _normalize_space(original)
normalized_revised = _normalize_space(revised_clean)
if normalized_original == normalized_revised:
raise StopRun("invalid_revised:no_changes")
similarity = SequenceMatcher(a=normalized_original, b=normalized_revised).ratio()
if similarity < self.budget.min_patch_similarity:
raise StopRun("patch_violation:too_large_edit")
original_len = max(1, len(normalized_original))
revised_len = len(normalized_revised)
increase_pct = ((revised_len - original_len) / float(original_len)) * 100.0
policy_hint_raw = (
context.get("policy_hints", {}).get("max_length_increase_pct")
if isinstance(context, dict)
else None
)
policy_hint_cap = self.budget.max_length_increase_pct
if isinstance(policy_hint_raw, (int, float)) and not isinstance(policy_hint_raw, bool):
policy_hint_cap = float(policy_hint_raw)
effective_length_cap = min(self.budget.max_length_increase_pct, policy_hint_cap)
if increase_pct > effective_length_cap:
raise StopRun("patch_violation:length_increase_limit")
allowed_text_tokens = _stable_json(context) + " " + original
allowed_text_claims = _normalize_space(_context_claim_text(context) + " " + original)
if _extract_number_tokens(revised_clean) - _extract_number_tokens(allowed_text_tokens):
raise StopRun("patch_violation:no_new_facts")
if _extract_incident_ids(revised_clean) - _extract_incident_ids(allowed_text_tokens):
raise StopRun("patch_violation:new_incident_id")
if _extract_severity_labels(revised_clean) - _extract_severity_labels(allowed_text_tokens):
raise StopRun("patch_violation:new_severity_label")
if _extract_regions(revised_clean) - _extract_regions(allowed_text_tokens):
raise StopRun("patch_violation:new_region")
avoid_absolute_guarantees = bool(
context.get("policy_hints", {}).get("avoid_absolute_guarantees")
if isinstance(context, dict)
else False
)
for claim_re in RESTRICTED_CLAIMS_RE:
if avoid_absolute_guarantees:
if claim_re.search(revised_clean):
raise StopRun("patch_violation:restricted_claims")
continue
if claim_re.search(revised_clean) and not claim_re.search(allowed_text_claims):
raise StopRun("patch_violation:restricted_claims")
phrase_rules = _extract_required_change_rules(required_changes)
must_include = phrase_rules["must_include"]
must_remove = phrase_rules["must_remove"]
if must_include or must_remove:
revised_lower = normalized_revised.lower()
if [value for value in must_include if not _contains_normalized_phrase(text=revised_lower, phrase=value)]:
raise StopRun("patch_violation:required_changes_not_applied")
if [value for value in must_remove if _contains_normalized_phrase(text=revised_lower, phrase=value)]:
raise StopRun("patch_violation:required_changes_not_applied")
return {
"answer": revised_clean,
"patch_similarity": round(similarity, 3),
"length_increase_pct": round(increase_pct, 2),
"required_changes_total": len(required_changes),
"required_changes_enforced": len(must_include) + len(must_remove),
"required_changes_unenforced": len(required_changes)
- (len(must_include) + len(must_remove)),
}
def validate_final(self, answer: str) -> str:
if not isinstance(answer, str) or not answer.strip():
raise StopRun("invalid_answer:empty")
out = answer.strip()
if len(out) > self.budget.max_answer_chars:
raise StopRun("invalid_answer:too_long")
return out
What matters most here (in plain terms)
- The critique is not free-form text: it passes strict validation.
- The revision cannot step outside the factual context and the budget limits.
- Enforceable `required_changes` use explicit `ADD`/`REMOVE` markers (or `MUST_INCLUDE`/`MUST_REMOVE`).
- With `avoid_absolute_guarantees=True`, restricted claims are blocked regardless of the draft (even if they already appeared in the original draft).
- `required_changes_total` is the total number of `required_changes`; `required_changes_enforced` is how many became enforced rules; `required_changes_unenforced` is how many the gateway cannot verify.
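The `no_new_facts` guard boils down to comparing sets of numeric tokens: any number in the revision that is absent from context + original draft is treated as a fabricated fact. A standalone sketch of that check (the strings below are hypothetical examples):

```python
import re

# Same token shape as the gateway's NUMBER_TOKEN_RE: integers, decimals, percents.
NUMBER_TOKEN = re.compile(r"\b\d+(?:\.\d+)?%?\b")

def numbers(text: str) -> set[str]:
    return set(NUMBER_TOKEN.findall(text.lower()))

allowed = "affected_checkout_pct 27, failed rate 0.034, eta 45 minutes"
ok_revision = "About 27 percent of checkouts are affected; ETA is roughly 45 minutes."
bad_revision = "About 30 percent of checkouts are affected."

# Empty set difference -> every number in the revision is grounded in allowed text.
assert not (numbers(ok_revision) - numbers(allowed))
# "30" appears nowhere in the allowed text -> would raise patch_violation:no_new_facts.
assert numbers(bad_revision) - numbers(allowed) == {"30"}
```

The same set-difference trick is reused for incident IDs, severity labels, and regions, each with its own regex.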
llm.py — draft/critique/revise calls
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
class LLMInvalid(Exception):
pass
DRAFT_SYSTEM_PROMPT = """
You are an operations communications writer.
Return exactly one JSON object:
{
"draft": "customer-facing incident update"
}
Rules:
- Use only facts from provided incident_context.
- Include current status, customer impact, and next actions.
- Keep language clear and non-speculative.
- Avoid absolute guarantees.
- Do not output markdown or extra keys.
""".strip()
SHORTEN_DRAFT_SYSTEM_PROMPT = """
You are an operations editor.
Return exactly one JSON object:
{
"draft": "shortened customer-facing incident update"
}
Rules:
- Rewrite the draft to be <= max_chars characters.
- Preserve original facts, numbers, and intent.
- Do not add new facts or speculative claims.
- Keep current status, customer impact, and next actions.
- Keep language clear and non-speculative.
- Avoid absolute guarantees.
- Do not output markdown or extra keys.
""".strip()
CRITIQUE_SYSTEM_PROMPT = """
You are a strict critique reviewer.
Return exactly one JSON object:
{
"decision": "approve|revise|escalate",
"severity": "low|medium|high",
"risks": [{"type":"overconfidence","note":"..."}],
"required_changes": ["concrete change"],
"reason": "for escalate only"
}
Rules:
- Review exactly once.
- decision=approve: required_changes must be empty.
- decision=revise: provide 1-5 concrete required changes.
- decision=escalate: use only for high-risk content.
- Every required_changes item MUST start with ADD/REMOVE/MUST_INCLUDE/MUST_REMOVE.
- Every required_changes item MUST contain exactly one quoted phrase.
- If you cannot express required changes in enforceable ADD/REMOVE format, set decision=escalate and explain why in reason.
- Use explicit markers for enforceable phrase edits:
- REMOVE "phrase to delete"
- ADD "phrase to include"
- MUST_REMOVE "phrase to delete" (equivalent)
- MUST_INCLUDE "phrase to include" (equivalent)
- Do not add new facts in required_changes.
- Use only risk types from allowed_risk_types.
- Do not output markdown or extra keys.
""".strip()
REVISE_SYSTEM_PROMPT = """
You are an editor applying a constrained rewrite.
Return exactly one JSON object:
{
"revised_answer": "updated answer"
}
Rules:
- Apply required_changes only.
- Keep original scope and customer intent.
- Do not add new facts or numbers.
- Keep the answer concise and actionable.
- Do not output markdown or extra keys.
""".strip()
REVISE_SYSTEM_PROMPT_STRICT = """
You are an editor applying a constrained rewrite.
Return exactly one JSON object:
{
"revised_answer": "updated answer"
}
Rules:
- Apply required_changes only.
- Keep original scope and customer intent.
- Do not add new facts or numbers.
- Keep the answer concise and actionable.
- You MUST satisfy each required_changes item exactly.
- For ADD/MUST_INCLUDE: include the quoted phrase verbatim.
- For REMOVE/MUST_REMOVE: ensure the quoted phrase does not appear.
- Do not output markdown or extra keys.
""".strip()
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
data = json.loads(text)
except json.JSONDecodeError as exc:
raise LLMInvalid("llm_invalid_json") from exc
if not isinstance(data, dict):
raise LLMInvalid("llm_invalid_json")
return data
def generate_draft(*, goal: str, incident_context: dict[str, Any]) -> str:
payload = {
"goal": goal,
"incident_context": incident_context,
}
data = _chat_json(system_prompt=DRAFT_SYSTEM_PROMPT, payload=payload)
draft = data.get("draft")
if not isinstance(draft, str):
raise LLMInvalid("llm_invalid_schema")
draft = draft.strip()
if not draft:
raise LLMEmpty("llm_empty")
return draft
def shorten_draft(*, draft: str, max_chars: int) -> str:
payload = {
"draft": draft,
"max_chars": int(max_chars),
}
data = _chat_json(system_prompt=SHORTEN_DRAFT_SYSTEM_PROMPT, payload=payload)
shortened = data.get("draft")
if not isinstance(shortened, str):
raise LLMInvalid("llm_invalid_schema")
shortened = shortened.strip()
if not shortened:
raise LLMEmpty("llm_empty")
return shortened
def critique_draft(
*,
goal: str,
incident_context: dict[str, Any],
draft: str,
allowed_risk_types: list[str],
) -> dict[str, Any]:
payload = {
"goal": goal,
"incident_context": incident_context,
"draft": draft,
"allowed_risk_types": allowed_risk_types,
}
return _chat_json(system_prompt=CRITIQUE_SYSTEM_PROMPT, payload=payload)
def revise_once(
*,
goal: str,
incident_context: dict[str, Any],
draft: str,
required_changes: list[str],
strict_mode: bool = False,
) -> str:
payload = {
"goal": goal,
"incident_context": incident_context,
"draft": draft,
"required_changes": required_changes,
}
system_prompt = REVISE_SYSTEM_PROMPT_STRICT if strict_mode else REVISE_SYSTEM_PROMPT
data = _chat_json(system_prompt=system_prompt, payload=payload)
revised = data.get("revised_answer")
if not isinstance(revised, str):
raise LLMInvalid("llm_invalid_schema")
revised = revised.strip()
if not revised:
raise LLMEmpty("llm_empty")
return revised
What matters most here (in plain terms)
- The critique output follows a stable JSON contract.
- The critique prompt explicitly demands enforceable `required_changes` via explicit `ADD`/`REMOVE` markers.
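The "stable JSON contract" point can be shown without any network call. The sketch below mimics the parse-then-validate step (`_chat_json` + `validate_critique` in this repo) on raw model output; it is a simplified illustration, not the repo code.

```python
import json

ALLOWED_DECISIONS = {"approve", "revise", "escalate"}

def parse_critique(raw_text: str) -> dict:
    # The model must return exactly one JSON object; anything else stops the run.
    data = json.loads(raw_text)
    if not isinstance(data, dict):
        raise ValueError("llm_invalid_json")
    decision = data.get("decision")
    if decision not in ALLOWED_DECISIONS:
        raise ValueError(f"critique_decision_not_allowed_policy:{decision}")
    # Cross-field rule: an approval may not carry pending changes.
    if decision == "approve" and data.get("required_changes"):
        raise ValueError("invalid_critique:approve_with_required_changes")
    return data

ok = parse_critique('{"decision": "revise", "required_changes": ["REMOVE \\"resolved\\""]}')
print(ok["decision"])  # revise
try:
    parse_critique('{"decision": "approve", "required_changes": ["ADD \\"x\\""]}')
except ValueError as exc:
    print(exc)  # invalid_critique:approve_with_required_changes
```

Validating cross-field invariants (not just types) is what makes the critique an artifact rather than free-form advice.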
audit.py — change log
from __future__ import annotations
import difflib
import hashlib
import re
from typing import Any
SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?])\s+")
def _hash_text(text: str) -> str:
normalized = " ".join((text or "").split())
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:12]
def _split_for_diff(text: str) -> list[str]:
lines = (text or "").splitlines()
if len(lines) > 1:
return lines
normalized = (text or "").strip()
if not normalized:
return [""]
sentences = [item.strip() for item in SENTENCE_SPLIT_RE.split(normalized) if item.strip()]
if len(sentences) > 1:
return sentences
chunk_size = 80
return [normalized[i : i + chunk_size] for i in range(0, len(normalized), chunk_size)]
def build_audit_log(
*,
before: str,
after: str,
risks: list[dict[str, Any]],
required_changes: list[str],
) -> dict[str, Any]:
before_text = (before or "").strip()
after_text = (after or "").strip()
before_chars = len(before_text)
after_chars = len(after_text)
delta_chars = after_chars - before_chars
if before_chars <= 0:
increase_pct = 0.0
else:
increase_pct = (delta_chars / float(before_chars)) * 100.0
raw_diff = list(
difflib.unified_diff(
_split_for_diff(before_text),
_split_for_diff(after_text),
fromfile="before",
tofile="after",
lineterm="",
)
)
diff_excerpt: list[str] = []
for line in raw_diff:
if line.startswith(("---", "+++", "@@")):
continue
if line.startswith(("+", "-")):
diff_excerpt.append(line)
if len(diff_excerpt) >= 6:
break
return {
"changed": before_text != after_text,
"before_hash": _hash_text(before_text),
"after_hash": _hash_text(after_text),
"before_chars": before_chars,
"after_chars": after_chars,
"delta_chars": delta_chars,
"length_increase_pct": round(increase_pct, 2),
"risks_count": len(risks),
"required_changes_count": len(required_changes),
"diff_excerpt": diff_excerpt,
}
What matters most here (in plain terms)
- After a revision, audit metadata is stored, not just the final text.
- This makes it possible to debug "why this version passed policy".
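The shape of that audit metadata can be sketched with stdlib `difflib` and `hashlib` alone. The strings below are hypothetical and the splitting is cruder than the repo's sentence-aware `_split_for_diff`:

```python
import difflib
import hashlib

def short_hash(text: str) -> str:
    # Whitespace-normalized hash, like the repo's _hash_text.
    return hashlib.sha256(" ".join(text.split()).encode("utf-8")).hexdigest()[:12]

before = "Checkout is degraded. The incident is resolved."
after = "Checkout is degraded. We will post updates every 15 minutes."

# Keep only added/removed lines, drop diff headers, cap the excerpt.
diff_excerpt = [
    line
    for line in difflib.unified_diff(before.split(". "), after.split(". "), lineterm="")
    if line.startswith(("+", "-")) and not line.startswith(("+++", "---"))
][:6]

audit = {
    "changed": before != after,
    "before_hash": short_hash(before),
    "after_hash": short_hash(after),
    "delta_chars": len(after) - len(before),
    "diff_excerpt": diff_excerpt,
}
print(audit["changed"], audit["delta_chars"])
```

Storing hashes plus a bounded excerpt keeps the audit log small while still answering "what exactly changed".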
main.py — orchestrating the full loop
from __future__ import annotations
import json
import time
import uuid
from typing import Any
from audit import build_audit_log
from context import build_incident_context
from gateway import Budget, SelfCritiqueGateway, StopRun, text_hash, validate_critique, validate_draft
from llm import LLMEmpty, LLMInvalid, LLMTimeout, critique_draft, generate_draft, revise_once, shorten_draft
GOAL = (
"Draft a customer-facing payment incident update for US enterprise customers. "
"Use precise language, avoid guarantees, and keep next actions concrete."
)
INCIDENT_CONTEXT = build_incident_context(report_date="2026-03-06", region="US")
BUDGET = Budget(
max_seconds=120,
max_draft_chars=900,
max_risks=5,
max_required_changes=5,
max_answer_chars=980,
max_length_increase_pct=20.0,
min_patch_similarity=0.4,
)
ALLOWED_CRITIQUE_DECISIONS_POLICY = {"approve", "revise", "escalate"}
AUTO_REVISION_ENABLED = True
ALLOWED_CRITIQUE_DECISIONS_EXECUTION = (
ALLOWED_CRITIQUE_DECISIONS_POLICY if AUTO_REVISION_ENABLED else {"approve", "escalate"}
)
ALLOWED_RISK_TYPES_POLICY = {
"overconfidence",
"missing_uncertainty",
"contradiction",
"scope_leak",
"policy_violation",
"legal_risk",
}
def run_self_critique_agent(*, goal: str, incident_context: dict[str, Any]) -> dict[str, Any]:
run_id = str(uuid.uuid4())
started = time.monotonic()
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
gateway = SelfCritiqueGateway(
allow_execution_decisions=ALLOWED_CRITIQUE_DECISIONS_EXECUTION,
budget=BUDGET,
)
def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
payload = {
"run_id": run_id,
"status": "stopped",
"stop_reason": stop_reason,
"phase": phase,
"trace": trace,
"history": history,
}
payload.update(extra)
return payload
draft_attempts = 0
draft_retried = False
try:
draft_attempts += 1
draft_raw = generate_draft(goal=goal, incident_context=incident_context)
try:
draft = validate_draft(draft_raw, max_chars=BUDGET.max_draft_chars)
except StopRun as exc:
if exc.reason != "invalid_draft:too_long":
raise
# One bounded recovery attempt: shorten draft within policy budget.
draft_attempts += 1
draft_retried = True
shortened_raw = shorten_draft(draft=draft_raw, max_chars=BUDGET.max_draft_chars)
draft = validate_draft(shortened_raw, max_chars=BUDGET.max_draft_chars)
except LLMTimeout:
return stopped("llm_timeout", phase="draft")
except LLMInvalid as exc:
return stopped(exc.args[0], phase="draft")
except LLMEmpty:
return stopped("llm_empty", phase="draft")
except StopRun as exc:
return stopped(exc.reason, phase="draft")
trace.append(
{
"step": 1,
"phase": "draft",
"draft_hash": text_hash(draft),
"chars": len(draft),
"attempts_used": draft_attempts,
"retried": draft_retried,
"ok": True,
}
)
history.append(
{
"step": 1,
"action": "draft_once",
"draft": draft,
}
)
if (time.monotonic() - started) > BUDGET.max_seconds:
return stopped("max_seconds", phase="critique")
try:
raw_critique = critique_draft(
goal=goal,
incident_context=incident_context,
draft=draft,
allowed_risk_types=sorted(ALLOWED_RISK_TYPES_POLICY),
)
except LLMTimeout:
return stopped("llm_timeout", phase="critique")
except LLMInvalid as exc:
return stopped(exc.args[0], phase="critique")
try:
critique = validate_critique(
raw_critique,
allowed_decisions_policy=ALLOWED_CRITIQUE_DECISIONS_POLICY,
allowed_risk_types_policy=ALLOWED_RISK_TYPES_POLICY,
max_risks=BUDGET.max_risks,
max_required_changes=BUDGET.max_required_changes,
)
gateway.enforce_execution_decision(critique["decision"])
except StopRun as exc:
return stopped(exc.reason, phase="critique", raw_critique=raw_critique)
trace.append(
{
"step": 2,
"phase": "critique",
"decision": critique["decision"],
"severity": critique["severity"],
"risks": len(critique["risks"]),
"required_changes": len(critique["required_changes"]),
"ok": True,
}
)
history.append(
{
"step": 2,
"action": "critique_once",
"critique": critique,
}
)
if critique["decision"] == "escalate":
escalation_reason = str(critique.get("reason", "")).strip()
return {
"run_id": run_id,
"status": "stopped",
"stop_reason": "policy_escalation",
"escalation_reason": escalation_reason[:120],
"phase": "critique",
"critique": critique,
"trace": trace,
"history": history,
}
final_answer = draft
revised = False
if critique["decision"] == "revise":
revise_attempts = 0
revise_retried = False
revised_payload: dict[str, Any] | None = None
last_revised_candidate = draft
for attempt in range(1, 4):
if (time.monotonic() - started) > BUDGET.max_seconds:
return stopped("max_seconds", phase="revise")
revise_attempts = attempt
strict_mode = attempt > 1
try:
revised_raw = revise_once(
goal=goal,
incident_context=incident_context,
draft=draft,
required_changes=critique["required_changes"],
strict_mode=strict_mode,
)
last_revised_candidate = revised_raw
revised_payload = gateway.validate_revision(
original=draft,
revised=revised_raw,
context=incident_context,
required_changes=critique["required_changes"],
)
break
except LLMTimeout:
return stopped("llm_timeout", phase="revise")
except LLMInvalid as exc:
return stopped(exc.args[0], phase="revise")
except LLMEmpty:
return stopped("llm_empty", phase="revise")
except StopRun as exc:
if exc.reason == "patch_violation:required_changes_not_applied" and attempt < 3:
revise_retried = True
continue
if exc.reason == "patch_violation:required_changes_not_applied":
# Final fallback: enforce required phrase edits deterministically.
try:
fallback_revised = gateway.apply_required_changes_fallback(
text=last_revised_candidate,
required_changes=critique["required_changes"],
)
revised_payload = gateway.validate_revision(
original=draft,
revised=fallback_revised,
context=incident_context,
required_changes=critique["required_changes"],
)
revise_attempts = attempt + 1
revise_retried = True
break
except StopRun as fallback_exc:
return stopped(fallback_exc.reason, phase="revise")
return stopped(exc.reason, phase="revise")
if revised_payload is None:
return stopped("patch_violation:required_changes_not_applied", phase="revise")
final_answer = revised_payload["answer"]
revised = True
trace.append(
{
"step": 3,
"phase": "revise",
"patch_similarity": revised_payload["patch_similarity"],
"length_increase_pct": revised_payload["length_increase_pct"],
"required_changes_total": revised_payload["required_changes_total"],
"required_changes_enforced": revised_payload["required_changes_enforced"],
"required_changes_unenforced": revised_payload["required_changes_unenforced"],
"attempts_used": revise_attempts,
"retried": revise_retried,
"revised_hash": text_hash(final_answer),
"ok": True,
}
)
history.append(
{
"step": 3,
"action": "revise_once",
"required_changes": critique["required_changes"],
"revised_answer": final_answer,
}
)
try:
final_answer = gateway.validate_final(final_answer)
except StopRun as exc:
return stopped(exc.reason, phase="finalize")
audit_log = build_audit_log(
before=draft,
after=final_answer,
risks=critique["risks"],
required_changes=critique["required_changes"],
)
trace.append(
{
"step": 4 if revised else 3,
"phase": "audit_finalize",
"final_hash": text_hash(final_answer),
"changed": audit_log["changed"],
"diff_lines": len(audit_log["diff_excerpt"]),
"ok": True,
}
)
history.append(
{
"step": 4 if revised else 3,
"action": "audit_finalize",
"status": "final",
"changed": audit_log["changed"],
}
)
return {
"run_id": run_id,
"status": "ok",
"stop_reason": "success",
"outcome": "revised_once" if revised else "approved_direct",
"answer": final_answer,
"critique_decision": critique["decision"],
"severity": critique["severity"],
"risks": critique["risks"],
"required_changes": critique["required_changes"],
"audit": audit_log,
"trace": trace,
"history": history,
}


def main() -> None:
result = run_self_critique_agent(goal=GOAL, incident_context=INCIDENT_CONTEXT)
print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
main()
What matters most here (in plain words)
- `critique_decision` drives the downstream flow, but execution permissions are enforced by the runtime.
- Critique and revision stay transparent through `trace + history + audit`.
Example output
{
"run_id": "721e4231-7b9a-4843-99e0-888616025b35",
"status": "ok",
"stop_reason": "success",
"outcome": "revised_once",
"answer": "Current Status: We are experiencing a payment processing degradation affecting about 27% of US enterprise customer checkouts. The failed payment rate has risen to 3.4%, with 5 related chargeback alerts. Our engineering team is working to resolve the issue. We estimate recovery within approximately 45 minutes, though this timing may change as we learn more.\n\nCustomer Impact: Some customers may face difficulties completing payments, causing delayed order processing or the need to retry transactions. We are monitoring the situation closely to reduce disruption.\n\nNext Actions: We will publish updates on the status page every 15 minutes as we monitor payment failures. Our support team is preparing a workaround guide to assist affected customers. Please check the status page regularly and contact support if you need help.",
"critique_decision": "revise",
"severity": "medium",
"risks": [
{
"type": "overconfidence",
"note": "The phrase 'with an estimated recovery time of 45 minutes' may be interpreted as a guarantee rather than an estimate."
}
],
"required_changes": [
"MUST_REMOVE \"with an estimated recovery time of 45 minutes\"",
"MUST_INCLUDE \"We estimate recovery within approximately 45 minutes, though this timing may change as we learn more\"",
"MUST_INCLUDE \"We will publish updates on the status page every 15 minutes as we monitor payment failures\"",
"MUST_INCLUDE \"Our support team is preparing a workaround guide to assist affected customers\""
],
"audit": {
"changed": true,
"before_hash": "7cdaa40c3fa4",
"after_hash": "33b356380537",
"before_chars": 770,
"after_chars": 827,
"delta_chars": 57,
"length_increase_pct": 7.4,
"risks_count": 1,
"required_changes_count": 4,
"diff_excerpt": [
"-...",
"+..."
]
},
"trace": [
{
"step": 1,
"phase": "draft",
"draft_hash": "9b944d7375bf",
"chars": 770,
"attempts_used": 2,
"retried": true,
"ok": true
},
{
"step": 2,
"phase": "critique",
"decision": "revise",
"severity": "medium",
"risks": 1,
"required_changes": 4,
"ok": true
},
{
"step": 3,
"phase": "revise",
"patch_similarity": 0.849,
"length_increase_pct": 7.42,
"required_changes_total": 4,
"required_changes_enforced": 4,
"required_changes_unenforced": 0,
"attempts_used": 2,
"retried": true,
"revised_hash": "d92d90a2adbe",
"ok": true
},
{
"step": 4,
"phase": "audit_finalize",
"final_hash": "d92d90a2adbe",
"changed": true,
"diff_lines": 4,
"ok": true
}
],
"history": [{...}]
}
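The `audit` fields in the output above can be produced with the standard library alone. A minimal sketch of what a `build_audit_log` helper might look like, assuming `hashlib` and `difflib` (the repo's actual `audit.py` may differ):

```python
import difflib
import hashlib
from typing import Any


def text_hash(text: str) -> str:
    # 12-char hex prefix, matching the hash width in the example output.
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:12]


def build_audit_log(
    before: str,
    after: str,
    risks: list[Any],
    required_changes: list[str],
) -> dict[str, Any]:
    delta = len(after) - len(before)
    # Keep only the changed lines of the unified diff as a short excerpt.
    diff_excerpt = [
        line
        for line in difflib.unified_diff(
            before.splitlines(), after.splitlines(), lineterm=""
        )
        if line.startswith(("+", "-")) and not line.startswith(("+++", "---"))
    ][:10]
    return {
        "changed": before != after,
        "before_hash": text_hash(before),
        "after_hash": text_hash(after),
        "before_chars": len(before),
        "after_chars": len(after),
        "delta_chars": delta,
        "length_increase_pct": round(100 * delta / max(len(before), 1), 2),
        "risks_count": len(risks),
        "required_changes_count": len(required_changes),
        "diff_excerpt": diff_excerpt,
    }
```

Hashes and character deltas make the audit record cheap to compare and store, while the diff excerpt keeps the log human-readable without persisting both full texts.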
Typical stop_reason values
- `success` — the run completed correctly
- `llm_timeout` — the LLM did not respond within `OPENAI_TIMEOUT_SECONDS`
- `llm_empty` — empty LLM response during draft/revise
- `llm_invalid_json` — the LLM returned invalid JSON
- `llm_invalid_schema` — the JSON does not match the contract
- `invalid_draft:*` — the draft failed basic validation
- `invalid_critique:*` — the critique failed the policy-layer contract
- `invalid_critique:required_changes_not_enforceable` — for `decision=revise`, required_changes must be in an enforceable format (`ADD/REMOVE/MUST_*` + 1 quoted phrase)
- `critique_decision_not_allowed_policy:*` — the critique decision is outside the policy allowlist
- `critique_decision_denied_execution:*` — the runtime denied execution of the decision
- `patch_violation:no_new_facts` — the revision added new facts
- `patch_violation:length_increase_limit` — the revision exceeded the length-increase limit
- `patch_violation:restricted_claims` — the revision contains forbidden claims; with `avoid_absolute_guarantees=true` they are blocked strictly
- `patch_violation:required_changes_not_applied` — the revision did not apply the mandatory changes
- `patch_violation:too_large_edit` — the revision went beyond patch-only bounds
- `policy_escalation` — the critique returned an escalation; details in `escalation_reason`
- `max_seconds` — the run's total time budget was exceeded
- `invalid_answer:*` — the final answer failed validation
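The enforceable required_changes format (a directive plus exactly one quoted phrase) and the `required_changes_not_applied` check can be sketched like this. This is a hypothetical illustration; the repo's `gateway.py` may implement the check differently:

```python
import re

# One directive plus exactly one double-quoted phrase, e.g.
#   MUST_REMOVE "with an estimated recovery time of 45 minutes"
CHANGE_RE = re.compile(r'^(ADD|REMOVE|MUST_INCLUDE|MUST_REMOVE)\s+"([^"]+)"$')


def check_required_changes(revised: str, required_changes: list[str]) -> list[str]:
    """Return the subset of required changes that were NOT applied."""
    unapplied = []
    for change in required_changes:
        match = CHANGE_RE.match(change.strip())
        if not match:
            # Non-enforceable format -> the critique contract itself is invalid.
            raise ValueError("invalid_critique:required_changes_not_enforceable")
        op, phrase = match.groups()
        present = phrase in revised
        if op in ("ADD", "MUST_INCLUDE") and not present:
            unapplied.append(change)
        elif op in ("REMOVE", "MUST_REMOVE") and present:
            unapplied.append(change)
    return unapplied
```

Pinning each change to a quoted phrase is what makes the contract checkable deterministically: the gateway can verify presence/absence with plain string matching instead of trusting the model's self-report.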
What is NOT shown here
- persisted audit storage (DB / object storage)
- retry/backoff and a circuit breaker for LLM calls
- a human review queue for `policy_escalation`
- domain-specific, semantically aware validation of `required_changes`
What to try next
- Disable `AUTO_REVISION_ENABLED` and verify `critique_decision_denied_execution:revise`.
- Add a "severity budget" (for example, block `medium+` for certain tenants).
- Ship `audit.diff_excerpt` to an external log sink (S3/ELK) for incident analysis.
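A "severity budget" could be as simple as a per-tenant cap on the critique severity that auto-revision is allowed to handle. The names below (including the tenant ids) are hypothetical, purely for illustration:

```python
# Hypothetical severity-budget sketch: tenants with a stricter budget force
# escalation instead of auto-revision once the critique severity exceeds it.
SEVERITY_RANK = {"low": 0, "medium": 1, "high": 2}

TENANT_MAX_AUTO_SEVERITY = {
    "default": "high",          # auto-revision allowed for any severity
    "regulated-tenant": "low",  # assumed tenant id: medium+ must escalate
}


def auto_revision_allowed(tenant: str, severity: str) -> bool:
    """True if this tenant's budget permits auto-revision at this severity."""
    limit = TENANT_MAX_AUTO_SEVERITY.get(tenant, TENANT_MAX_AUTO_SEVERITY["default"])
    return SEVERITY_RANK[severity] <= SEVERITY_RANK[limit]
```

Plugged into the gateway, a denied budget check would map naturally onto the existing `critique_decision_denied_execution:revise` stop reason rather than a new failure mode.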