Pattern Essence (Brief)
Self-Critique Agent is a pattern where, after a draft answer, the agent produces a structured critique (risks + required_changes), performs one bounded revision, and records an audit of changes.
The LLM proposes the draft and critique, while gateway policy decides whether it can be executed and whether the revision stays within contract boundaries.
What this example demonstrates
- production-like flow: Draft -> Critique -> Revise (optional) -> Audit
- strictly structured critique artifact: `decision`, `severity`, `risks`, `required_changes`
- policy vs execution separation for critique decisions
- constrained revision: no new facts, length increase cap, required changes enforcement
- change audit (`before_hash`/`after_hash`, `delta_chars`, `diff_excerpt`)
- explicit `stop_reason`, `trace`, `history` for production monitoring
Architecture
- LLM generates a draft (`draft`).
- LLM critic returns structured critique JSON.
- Gateway validates the critique contract by policy.
- Execution layer enforces a runtime decision allowlist.
- If `revise`, one revision with guardrails is executed.
- Change audit log and final answer are produced.
Key contract: the agent cannot "freely rewrite" text; changes are controlled by required_changes and runtime guardrails.
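The runtime boundary can be sketched in a few lines. This is a simplified illustration of the checks `gateway.py` applies below, not the gateway itself; the function name `within_guardrails` is invented, and the thresholds mirror the budget defaults:

```python
from difflib import SequenceMatcher

def within_guardrails(original: str, revised: str,
                      *, max_increase_pct: float = 20.0,
                      min_similarity: float = 0.4) -> bool:
    # A revision must stay a bounded edit of the original (similarity floor)
    # and must not grow past the length-increase cap.
    increase = (len(revised) - len(original)) / max(1, len(original)) * 100.0
    similarity = SequenceMatcher(a=original, b=revised).ratio()
    return increase <= max_increase_pct and similarity >= min_similarity

print(within_guardrails(
    "Checkout is degraded; failure rate is elevated.",
    "Checkout is degraded; the failure rate is elevated.",
))  # → True
```

A full rewrite, however well-worded, fails the similarity floor and is rejected as `patch_violation:too_large_edit`.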
Project structure
agent-patterns/
└── self-critique-agent/
└── python/
├── main.py # Draft -> Critique -> Revise -> Audit
├── llm.py # draft/critique/revise LLM calls
├── gateway.py # contract + guardrails + policy/execution checks
├── context.py # deterministic incident context
├── audit.py # diff metadata for audit log
├── requirements.txt
└── README.md
How to run
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd agent-patterns/self-critique-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Via export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Via .env (optional):
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows, set the variables with `set` (cmd) or `$env:` (PowerShell) instead, or load the file with python-dotenv.
Task
Imagine a production case:
"Prepare a customer-facing update about a payment incident, but do not change facts and do not provide ETA guarantees."
Problem with ordinary revision: the model can "improve" wording but accidentally change meaning.
Solution
In this example:
- critique is formalized: what is wrong and what must be fixed
- revision has strict boundaries
- audit shows what actually changed
- high-risk cases are stopped via `policy_escalation`
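For a high-risk critique, the run ends with a stopped payload instead of a rewritten answer. An illustrative shape (field names follow `main.py` below; the values here are invented):

```python
# Invented example of a policy_escalation stop; only the field names
# match the real payload produced in main.py.
stopped = {
    "status": "stopped",
    "stop_reason": "policy_escalation",
    "escalation_reason": "legal_risk: draft promises compensation terms",
    "phase": "critique",
}
# Monitoring can alert on stop_reason without parsing free text.
print(stopped["stop_reason"])  # → policy_escalation
```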
Code
context.py — deterministic context
from __future__ import annotations
from typing import Any
def build_incident_context(*, report_date: str, region: str) -> dict[str, Any]:
return {
"report_date": report_date,
"region": region,
"incident": {
"incident_id": "inc_payments_20260306",
"severity": "P1",
"status": "degraded",
"affected_checkout_pct": 27,
"failed_payment_rate": 0.034,
"chargeback_alerts": 5,
"eta_minutes": 45,
},
"policy_hints": {
"avoid_absolute_guarantees": True,
"max_length_increase_pct": 20,
"required_sections": ["current_status", "customer_impact", "next_actions"],
},
"approved_actions": [
"monitor payment failures every 15 minutes",
"publish customer update via status page every 15 minutes",
"prepare support macro with workaround guidance",
],
}
What matters most here (in plain words)
- All subsequent steps are based on fixed facts.
- This reduces run-to-run "drift" and simplifies audit.
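A quick way to see the determinism property. The sketch below uses a trimmed stand-in for `build_incident_context` (not the full function above) and hashes the serialized context:

```python
import hashlib
import json

def build_incident_context(*, report_date: str, region: str) -> dict:
    # Trimmed stand-in: the same fixed facts every run, parameterized
    # only by report_date and region.
    return {
        "report_date": report_date,
        "region": region,
        "incident": {"incident_id": "inc_payments_20260306", "severity": "P1"},
    }

def context_hash(ctx: dict) -> str:
    # Stable serialization -> stable hash across runs.
    return hashlib.sha256(json.dumps(ctx, sort_keys=True).encode()).hexdigest()[:12]

a = build_incident_context(report_date="2026-03-06", region="US")
b = build_incident_context(report_date="2026-03-06", region="US")
assert context_hash(a) == context_hash(b)  # identical inputs, identical context
```

Because every downstream check (numbers, incident IDs, regions) compares against this fixed context, two runs disagree only where the LLM output differs, never in the facts.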
gateway.py — critique/revision policy boundary
from __future__ import annotations
import hashlib
import json
import re
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Any
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_seconds: int = 120
max_draft_chars: int = 900
max_risks: int = 5
max_required_changes: int = 5
max_answer_chars: int = 980
max_length_increase_pct: float = 20.0
min_patch_similarity: float = 0.4
NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")
INCIDENT_ID_RE = re.compile(r"\binc_[a-z0-9_]+\b", re.IGNORECASE)
SEVERITY_RE = re.compile(r"\bp[0-5]\b", re.IGNORECASE)
REGION_RE = re.compile(r"\b(us|eu|uk|ua|apac|global|emea|latam)\b", re.IGNORECASE)
QUOTED_PHRASE_RE = re.compile(r"['\"]([^'\"]{3,160})['\"]")
RESTRICTED_CLAIMS_RE = [
re.compile(r"\bresolved\b", re.IGNORECASE),
re.compile(r"\bfully[-\s]+recovered\b", re.IGNORECASE),
re.compile(r"\bincident\s+closed\b", re.IGNORECASE),
re.compile(r"\ball payments (?:are|is)\s+stable\b", re.IGNORECASE),
]
ALLOWED_SEVERITY = {"low", "medium", "high"}
def _stable_json(value: Any) -> str:
if value is None or isinstance(value, (bool, int, float, str)):
return json.dumps(value, ensure_ascii=True, sort_keys=True)
if isinstance(value, list):
return "[" + ",".join(_stable_json(v) for v in value) + "]"
if isinstance(value, dict):
parts = []
for key in sorted(value):
parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
return "{" + ",".join(parts) + "}"
return json.dumps(str(value), ensure_ascii=True)
def _normalize_space(text: str) -> str:
return " ".join((text or "").strip().split())
def text_hash(text: str) -> str:
normalized = _normalize_space(text)
raw = _stable_json(normalized)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]
def _extract_number_tokens(text: str) -> set[str]:
return set(NUMBER_TOKEN_RE.findall(_normalize_space(text).lower()))
def _extract_incident_ids(text: str) -> set[str]:
return set(INCIDENT_ID_RE.findall(_normalize_space(text).lower()))
def _extract_severity_labels(text: str) -> set[str]:
normalized = _normalize_space(text).upper()
return {match.upper() for match in SEVERITY_RE.findall(normalized)}
def _extract_regions(text: str) -> set[str]:
normalized = _normalize_space(text).upper()
return {value.upper() for value in REGION_RE.findall(normalized)}
def _context_claim_text(value: Any) -> str:
if value is None:
return ""
if isinstance(value, str):
return value
if isinstance(value, (bool, int, float)):
return str(value)
if isinstance(value, list):
return " ".join(_context_claim_text(item) for item in value)
if isinstance(value, dict):
parts: list[str] = []
for key in sorted(value):
item = value[key]
parts.append(str(key))
parts.append(_context_claim_text(item))
return " ".join(parts)
return str(value)
def _extract_required_change_rules(required_changes: list[str]) -> dict[str, list[str]]:
must_include: list[str] = []
must_remove: list[str] = []
def _append_unique(target: list[str], value: str) -> None:
if value and value not in target:
target.append(value)
for item in required_changes:
item_norm = _normalize_space(item).lower()
quoted = [_normalize_space(match).lower() for match in QUOTED_PHRASE_RE.findall(item)]
quoted = [value for value in quoted if value]
if not quoted:
continue
# Keep extraction deterministic and explicit:
# - REMOVE/MUST_REMOVE => must_remove
# - ADD/MUST_INCLUDE => must_include
# - anything else => ignored (must be blocked in critique validation)
is_remove_rule = ("must_remove" in item_norm) or item_norm.startswith(
("remove ", "remove:", "remove-")
)
is_add_rule = ("must_include" in item_norm) or item_norm.startswith(
("add ", "add:", "add-")
)
if is_remove_rule:
for phrase in quoted:
_append_unique(must_remove, phrase)
continue
if is_add_rule:
for phrase in quoted:
_append_unique(must_include, phrase)
continue
return {
"must_include": must_include,
"must_remove": must_remove,
}
def _is_high_risk_risk_type(risk_type: str) -> bool:
return risk_type in {"legal_risk", "policy_violation"}
def _contains_normalized_phrase(*, text: str, phrase: str) -> bool:
# Compare using token-like normalization so punctuation differences
# (e.g. trailing dots/commas) do not cause false negatives.
normalized_text = re.sub(r"[^a-z0-9% ]+", " ", _normalize_space(text).lower())
normalized_phrase = re.sub(r"[^a-z0-9% ]+", " ", _normalize_space(phrase).lower())
normalized_text = " ".join(normalized_text.split())
normalized_phrase = " ".join(normalized_phrase.split())
return normalized_phrase in normalized_text
def _remove_phrase_occurrences(*, text: str, phrase: str) -> str:
cleaned = text
normalized_phrase = _normalize_space(phrase).strip()
if not normalized_phrase:
return cleaned
variants = {normalized_phrase, normalized_phrase.rstrip(".!?")}
for variant in variants:
if not variant:
continue
cleaned = re.sub(re.escape(variant), "", cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r"\s+\.", ".", cleaned)
cleaned = re.sub(r"[ \t]{2,}", " ", cleaned)
cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
return cleaned.strip()
def _append_phrase_sentence(*, text: str, phrase: str) -> str:
sentence = _normalize_space(phrase).strip()
if not sentence:
return text
out = text.rstrip()
if out and out[-1] not in ".!?":
out += "."
separator = "\n\n" if "\n\n" in out else " "
return (out + separator + sentence).strip()
def _is_enforceable_required_change(item: str) -> bool:
item_norm = _normalize_space(item).lower()
has_marker = ("must_remove" in item_norm) or ("must_include" in item_norm) or item_norm.startswith(
("remove ", "remove:", "remove-", "add ", "add:", "add-")
)
if not has_marker:
return False
quoted = [_normalize_space(match) for match in QUOTED_PHRASE_RE.findall(item)]
quoted = [value for value in quoted if value]
return len(quoted) == 1
def validate_draft(draft: Any, *, max_chars: int) -> str:
if not isinstance(draft, str) or not draft.strip():
raise StopRun("invalid_draft:empty")
out = draft.strip()
if len(out) > max_chars:
raise StopRun("invalid_draft:too_long")
return out
def validate_critique(
raw: Any,
*,
allowed_decisions_policy: set[str],
allowed_risk_types_policy: set[str],
max_risks: int,
max_required_changes: int,
) -> dict[str, Any]:
if not isinstance(raw, dict):
raise StopRun("invalid_critique:not_object")
decision = raw.get("decision")
if not isinstance(decision, str) or not decision.strip():
raise StopRun("invalid_critique:decision")
decision = decision.strip()
if decision not in allowed_decisions_policy:
raise StopRun(f"critique_decision_not_allowed_policy:{decision}")
severity = raw.get("severity", "medium")
if not isinstance(severity, str) or not severity.strip():
raise StopRun("invalid_critique:severity")
severity = severity.strip().lower()
if severity not in ALLOWED_SEVERITY:
raise StopRun("invalid_critique:severity")
risks_raw = raw.get("risks", [])
if not isinstance(risks_raw, list):
raise StopRun("invalid_critique:risks")
if len(risks_raw) > max_risks:
raise StopRun("invalid_critique:too_many_risks")
risks: list[dict[str, str]] = []
for item in risks_raw:
if not isinstance(item, dict):
raise StopRun("invalid_critique:risk_item")
risk_type = item.get("type")
note = item.get("note")
if not isinstance(risk_type, str) or not risk_type.strip():
raise StopRun("invalid_critique:risk_type")
risk_type = risk_type.strip()
if risk_type not in allowed_risk_types_policy:
raise StopRun(f"critique_risk_not_allowed_policy:{risk_type}")
if not isinstance(note, str) or not note.strip():
raise StopRun("invalid_critique:risk_note")
risks.append({"type": risk_type, "note": note.strip()})
required_changes_raw = raw.get("required_changes", [])
if not isinstance(required_changes_raw, list):
raise StopRun("invalid_critique:required_changes")
if len(required_changes_raw) > max_required_changes:
raise StopRun("invalid_critique:too_many_required_changes")
required_changes: list[str] = []
for item in required_changes_raw:
if not isinstance(item, str) or not item.strip():
raise StopRun("invalid_critique:required_change_item")
required_changes.append(item.strip())
reason = raw.get("reason", "")
if reason is None:
reason = ""
if not isinstance(reason, str):
raise StopRun("invalid_critique:reason")
reason = reason.strip()
high_risk = severity == "high" or any(_is_high_risk_risk_type(r["type"]) for r in risks)
if decision == "approve":
if required_changes:
raise StopRun("invalid_critique:approve_with_required_changes")
if high_risk:
raise StopRun("invalid_critique:approve_with_high_risk")
if decision == "revise":
if not required_changes:
raise StopRun("invalid_critique:revise_without_required_changes")
if not all(_is_enforceable_required_change(item) for item in required_changes):
raise StopRun("invalid_critique:required_changes_not_enforceable")
if high_risk:
raise StopRun("invalid_critique:high_risk_requires_escalate")
if decision == "escalate":
if not reason:
raise StopRun("invalid_critique:escalate_reason_required")
return {
"decision": decision,
"severity": severity,
"risks": risks,
"required_changes": required_changes,
"reason": reason,
"high_risk": high_risk,
}
class SelfCritiqueGateway:
def __init__(self, *, allow_execution_decisions: set[str], budget: Budget):
self.allow_execution_decisions = set(allow_execution_decisions)
self.budget = budget
def enforce_execution_decision(self, decision: str) -> None:
if decision not in self.allow_execution_decisions:
raise StopRun(f"critique_decision_denied_execution:{decision}")
def apply_required_changes_fallback(self, *, text: str, required_changes: list[str]) -> str:
"""
Deterministic fallback for enforceable required changes:
remove MUST_REMOVE/REMOVE phrases and append missing MUST_INCLUDE/ADD phrases.
"""
candidate = (text or "").strip()
if not candidate:
return candidate
phrase_rules = _extract_required_change_rules(required_changes)
must_include = phrase_rules["must_include"]
must_remove = phrase_rules["must_remove"]
for phrase in must_remove:
candidate = _remove_phrase_occurrences(text=candidate, phrase=phrase)
for phrase in must_include:
if not _contains_normalized_phrase(text=candidate, phrase=phrase):
candidate = _append_phrase_sentence(text=candidate, phrase=phrase)
return candidate.strip()
def validate_revision(
self,
*,
original: str,
revised: str,
context: dict[str, Any],
required_changes: list[str],
) -> dict[str, Any]:
if not isinstance(revised, str) or not revised.strip():
raise StopRun("invalid_revised:empty")
revised_clean = revised.strip()
if len(revised_clean) > self.budget.max_answer_chars:
raise StopRun("invalid_revised:too_long")
normalized_original = _normalize_space(original)
normalized_revised = _normalize_space(revised_clean)
if normalized_original == normalized_revised:
raise StopRun("invalid_revised:no_changes")
similarity = SequenceMatcher(a=normalized_original, b=normalized_revised).ratio()
if similarity < self.budget.min_patch_similarity:
raise StopRun("patch_violation:too_large_edit")
original_len = max(1, len(normalized_original))
revised_len = len(normalized_revised)
increase_pct = ((revised_len - original_len) / float(original_len)) * 100.0
policy_hint_raw = (
context.get("policy_hints", {}).get("max_length_increase_pct")
if isinstance(context, dict)
else None
)
policy_hint_cap = self.budget.max_length_increase_pct
if isinstance(policy_hint_raw, (int, float)) and not isinstance(policy_hint_raw, bool):
policy_hint_cap = float(policy_hint_raw)
effective_length_cap = min(self.budget.max_length_increase_pct, policy_hint_cap)
if increase_pct > effective_length_cap:
raise StopRun("patch_violation:length_increase_limit")
allowed_text_tokens = _stable_json(context) + " " + original
allowed_text_claims = _normalize_space(_context_claim_text(context) + " " + original)
if _extract_number_tokens(revised_clean) - _extract_number_tokens(allowed_text_tokens):
raise StopRun("patch_violation:no_new_facts")
if _extract_incident_ids(revised_clean) - _extract_incident_ids(allowed_text_tokens):
raise StopRun("patch_violation:new_incident_id")
if _extract_severity_labels(revised_clean) - _extract_severity_labels(allowed_text_tokens):
raise StopRun("patch_violation:new_severity_label")
if _extract_regions(revised_clean) - _extract_regions(allowed_text_tokens):
raise StopRun("patch_violation:new_region")
avoid_absolute_guarantees = bool(
context.get("policy_hints", {}).get("avoid_absolute_guarantees")
if isinstance(context, dict)
else False
)
for claim_re in RESTRICTED_CLAIMS_RE:
if avoid_absolute_guarantees:
if claim_re.search(revised_clean):
raise StopRun("patch_violation:restricted_claims")
continue
if claim_re.search(revised_clean) and not claim_re.search(allowed_text_claims):
raise StopRun("patch_violation:restricted_claims")
phrase_rules = _extract_required_change_rules(required_changes)
must_include = phrase_rules["must_include"]
must_remove = phrase_rules["must_remove"]
if must_include or must_remove:
revised_lower = normalized_revised.lower()
if [value for value in must_include if not _contains_normalized_phrase(text=revised_lower, phrase=value)]:
raise StopRun("patch_violation:required_changes_not_applied")
if [value for value in must_remove if _contains_normalized_phrase(text=revised_lower, phrase=value)]:
raise StopRun("patch_violation:required_changes_not_applied")
return {
"answer": revised_clean,
"patch_similarity": round(similarity, 3),
"length_increase_pct": round(increase_pct, 2),
"required_changes_total": len(required_changes),
"required_changes_enforced": len(must_include) + len(must_remove),
"required_changes_unenforced": len(required_changes)
- (len(must_include) + len(must_remove)),
}
def validate_final(self, answer: str) -> str:
if not isinstance(answer, str) or not answer.strip():
raise StopRun("invalid_answer:empty")
out = answer.strip()
if len(out) > self.budget.max_answer_chars:
raise StopRun("invalid_answer:too_long")
return out
What matters most here (in plain words)
- Critique is not "arbitrary text": it passes strict validation.
- Revision cannot go beyond factual context and budget limits.
- Enforceable `required_changes` use explicit `ADD`/`REMOVE` markers (or `MUST_INCLUDE`/`MUST_REMOVE`).
- If `avoid_absolute_guarantees=True`, restricted claims are blocked regardless of draft content (even if they were already in the original draft).
- `required_changes_total` shows the total number of `required_changes`.
- `required_changes_enforced` shows how many of them became enforceable rules.
- `required_changes_unenforced` shows how many of them cannot be checked by the gateway.
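The marker extraction can be demonstrated in isolation. This is a simplified version of `_extract_required_change_rules` (same regex and marker logic, minus the `MUST_*` de-duplication details):

```python
import re

QUOTED = re.compile(r"['\"]([^'\"]{3,160})['\"]")

def extract_rules(required_changes: list[str]) -> dict[str, list[str]]:
    # Only ADD/REMOVE (or MUST_INCLUDE/MUST_REMOVE) items carrying exactly
    # one quoted phrase become enforceable rules; everything else is ignored.
    rules: dict[str, list[str]] = {"must_include": [], "must_remove": []}
    for item in required_changes:
        low = " ".join(item.strip().split()).lower()
        quoted = [q.strip().lower() for q in QUOTED.findall(item)]
        if len(quoted) != 1:
            continue
        if "must_remove" in low or low.startswith(("remove ", "remove:", "remove-")):
            rules["must_remove"].append(quoted[0])
        elif "must_include" in low or low.startswith(("add ", "add:", "add-")):
            rules["must_include"].append(quoted[0])
    return rules

changes = [
    'REMOVE "fully recovered"',
    'ADD "we will post updates every 15 minutes"',
    "tighten the tone",  # no quoted phrase -> not enforceable, ignored
]
print(extract_rules(changes))
# → {'must_include': ['we will post updates every 15 minutes'],
#    'must_remove': ['fully recovered']}
```

In the real flow the third item would never reach this point: `validate_critique` rejects a `revise` decision whose `required_changes` are not all enforceable.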
llm.py — draft/critique/revise calls
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
class LLMInvalid(Exception):
pass
DRAFT_SYSTEM_PROMPT = """
You are an operations communications writer.
Return exactly one JSON object:
{
"draft": "customer-facing incident update"
}
Rules:
- Use only facts from provided incident_context.
- Include current status, customer impact, and next actions.
- Keep language clear and non-speculative.
- Avoid absolute guarantees.
- Do not output markdown or extra keys.
""".strip()
SHORTEN_DRAFT_SYSTEM_PROMPT = """
You are an operations editor.
Return exactly one JSON object:
{
"draft": "shortened customer-facing incident update"
}
Rules:
- Rewrite the draft to be <= max_chars characters.
- Preserve original facts, numbers, and intent.
- Do not add new facts or speculative claims.
- Keep current status, customer impact, and next actions.
- Keep language clear and non-speculative.
- Avoid absolute guarantees.
- Do not output markdown or extra keys.
""".strip()
CRITIQUE_SYSTEM_PROMPT = """
You are a strict critique reviewer.
Return exactly one JSON object:
{
"decision": "approve|revise|escalate",
"severity": "low|medium|high",
"risks": [{"type":"overconfidence","note":"..."}],
"required_changes": ["concrete change"],
"reason": "for escalate only"
}
Rules:
- Review exactly once.
- decision=approve: required_changes must be empty.
- decision=revise: provide 1-5 concrete required changes.
- decision=escalate: use only for high-risk content.
- Every required_changes item MUST start with ADD/REMOVE/MUST_INCLUDE/MUST_REMOVE.
- Every required_changes item MUST contain exactly one quoted phrase.
- If you cannot express required changes in enforceable ADD/REMOVE format, set decision=escalate and explain why in reason.
- Use explicit markers for enforceable phrase edits:
- REMOVE "phrase to delete"
- ADD "phrase to include"
- MUST_REMOVE "phrase to delete" (equivalent)
- MUST_INCLUDE "phrase to include" (equivalent)
- Do not add new facts in required_changes.
- Use only risk types from allowed_risk_types.
- Do not output markdown or extra keys.
""".strip()
REVISE_SYSTEM_PROMPT = """
You are an editor applying a constrained rewrite.
Return exactly one JSON object:
{
"revised_answer": "updated answer"
}
Rules:
- Apply required_changes only.
- Keep original scope and customer intent.
- Do not add new facts or numbers.
- Keep the answer concise and actionable.
- Do not output markdown or extra keys.
""".strip()
REVISE_SYSTEM_PROMPT_STRICT = """
You are an editor applying a constrained rewrite.
Return exactly one JSON object:
{
"revised_answer": "updated answer"
}
Rules:
- Apply required_changes only.
- Keep original scope and customer intent.
- Do not add new facts or numbers.
- Keep the answer concise and actionable.
- You MUST satisfy each required_changes item exactly.
- For ADD/MUST_INCLUDE: include the quoted phrase verbatim.
- For REMOVE/MUST_REMOVE: ensure the quoted phrase does not appear.
- Do not output markdown or extra keys.
""".strip()
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
data = json.loads(text)
except json.JSONDecodeError as exc:
raise LLMInvalid("llm_invalid_json") from exc
if not isinstance(data, dict):
raise LLMInvalid("llm_invalid_json")
return data
def generate_draft(*, goal: str, incident_context: dict[str, Any]) -> str:
payload = {
"goal": goal,
"incident_context": incident_context,
}
data = _chat_json(system_prompt=DRAFT_SYSTEM_PROMPT, payload=payload)
draft = data.get("draft")
if not isinstance(draft, str):
raise LLMInvalid("llm_invalid_schema")
draft = draft.strip()
if not draft:
raise LLMEmpty("llm_empty")
return draft
def shorten_draft(*, draft: str, max_chars: int) -> str:
payload = {
"draft": draft,
"max_chars": int(max_chars),
}
data = _chat_json(system_prompt=SHORTEN_DRAFT_SYSTEM_PROMPT, payload=payload)
shortened = data.get("draft")
if not isinstance(shortened, str):
raise LLMInvalid("llm_invalid_schema")
shortened = shortened.strip()
if not shortened:
raise LLMEmpty("llm_empty")
return shortened
def critique_draft(
*,
goal: str,
incident_context: dict[str, Any],
draft: str,
allowed_risk_types: list[str],
) -> dict[str, Any]:
payload = {
"goal": goal,
"incident_context": incident_context,
"draft": draft,
"allowed_risk_types": allowed_risk_types,
}
return _chat_json(system_prompt=CRITIQUE_SYSTEM_PROMPT, payload=payload)
def revise_once(
*,
goal: str,
incident_context: dict[str, Any],
draft: str,
required_changes: list[str],
strict_mode: bool = False,
) -> str:
payload = {
"goal": goal,
"incident_context": incident_context,
"draft": draft,
"required_changes": required_changes,
}
system_prompt = REVISE_SYSTEM_PROMPT_STRICT if strict_mode else REVISE_SYSTEM_PROMPT
data = _chat_json(system_prompt=system_prompt, payload=payload)
revised = data.get("revised_answer")
if not isinstance(revised, str):
raise LLMInvalid("llm_invalid_schema")
revised = revised.strip()
if not revised:
raise LLMEmpty("llm_empty")
return revised
What matters most here (in plain words)
- Critique output has a stable JSON contract.
- The critique prompt explicitly requires enforceable `required_changes` via explicit `ADD`/`REMOVE` markers.
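A critique that satisfies this contract might look like the following (illustrative values; the quoted phrases in `required_changes` are exactly what the gateway later enforces):

```python
# Illustrative critique artifact matching the CRITIQUE_SYSTEM_PROMPT schema.
critique = {
    "decision": "revise",
    "severity": "medium",
    "risks": [
        {"type": "overconfidence", "note": "draft implies full recovery"},
    ],
    "required_changes": [
        'REMOVE "fully recovered"',
        'ADD "we will post updates every 15 minutes"',
    ],
    "reason": "",
}
# Each item leads with an ADD/REMOVE marker and carries one quoted phrase,
# so the gateway can verify it mechanically.
assert all(
    c.split(" ", 1)[0] in {"ADD", "REMOVE", "MUST_INCLUDE", "MUST_REMOVE"}
    for c in critique["required_changes"]
)
```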
audit.py — change journal
from __future__ import annotations
import difflib
import hashlib
import re
from typing import Any
SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?])\s+")
def _hash_text(text: str) -> str:
normalized = " ".join((text or "").split())
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:12]
def _split_for_diff(text: str) -> list[str]:
lines = (text or "").splitlines()
if len(lines) > 1:
return lines
normalized = (text or "").strip()
if not normalized:
return [""]
sentences = [item.strip() for item in SENTENCE_SPLIT_RE.split(normalized) if item.strip()]
if len(sentences) > 1:
return sentences
chunk_size = 80
return [normalized[i : i + chunk_size] for i in range(0, len(normalized), chunk_size)]
def build_audit_log(
*,
before: str,
after: str,
risks: list[dict[str, Any]],
required_changes: list[str],
) -> dict[str, Any]:
before_text = (before or "").strip()
after_text = (after or "").strip()
before_chars = len(before_text)
after_chars = len(after_text)
delta_chars = after_chars - before_chars
if before_chars <= 0:
increase_pct = 0.0
else:
increase_pct = (delta_chars / float(before_chars)) * 100.0
raw_diff = list(
difflib.unified_diff(
_split_for_diff(before_text),
_split_for_diff(after_text),
fromfile="before",
tofile="after",
lineterm="",
)
)
diff_excerpt: list[str] = []
for line in raw_diff:
if line.startswith(("---", "+++", "@@")):
continue
if line.startswith(("+", "-")):
diff_excerpt.append(line)
if len(diff_excerpt) >= 6:
break
return {
"changed": before_text != after_text,
"before_hash": _hash_text(before_text),
"after_hash": _hash_text(after_text),
"before_chars": before_chars,
"after_chars": after_chars,
"delta_chars": delta_chars,
"length_increase_pct": round(increase_pct, 2),
"risks_count": len(risks),
"required_changes_count": len(required_changes),
"diff_excerpt": diff_excerpt,
}
What matters most here (in plain words)
- After revision, audit metadata is stored, not just final text.
- This enables debugging of "why this version passed policy".
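The audit idea in miniature, simplified from `build_audit_log` (the before/after texts are invented examples):

```python
import difflib
import hashlib

before = "Checkout is degraded in US. We expect full recovery within 45 minutes."
after = "Checkout is degraded in US. Recovery is estimated at about 45 minutes."

def short_hash(text: str) -> str:
    # Whitespace-normalized hash, like _hash_text in audit.py.
    return hashlib.sha256(" ".join(text.split()).encode()).hexdigest()[:12]

# Keep only the changed lines, capped like the real diff_excerpt.
diff_excerpt = [
    line
    for line in difflib.unified_diff(before.split(". "), after.split(". "), lineterm="")
    if line.startswith(("+", "-")) and not line.startswith(("+++", "---"))
][:6]

audit = {
    "changed": before != after,
    "before_hash": short_hash(before),
    "after_hash": short_hash(after),
    "delta_chars": len(after) - len(before),
    "diff_excerpt": diff_excerpt,
}
print(audit["changed"], len(audit["diff_excerpt"]))  # → True 2
```

Even without storing both full texts, the hashes plus the excerpt answer "did it change, and where" at a glance.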
main.py — full-cycle orchestration
from __future__ import annotations
import json
import time
import uuid
from typing import Any
from audit import build_audit_log
from context import build_incident_context
from gateway import Budget, SelfCritiqueGateway, StopRun, text_hash, validate_critique, validate_draft
from llm import LLMEmpty, LLMInvalid, LLMTimeout, critique_draft, generate_draft, revise_once, shorten_draft
GOAL = (
"Draft a customer-facing payment incident update for US enterprise customers. "
"Use precise language, avoid guarantees, and keep next actions concrete."
)
INCIDENT_CONTEXT = build_incident_context(report_date="2026-03-06", region="US")
BUDGET = Budget(
max_seconds=120,
max_draft_chars=900,
max_risks=5,
max_required_changes=5,
max_answer_chars=980,
max_length_increase_pct=20.0,
min_patch_similarity=0.4,
)
ALLOWED_CRITIQUE_DECISIONS_POLICY = {"approve", "revise", "escalate"}
AUTO_REVISION_ENABLED = True
ALLOWED_CRITIQUE_DECISIONS_EXECUTION = (
ALLOWED_CRITIQUE_DECISIONS_POLICY if AUTO_REVISION_ENABLED else {"approve", "escalate"}
)
ALLOWED_RISK_TYPES_POLICY = {
"overconfidence",
"missing_uncertainty",
"contradiction",
"scope_leak",
"policy_violation",
"legal_risk",
}
def run_self_critique_agent(*, goal: str, incident_context: dict[str, Any]) -> dict[str, Any]:
run_id = str(uuid.uuid4())
started = time.monotonic()
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
gateway = SelfCritiqueGateway(
allow_execution_decisions=ALLOWED_CRITIQUE_DECISIONS_EXECUTION,
budget=BUDGET,
)
def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
payload = {
"run_id": run_id,
"status": "stopped",
"stop_reason": stop_reason,
"phase": phase,
"trace": trace,
"history": history,
}
payload.update(extra)
return payload
draft_attempts = 0
draft_retried = False
try:
draft_attempts += 1
draft_raw = generate_draft(goal=goal, incident_context=incident_context)
try:
draft = validate_draft(draft_raw, max_chars=BUDGET.max_draft_chars)
except StopRun as exc:
if exc.reason != "invalid_draft:too_long":
raise
# One bounded recovery attempt: shorten draft within policy budget.
draft_attempts += 1
draft_retried = True
shortened_raw = shorten_draft(draft=draft_raw, max_chars=BUDGET.max_draft_chars)
draft = validate_draft(shortened_raw, max_chars=BUDGET.max_draft_chars)
except LLMTimeout:
return stopped("llm_timeout", phase="draft")
except LLMInvalid as exc:
return stopped(exc.args[0], phase="draft")
except LLMEmpty:
return stopped("llm_empty", phase="draft")
except StopRun as exc:
return stopped(exc.reason, phase="draft")
trace.append(
{
"step": 1,
"phase": "draft",
"draft_hash": text_hash(draft),
"chars": len(draft),
"attempts_used": draft_attempts,
"retried": draft_retried,
"ok": True,
}
)
history.append(
{
"step": 1,
"action": "draft_once",
"draft": draft,
}
)
if (time.monotonic() - started) > BUDGET.max_seconds:
return stopped("max_seconds", phase="critique")
try:
raw_critique = critique_draft(
goal=goal,
incident_context=incident_context,
draft=draft,
allowed_risk_types=sorted(ALLOWED_RISK_TYPES_POLICY),
)
except LLMTimeout:
return stopped("llm_timeout", phase="critique")
except LLMInvalid as exc:
return stopped(exc.args[0], phase="critique")
try:
critique = validate_critique(
raw_critique,
allowed_decisions_policy=ALLOWED_CRITIQUE_DECISIONS_POLICY,
allowed_risk_types_policy=ALLOWED_RISK_TYPES_POLICY,
max_risks=BUDGET.max_risks,
max_required_changes=BUDGET.max_required_changes,
)
gateway.enforce_execution_decision(critique["decision"])
except StopRun as exc:
return stopped(exc.reason, phase="critique", raw_critique=raw_critique)
trace.append(
{
"step": 2,
"phase": "critique",
"decision": critique["decision"],
"severity": critique["severity"],
"risks": len(critique["risks"]),
"required_changes": len(critique["required_changes"]),
"ok": True,
}
)
history.append(
{
"step": 2,
"action": "critique_once",
"critique": critique,
}
)
if critique["decision"] == "escalate":
escalation_reason = str(critique.get("reason", "")).strip()
return {
"run_id": run_id,
"status": "stopped",
"stop_reason": "policy_escalation",
"escalation_reason": escalation_reason[:120],
"phase": "critique",
"critique": critique,
"trace": trace,
"history": history,
}
final_answer = draft
revised = False
if critique["decision"] == "revise":
revise_attempts = 0
revise_retried = False
revised_payload: dict[str, Any] | None = None
last_revised_candidate = draft
for attempt in range(1, 4):
if (time.monotonic() - started) > BUDGET.max_seconds:
return stopped("max_seconds", phase="revise")
revise_attempts = attempt
strict_mode = attempt > 1
try:
revised_raw = revise_once(
goal=goal,
incident_context=incident_context,
draft=draft,
required_changes=critique["required_changes"],
strict_mode=strict_mode,
)
last_revised_candidate = revised_raw
revised_payload = gateway.validate_revision(
original=draft,
revised=revised_raw,
context=incident_context,
required_changes=critique["required_changes"],
)
break
except LLMTimeout:
return stopped("llm_timeout", phase="revise")
except LLMInvalid as exc:
return stopped(exc.args[0], phase="revise")
except LLMEmpty:
return stopped("llm_empty", phase="revise")
except StopRun as exc:
if exc.reason == "patch_violation:required_changes_not_applied" and attempt < 3:
revise_retried = True
continue
if exc.reason == "patch_violation:required_changes_not_applied":
# Final fallback: enforce required phrase edits deterministically.
try:
fallback_revised = gateway.apply_required_changes_fallback(
text=last_revised_candidate,
required_changes=critique["required_changes"],
)
revised_payload = gateway.validate_revision(
original=draft,
revised=fallback_revised,
context=incident_context,
required_changes=critique["required_changes"],
)
revise_attempts = attempt + 1
revise_retried = True
break
except StopRun as fallback_exc:
return stopped(fallback_exc.reason, phase="revise")
return stopped(exc.reason, phase="revise")
if revised_payload is None:
return stopped("patch_violation:required_changes_not_applied", phase="revise")
final_answer = revised_payload["answer"]
revised = True
trace.append(
{
"step": 3,
"phase": "revise",
"patch_similarity": revised_payload["patch_similarity"],
"length_increase_pct": revised_payload["length_increase_pct"],
"required_changes_total": revised_payload["required_changes_total"],
"required_changes_enforced": revised_payload["required_changes_enforced"],
"required_changes_unenforced": revised_payload["required_changes_unenforced"],
"attempts_used": revise_attempts,
"retried": revise_retried,
"revised_hash": text_hash(final_answer),
"ok": True,
}
)
history.append(
{
"step": 3,
"action": "revise_once",
"required_changes": critique["required_changes"],
"revised_answer": final_answer,
}
)
try:
final_answer = gateway.validate_final(final_answer)
except StopRun as exc:
return stopped(exc.reason, phase="finalize")
audit_log = build_audit_log(
before=draft,
after=final_answer,
risks=critique["risks"],
required_changes=critique["required_changes"],
)
trace.append(
{
"step": 4 if revised else 3,
"phase": "audit_finalize",
"final_hash": text_hash(final_answer),
"changed": audit_log["changed"],
"diff_lines": len(audit_log["diff_excerpt"]),
"ok": True,
}
)
history.append(
{
"step": 4 if revised else 3,
"action": "audit_finalize",
"status": "final",
"changed": audit_log["changed"],
}
)
return {
"run_id": run_id,
"status": "ok",
"stop_reason": "success",
"outcome": "revised_once" if revised else "approved_direct",
"answer": final_answer,
"critique_decision": critique["decision"],
"severity": critique["severity"],
"risks": critique["risks"],
"required_changes": critique["required_changes"],
"audit": audit_log,
"trace": trace,
"history": history,
}
def main() -> None:
result = run_self_critique_agent(goal=GOAL, incident_context=INCIDENT_CONTEXT)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
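The `audit` record comes from `build_audit_log` in `audit.py`. As a rough sketch of what such a helper can look like, assuming the field names shown in the example output (the real implementation may differ in details):

```python
import difflib
import hashlib
from typing import Any


def text_hash(text: str) -> str:
    """Short, stable content hash used in trace and audit entries."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:12]


def build_audit_log(
    before: str,
    after: str,
    risks: list[dict[str, Any]],
    required_changes: list[str],
) -> dict[str, Any]:
    """Diff metadata for the audit log; field names mirror the example output."""
    diff_excerpt = [
        line
        for line in difflib.unified_diff(
            before.splitlines(), after.splitlines(), lineterm=""
        )
        if line.startswith(("+", "-")) and not line.startswith(("+++", "---"))
    ][:10]
    return {
        "changed": before != after,
        "before_hash": text_hash(before),
        "after_hash": text_hash(after),
        "before_chars": len(before),
        "after_chars": len(after),
        "delta_chars": len(after) - len(before),
        "length_increase_pct": round(
            max(0, len(after) - len(before)) / max(1, len(before)) * 100, 2
        ),
        "risks_count": len(risks),
        "required_changes_count": len(required_changes),
        "diff_excerpt": diff_excerpt,
    }
```

The diff excerpt keeps only added/removed lines (capped at 10), so the audit stays small even for long answers.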
What matters most here (in plain words)

- `critique_decision` drives the flow, but runtime controls execution permissions.
- Critique and revision remain transparent through `trace` + `history` + `audit`.
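The split between the two layers can be sketched roughly like this (illustrative names; the actual checks live in `gateway.py` and `main.py` and may be structured differently):

```python
# Illustrative sketch of the policy/execution split; names are hypothetical
# and do not mirror the real gateway.py API.
POLICY_ALLOWED_DECISIONS = {"approve", "revise", "escalate"}  # contract layer


def check_decision(decision: str, auto_revision_enabled: bool) -> str:
    # Policy layer: is this decision part of the critique contract at all?
    if decision not in POLICY_ALLOWED_DECISIONS:
        return f"critique_decision_not_allowed_policy:{decision}"
    # Execution layer: is the runtime permitted to act on it right now?
    if decision == "revise" and not auto_revision_enabled:
        return f"critique_decision_denied_execution:{decision}"
    return "ok"
```

The point of the split is that a decision can be valid under the contract yet still denied at runtime, for example when auto-revision is switched off.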
Example output
{
  "run_id": "721e4231-7b9a-4843-99e0-888616025b35",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "revised_once",
  "answer": "Current Status: We are experiencing a payment processing degradation affecting about 27% of US enterprise customer checkouts. The failed payment rate has risen to 3.4%, with 5 related chargeback alerts. Our engineering team is working to resolve the issue. We estimate recovery within approximately 45 minutes, though this timing may change as we learn more.\n\nCustomer Impact: Some customers may face difficulties completing payments, causing delayed order processing or the need to retry transactions. We are monitoring the situation closely to reduce disruption.\n\nNext Actions: We will publish updates on the status page every 15 minutes as we monitor payment failures. Our support team is preparing a workaround guide to assist affected customers. Please check the status page regularly and contact support if you need help.",
  "critique_decision": "revise",
  "severity": "medium",
  "risks": [
    {
      "type": "overconfidence",
      "note": "The phrase 'with an estimated recovery time of 45 minutes' may be interpreted as a guarantee rather than an estimate."
    }
  ],
  "required_changes": [
    "MUST_REMOVE \"with an estimated recovery time of 45 minutes\"",
    "MUST_INCLUDE \"We estimate recovery within approximately 45 minutes, though this timing may change as we learn more\"",
    "MUST_INCLUDE \"We will publish updates on the status page every 15 minutes as we monitor payment failures\"",
    "MUST_INCLUDE \"Our support team is preparing a workaround guide to assist affected customers\""
  ],
  "audit": {
    "changed": true,
    "before_hash": "7cdaa40c3fa4",
    "after_hash": "33b356380537",
    "before_chars": 770,
    "after_chars": 827,
    "delta_chars": 57,
    "length_increase_pct": 7.4,
    "risks_count": 1,
    "required_changes_count": 4,
    "diff_excerpt": [
      "-...",
      "+..."
    ]
  },
  "trace": [
    {
      "step": 1,
      "phase": "draft",
      "draft_hash": "9b944d7375bf",
      "chars": 770,
      "attempts_used": 2,
      "retried": true,
      "ok": true
    },
    {
      "step": 2,
      "phase": "critique",
      "decision": "revise",
      "severity": "medium",
      "risks": 1,
      "required_changes": 4,
      "ok": true
    },
    {
      "step": 3,
      "phase": "revise",
      "patch_similarity": 0.849,
      "length_increase_pct": 7.42,
      "required_changes_total": 4,
      "required_changes_enforced": 4,
      "required_changes_unenforced": 0,
      "attempts_used": 2,
      "retried": true,
      "revised_hash": "d92d90a2adbe",
      "ok": true
    },
    {
      "step": 4,
      "phase": "audit_finalize",
      "final_hash": "d92d90a2adbe",
      "changed": true,
      "diff_lines": 4,
      "ok": true
    }
  ],
  "history": [{...}]
}
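When wiring this output into integration tests or monitoring, a few structural invariants can be asserted mechanically. The helper below is illustrative and based only on the fields shown above:

```python
from typing import Any


def check_result_invariants(result: dict[str, Any]) -> list[str]:
    """Return a list of violated invariants (empty means the result looks consistent)."""
    problems: list[str] = []
    if result.get("status") == "ok" and result.get("stop_reason") != "success":
        problems.append("status=ok requires stop_reason=success")
    if result.get("outcome") == "revised_once" and not result.get("audit", {}).get("changed"):
        problems.append("revised_once implies audit.changed=true")
    audit = result.get("audit", {})
    if audit and audit.get("delta_chars") != audit.get("after_chars", 0) - audit.get("before_chars", 0):
        problems.append("delta_chars must equal after_chars - before_chars")
    return problems
```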
Typical `stop_reason` values

- `success` — run completed correctly
- `llm_timeout` — LLM did not respond within `OPENAI_TIMEOUT_SECONDS`
- `llm_empty` — empty response from LLM in draft/revise
- `llm_invalid_json` — LLM returned invalid JSON
- `llm_invalid_schema` — JSON does not match contract
- `invalid_draft:*` — draft failed basic validation
- `invalid_critique:*` — critique failed policy-layer contract
- `invalid_critique:required_changes_not_enforceable` — for `decision=revise`, required_changes must be in enforceable format (`ADD/REMOVE/MUST_*` + 1 quoted phrase)
- `critique_decision_not_allowed_policy:*` — critique decision is outside policy allowlist
- `critique_decision_denied_execution:*` — runtime denied execution decision
- `patch_violation:no_new_facts` — revision introduced new facts
- `patch_violation:length_increase_limit` — revision exceeded length increase limit
- `patch_violation:restricted_claims` — revision contains restricted claims; with `avoid_absolute_guarantees=true` they are blocked strictly
- `patch_violation:required_changes_not_applied` — revision did not apply required changes
- `patch_violation:too_large_edit` — revision exceeded patch-only boundaries
- `policy_escalation` — critique returned escalation; details in `escalation_reason`
- `max_seconds` — total run time budget exceeded
- `invalid_answer:*` — final answer failed validation
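For production monitoring, these values bucket naturally by prefix. A hypothetical classifier (bucket names are illustrative, not part of the pattern's contract):

```python
def classify_stop_reason(stop_reason: str) -> str:
    """Map a stop_reason onto a coarse alerting bucket (illustrative grouping)."""
    if stop_reason == "success":
        return "ok"
    if stop_reason.startswith("llm_"):
        return "provider_issue"
    if stop_reason.startswith(("invalid_draft:", "invalid_critique:", "invalid_answer:")):
        return "contract_violation"
    if stop_reason.startswith(("critique_decision_", "patch_violation:")):
        return "guardrail_block"
    if stop_reason in ("policy_escalation", "max_seconds"):
        return "needs_attention"
    return "unknown"
```

Guardrail blocks and contract violations usually indicate model or prompt drift, while provider issues call for retries rather than alerts.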
What is NOT shown here
- persisted audit storage (DB / object storage)
- retry/backoff and circuit breaker for LLM
- human review queue for `policy_escalation`
- domain-aware semantic validation of `required_changes`
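If you add the missing retry/backoff layer yourself, a minimal exponential-backoff wrapper might look like this (illustrative; `LLMTimeout` here stands in for the exception defined in `llm.py`):

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class LLMTimeout(Exception):
    """Stand-in for the timeout exception raised by llm.py."""


def with_backoff(call: Callable[[], T], attempts: int = 3, base_delay: float = 0.5) -> T:
    """Retry transient timeouts with exponential backoff; contract errors should fail fast."""
    for attempt in range(1, attempts + 1):
        try:
            return call()
        except LLMTimeout:
            if attempt == attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError("unreachable")
```

Only timeouts are retried here on purpose: schema and contract violations are deterministic failures, so retrying them mostly wastes budget.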
What to try next
- Disable `AUTO_REVISION_ENABLED` and verify `critique_decision_denied_execution:revise`.
- Add a "severity budget" (for example, block `medium`+ for specific tenants).
- Store `audit.diff_excerpt` in an external log sink (S3/ELK) for incident analysis.
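For the last item, a minimal file-backed sink illustrates the shape; a real deployment would replace the file append with an S3 put or an ELK ingest call (names here are hypothetical):

```python
import json
from pathlib import Path
from typing import Any


def ship_audit_record(run_id: str, audit: dict[str, Any], sink_path: str = "audit.jsonl") -> None:
    """Append one JSON line per run so diffs stay queryable after the process exits."""
    record = {
        "run_id": run_id,
        "changed": audit["changed"],
        "delta_chars": audit["delta_chars"],
        "diff_excerpt": audit["diff_excerpt"],
    }
    with Path(sink_path).open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(record, ensure_ascii=False) + "\n")
```

JSON Lines keeps each run independently parseable, which matters when a crash truncates the file mid-write.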