Reflection Agent — Python (full implementation with LLM)

A production-style, runnable Reflection agent example in Python, with a one-pass review, a patch-only revise step, a policy/execution boundary, and explicit stop reasons.
On this page
  1. Pattern Essence (Brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. How to run
  6. Task
  7. Solution
  8. Code
  9. context.py — deterministic production context
  10. gateway.py — policy/execution boundary + patch guardrails
  11. llm.py — draft/review/revise LLM calls
  12. main.py — Reflection flow orchestration
  13. Example output
  14. Typical stop_reason values
  15. What is NOT shown
  16. What to try next

Pattern Essence (Brief)

Reflection Agent is a pattern in which, after producing an answer draft, the agent performs one controlled self-review and then either approves the answer or makes a single patch-only revision.

The LLM decides how to formulate the draft and the review, while the execution layer controls whether the proposed decision can be executed within policy and runtime constraints.


What this example demonstrates

  • production-like flow: Draft -> Review -> Revise (optional) -> Finalize
  • one review pass and at most one revision (no infinite loop)
  • policy boundary for review decision (approve | revise | escalate)
  • execution boundary that enforces runtime decision allowlist
  • patch guardrails: no_new_facts, critical claims/token checks, edit-size control
  • controlled budgets (max_seconds, length limits, issue/fix limits)
  • explicit stop_reason, trace, history for audit

Architecture

  1. LLM generates an answer draft.
  2. LLM review returns a structured decision (approve/revise/escalate).
  3. Gateway validates the review contract by policy.
  4. Gateway enforces execution allowlist (runtime may differ from policy).
  5. If revise, one patch-only revision is executed.
  6. Gateway checks the patch (no_new_facts, critical tokens/claims, fix_plan hints, similarity) and finalizes the answer.

Key contract: the LLM proposes changes, but final execution authority belongs to the execution layer.
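The six steps above can be sketched as one control-flow function. This is a simplified illustration, not code from the example: llm_draft, llm_review, and llm_revise are hypothetical stand-ins for the calls in llm.py, and the gateway checks are reduced to an allowlist test.

PYTHON
```python
# Minimal sketch of Draft -> Review -> Revise -> Finalize.
# llm_draft/llm_review/llm_revise are hypothetical stubs; the real
# example adds budgets, patch guardrails, trace, and history.

def reflect_once(llm_draft, llm_review, llm_revise,
                 allowed_execution=frozenset({"approve", "revise", "escalate"})):
    draft = llm_draft()                        # 1. LLM generates a draft
    review = llm_review(draft)                 # 2. structured decision JSON
    decision = review["decision"]
    if decision not in allowed_execution:      # 3-4. policy + execution allowlist
        return {"status": "stopped", "stop_reason": f"denied:{decision}"}
    if decision == "escalate":
        return {"status": "stopped", "stop_reason": "policy_escalation"}
    answer = draft
    if decision == "revise":                   # 5. at most one patch-only revision
        answer = llm_revise(draft, review["fix_plan"])
    return {"status": "ok", "answer": answer}  # 6. finalize
```

Note that the revision branch runs at most once; there is no loop back to review, which is what keeps the pattern bounded.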


Project structure

TEXT
examples/
└── agent-patterns/
    └── reflection-agent/
        └── python/
            ├── main.py          # Draft -> Review -> Revise -> Finalize flow
            ├── llm.py           # draft/review/revise LLM calls
            ├── gateway.py       # policy+execution validation, patch guards
            ├── context.py       # deterministic incident context
            ├── requirements.txt
            └── README.md

How to run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/reflection-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py

Option via .env (optional):
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows, use set (cmd.exe) or $env: (PowerShell) to set the variables, or use python-dotenv to load .env automatically.
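If you prefer not to rely on shell sourcing at all, a few lines of stdlib Python are enough to load .env for this example (python-dotenv does the same with more edge cases handled). load_env_file is a hypothetical helper, not part of the example code:

PYTHON
```python
import os

def load_env_file(path: str = ".env") -> None:
    """Load KEY=VALUE lines into os.environ without overriding exported values."""
    try:
        with open(path, encoding="utf-8") as handle:
            for line in handle:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue  # skip blanks, comments, malformed lines
                key, _, value = line.partition("=")
                os.environ.setdefault(key.strip(), value.strip().strip("'\""))
    except FileNotFoundError:
        pass  # a missing .env is fine; rely on exported variables
```

Call it before reading OPENAI_API_KEY if you go this route.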


Task

Imagine a production incident communication case:

"Prepare a customer-facing update about a P1 payment issue in the US, without risky promises and with clear next actions."

One draft often looks fine, but may contain:

  • overconfident wording
  • unclear accountability boundary
  • phrases that sound like a guarantee

Reflection adds one controlled review before sending.

Solution

In this example:

  • draft is generated from deterministic context
  • review returns structured JSON (not free text)
  • revise is allowed at most once
  • gateway blocks patch if new numbers/facts appear or edit is too large
  • escalate ends the run in a controlled way (status=stopped), without hidden auto-rewrites
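The "edit is too large" guard from the list above can be illustrated in isolation with difflib.SequenceMatcher, the same mechanism gateway.py uses; the 0.45 threshold below mirrors min_patch_similarity, and the strings are made-up samples:

PYTHON
```python
from difflib import SequenceMatcher

def patch_is_small_enough(original: str, revised: str,
                          min_similarity: float = 0.45) -> bool:
    """Reject revisions that rewrite too much of the original draft."""
    return SequenceMatcher(a=original, b=revised).ratio() >= min_similarity

draft = "Payments are degraded in the US. ETA 45 minutes."
small_patch = "Payments are degraded in the US. ETA approximately 45 minutes."
full_rewrite = "Everything works perfectly and no customers were affected."
```

A small wording patch keeps a high similarity ratio, while a full rewrite falls below the threshold and would stop the run with patch_violation:too_large_edit.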

Code

context.py β€” deterministic production context

PYTHON
from __future__ import annotations

from typing import Any


def build_incident_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "report_date": report_date,
        "region": region,
        "incident": {
            "incident_id": "inc_payments_20260305",
            "severity": "P1",
            "status": "degraded",
            "affected_checkout_pct": 27,
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "eta_minutes": 45,
        },
        "policy_hints": {
            "avoid_absolute_guarantees": True,
            "required_sections": ["current_status", "customer_impact", "next_actions"],
        },
        "approved_actions": [
            "monitor payment failures every 15 minutes",
            "publish customer update via status page",
            "prepare support macro with workaround guidance",
        ],
    }

What matters most here (in plain words)

  • The context is fixed and repeatable, which is convenient for tests and debugging.
  • The LLM does not invent source data; it only formulates the answer on top of this context.
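The repeatability claim is easy to verify: a pure builder plus a canonical serialization yields byte-identical context on every call, which is what keeps hash-based tracing stable. build_context below is a hypothetical, reduced stand-in for build_incident_context:

PYTHON
```python
import json

def build_context(*, report_date: str, region: str) -> dict:
    # Hypothetical reduced stand-in for build_incident_context:
    # a pure function of its inputs, no randomness, no clock reads.
    return {"report_date": report_date, "region": region, "severity": "P1"}

def context_fingerprint(ctx: dict) -> str:
    # Sorted keys + fixed separators -> identical serialization for equal inputs.
    return json.dumps(ctx, sort_keys=True, separators=(",", ":"))

first = context_fingerprint(build_context(report_date="2026-03-05", region="US"))
second = context_fingerprint(build_context(report_date="2026-03-05", region="US"))
```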

gateway.py — policy/execution boundary + patch guardrails

PYTHON
from __future__ import annotations

import hashlib
import json
import re
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 30
    max_draft_chars: int = 900
    max_review_issues: int = 4
    max_fix_items: int = 4
    max_answer_chars: int = 900
    min_patch_similarity: float = 0.45


NUMBER_TOKEN_RE = re.compile(r"\b\d+(?:\.\d+)?%?\b")
INCIDENT_ID_RE = re.compile(r"\binc_[a-z0-9_]+\b", re.IGNORECASE)
SEVERITY_RE = re.compile(r"\bp[0-5]\b", re.IGNORECASE)
REGION_RE = re.compile(r"\b(us|eu|uk|ua|apac|global|emea|latam)\b", re.IGNORECASE)
QUOTED_PHRASE_RE = re.compile(r"['\"]([^'\"]{3,120})['\"]")

RESTRICTED_CLAIMS_RE = [
    re.compile(r"\bresolved\b", re.IGNORECASE),
    re.compile(r"\bfully[-\s]+recovered\b", re.IGNORECASE),
    re.compile(r"\bincident\s+closed\b", re.IGNORECASE),
    re.compile(r"\ball payments (?:are|is)\s+stable\b", re.IGNORECASE),
]


def _stable_json(value: Any) -> str:
    if value is None or isinstance(value, (bool, int, float, str)):
        return json.dumps(value, ensure_ascii=True, sort_keys=True)
    if isinstance(value, list):
        return "[" + ",".join(_stable_json(v) for v in value) + "]"
    if isinstance(value, dict):
        parts = []
        for key in sorted(value):
            parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
        return "{" + ",".join(parts) + "}"
    return json.dumps(str(value), ensure_ascii=True)


def _normalize_space(text: str) -> str:
    return " ".join((text or "").strip().split())


def text_hash(text: str) -> str:
    normalized = _normalize_space(text)
    raw = _stable_json(normalized)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]


def _extract_number_tokens(text: str) -> set[str]:
    normalized = _normalize_space(text).lower()
    return set(NUMBER_TOKEN_RE.findall(normalized))


def _extract_incident_ids(text: str) -> set[str]:
    normalized = _normalize_space(text).lower()
    return set(INCIDENT_ID_RE.findall(normalized))


def _extract_severity_labels(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {match.upper() for match in SEVERITY_RE.findall(normalized)}


def _extract_regions(text: str) -> set[str]:
    normalized = _normalize_space(text).upper()
    return {value.upper() for value in REGION_RE.findall(normalized)}


def _extract_fix_plan_phrase_rules(fix_plan: list[str]) -> dict[str, list[str]]:
    must_include: list[str] = []
    must_remove: list[str] = []

    def _append_unique(target: list[str], value: str) -> None:
        if value and value not in target:
            target.append(value)

    for item in fix_plan:
        item_norm = _normalize_space(item).lower()
        quoted = [_normalize_space(match).lower() for match in QUOTED_PHRASE_RE.findall(item)]
        quoted = [value for value in quoted if value]
        if not quoted:
            continue

        is_replace = "replace" in item_norm
        is_modify = any(word in item_norm for word in ("modify", "change", "update", "rewrite"))
        has_with = " with " in f" {item_norm} "
        has_example_marker = any(
            marker in item_norm for marker in ("such as", "for example", "e.g.", "e.g")
        )

        if is_replace or is_modify:
            _append_unique(must_remove, quoted[0])

            # Enforce phrase add only for strict replace-with instructions, not modify/example hints.
            if is_replace and len(quoted) >= 2 and has_with and not has_example_marker:
                _append_unique(must_include, quoted[1])
            continue

        for phrase in quoted:
            _append_unique(must_include, phrase)

    return {
        "must_include": must_include,
        "must_remove": must_remove,
    }


def _context_claim_text(value: Any) -> str:
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (bool, int, float)):
        return str(value)
    if isinstance(value, list):
        return " ".join(_context_claim_text(item) for item in value)
    if isinstance(value, dict):
        parts: list[str] = []
        for key, item in value.items():
            parts.append(str(key))
            parts.append(_context_claim_text(item))
        return " ".join(parts)
    return str(value)


def _is_high_risk_issue(issue_type: str) -> bool:
    return issue_type in {"legal_risk", "policy_violation"}


def validate_draft(draft: Any, *, max_chars: int) -> str:
    if not isinstance(draft, str) or not draft.strip():
        raise StopRun("invalid_draft:empty")
    normalized = draft.strip()
    if len(normalized) > max_chars:
        raise StopRun("invalid_draft:too_long")
    return normalized


def validate_review(
    raw: Any,
    *,
    allowed_decisions_policy: set[str],
    allowed_issue_types_policy: set[str],
    max_review_issues: int,
    max_fix_items: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_review:not_object")

    decision = raw.get("decision")
    if not isinstance(decision, str) or not decision.strip():
        raise StopRun("invalid_review:decision")
    decision = decision.strip()
    if decision not in allowed_decisions_policy:
        raise StopRun(f"review_decision_not_allowed_policy:{decision}")

    issues_raw = raw.get("issues", [])
    if not isinstance(issues_raw, list):
        raise StopRun("invalid_review:issues")
    if len(issues_raw) > max_review_issues:
        raise StopRun("invalid_review:too_many_issues")

    issues: list[dict[str, str]] = []
    for item in issues_raw:
        if not isinstance(item, dict):
            raise StopRun("invalid_review:issue_item")

        issue_type = item.get("type")
        note = item.get("note")

        if not isinstance(issue_type, str) or not issue_type.strip():
            raise StopRun("invalid_review:issue_type")
        issue_type = issue_type.strip()
        if issue_type not in allowed_issue_types_policy:
            raise StopRun(f"review_issue_not_allowed_policy:{issue_type}")

        if not isinstance(note, str) or not note.strip():
            raise StopRun("invalid_review:issue_note")

        issues.append({"type": issue_type, "note": note.strip()})

    fix_plan_raw = raw.get("fix_plan", [])
    if not isinstance(fix_plan_raw, list):
        raise StopRun("invalid_review:fix_plan")
    if len(fix_plan_raw) > max_fix_items:
        raise StopRun("invalid_review:too_many_fix_items")

    fix_plan: list[str] = []
    for item in fix_plan_raw:
        if not isinstance(item, str) or not item.strip():
            raise StopRun("invalid_review:fix_item")
        fix_plan.append(item.strip())

    reason = raw.get("reason", "")
    if reason is None:
        reason = ""
    if not isinstance(reason, str):
        raise StopRun("invalid_review:reason")
    reason = reason.strip()

    if decision == "approve":
        if issues and any(_is_high_risk_issue(issue["type"]) for issue in issues):
            raise StopRun("invalid_review:approve_with_high_risk_issue")
        return {
            "decision": "approve",
            "issues": issues,
            "fix_plan": [],
            "reason": reason,
            "high_risk": False,
        }

    if decision == "revise":
        if not issues:
            raise StopRun("invalid_review:revise_without_issues")
        if not fix_plan:
            raise StopRun("invalid_review:revise_without_fix_plan")
        if any(_is_high_risk_issue(issue["type"]) for issue in issues):
            raise StopRun("invalid_review:high_risk_requires_escalate")
        return {
            "decision": "revise",
            "issues": issues,
            "fix_plan": fix_plan,
            "reason": reason,
            "high_risk": False,
        }

    if decision == "escalate":
        if not reason:
            raise StopRun("invalid_review:escalate_reason_required")
        return {
            "decision": "escalate",
            "issues": issues,
            "fix_plan": [],
            "reason": reason,
            "high_risk": True,
        }

    raise StopRun("invalid_review:unknown_decision")


class ReflectionGateway:
    def __init__(
        self,
        *,
        allow_execution_decisions: set[str],
        budget: Budget,
    ):
        self.allow_execution_decisions = set(allow_execution_decisions)
        self.budget = budget

    def enforce_execution_decision(self, decision: str) -> None:
        if decision not in self.allow_execution_decisions:
            raise StopRun(f"review_decision_denied_execution:{decision}")

    def validate_revision(
        self,
        *,
        original: str,
        revised: str,
        context: dict[str, Any],
        fix_plan: list[str] | None = None,
    ) -> dict[str, Any]:
        if not isinstance(revised, str) or not revised.strip():
            raise StopRun("invalid_revised:empty")

        revised_clean = revised.strip()
        if len(revised_clean) > self.budget.max_answer_chars:
            raise StopRun("invalid_revised:too_long")

        normalized_original = _normalize_space(original)
        normalized_revised = _normalize_space(revised_clean)
        if normalized_original == normalized_revised:
            raise StopRun("invalid_revised:no_changes")

        similarity = SequenceMatcher(a=normalized_original, b=normalized_revised).ratio()
        if similarity < self.budget.min_patch_similarity:
            raise StopRun("patch_violation:too_large_edit")

        allowed_text_tokens = _stable_json(context) + " " + original
        allowed_text_claims = _normalize_space(_context_claim_text(context) + " " + original)
        revised_numbers = _extract_number_tokens(revised_clean)
        allowed_numbers = _extract_number_tokens(allowed_text_tokens)
        if revised_numbers - allowed_numbers:
            raise StopRun("patch_violation:no_new_facts")

        revised_ids = _extract_incident_ids(revised_clean)
        allowed_ids = _extract_incident_ids(allowed_text_tokens)
        if revised_ids - allowed_ids:
            raise StopRun("patch_violation:new_incident_id")

        revised_severity = _extract_severity_labels(revised_clean)
        allowed_severity = _extract_severity_labels(allowed_text_tokens)
        if revised_severity - allowed_severity:
            raise StopRun("patch_violation:new_severity_label")

        revised_regions = _extract_regions(revised_clean)
        allowed_regions = _extract_regions(allowed_text_tokens)
        if revised_regions - allowed_regions:
            raise StopRun("patch_violation:new_region")

        for claim_re in RESTRICTED_CLAIMS_RE:
            if claim_re.search(revised_clean) and not claim_re.search(allowed_text_claims):
                raise StopRun("patch_violation:restricted_claims")

        phrase_rules = _extract_fix_plan_phrase_rules(fix_plan or [])
        must_include = phrase_rules["must_include"]
        must_remove = phrase_rules["must_remove"]
        if must_include or must_remove:
            revised_lower = _normalize_space(revised_clean).lower()
            missing = [phrase for phrase in must_include if phrase not in revised_lower]
            if missing:
                raise StopRun("patch_violation:fix_plan_not_applied")
            still_present = [phrase for phrase in must_remove if phrase in revised_lower]
            if still_present:
                raise StopRun("patch_violation:fix_plan_not_applied")

        return {
            "answer": revised_clean,
            "patch_similarity": round(similarity, 3),
            "fix_plan_quoted_checks": len(must_include) + len(must_remove),
        }

    def validate_final(self, answer: str) -> str:
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_answer:empty")

        cleaned = answer.strip()
        if len(cleaned) > self.budget.max_answer_chars:
            raise StopRun("invalid_answer:too_long")
        return cleaned

What matters most here (in plain words)

  • The gateway only enforces the execution decisions passed from main.py.
  • The policy allowlist and the execution allowlist are separated: the runtime may be stricter.
  • After a revision, the gateway blocks new facts by numbers, critical tokens (incident_id/region/severity), and restricted claims.
  • For token checks, a stable JSON rendering of the context is used; for claim checks, plain context text is used, so the regex checks are more reliable.
  • If fix_plan contains quoted phrases, the gateway builds must_include/must_remove rules and checks them against the revision.
  • For replace/modify/change/update/rewrite instructions, the first quoted phrase is checked as must_remove.
  • For replace "A" with "B", phrase B is checked as must_include only when it is not an example (such as/for example/e.g.).
  • fix_plan_quoted_checks counts only these enforced rules, so it may be smaller than the number of fix_plan items.
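The no_new_facts check from the first bullets boils down to a set difference over number tokens. A standalone sketch (simplified: gateway.py uses a word-boundary variant of the regex and also checks incident ids, severity labels, and regions):

PYTHON
```python
import re

NUMBER_RE = re.compile(r"\d+(?:\.\d+)?")  # simplified vs gateway.py's token regex

def has_new_numbers(allowed_text: str, revised: str) -> bool:
    """True if the revision introduces a number absent from context + draft."""
    allowed = set(NUMBER_RE.findall(allowed_text))
    return bool(set(NUMBER_RE.findall(revised)) - allowed)

allowed_source = "affected_checkout_pct 27, failed_payment_rate 0.034, eta_minutes 45"
ok_patch = "About 27% of checkouts are affected; recovery in roughly 45 minutes."
bad_patch = "About 30% of checkouts are affected; recovery in roughly 45 minutes."
```

In the real gateway, a revision like bad_patch stops the run with patch_violation:no_new_facts.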

llm.py — draft/review/revise LLM calls

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


DRAFT_SYSTEM_PROMPT = """
You are an operations communications writer.
Return exactly one JSON object:
{
  "draft": "short customer-safe incident update"
}

Rules:
- Use only facts from provided incident_context.
- Include current status, customer impact, and next actions.
- Avoid absolute guarantees and overconfident claims.
- Keep draft concise and actionable.
- Do not output markdown or extra keys.
""".strip()

REVIEW_SYSTEM_PROMPT = """
You are a reflection reviewer.
Return exactly one JSON object:
{
  "decision": "approve|revise|escalate",
  "issues": [{"type":"overconfidence","note":"..."}],
  "fix_plan": ["patch instruction"],
  "reason": "for escalate only"
}

Rules:
- Review exactly once.
- decision=approve: fix_plan must be empty.
- decision=revise: provide 1-4 concrete patch-only instructions.
- For enforceable instructions, include quoted target phrases in fix_plan.
- decision=escalate: use only for high-risk or policy-unsafe content.
- Do not add new facts in fix_plan.
- Use only issue types from allowed_issue_types.
- Do not output markdown or extra keys.
""".strip()

REVISE_SYSTEM_PROMPT = """
You are an editor applying one controlled patch.
Return exactly one JSON object:
{
  "revised_answer": "updated answer"
}

Rules:
- Edit only what is needed to satisfy fix_plan.
- Keep scope and intent of original draft.
- Do not introduce new facts or numbers.
- Keep answer concise and customer-safe.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")
    return data


def generate_draft(*, goal: str, incident_context: dict[str, Any]) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
    }
    data = _chat_json(system_prompt=DRAFT_SYSTEM_PROMPT, payload=payload)

    draft = data.get("draft")
    if not isinstance(draft, str):
        raise LLMInvalid("llm_invalid_schema")

    draft = draft.strip()
    if not draft:
        raise LLMEmpty("llm_empty")
    return draft


def review_draft(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    allowed_issue_types: list[str],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "allowed_issue_types": allowed_issue_types,
    }
    return _chat_json(system_prompt=REVIEW_SYSTEM_PROMPT, payload=payload)


def revise_once(
    *,
    goal: str,
    incident_context: dict[str, Any],
    draft: str,
    fix_plan: list[str],
) -> str:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "draft": draft,
        "fix_plan": fix_plan,
    }
    data = _chat_json(system_prompt=REVISE_SYSTEM_PROMPT, payload=payload)

    revised = data.get("revised_answer")
    if not isinstance(revised, str):
        raise LLMInvalid("llm_invalid_schema")

    revised = revised.strip()
    if not revised:
        raise LLMEmpty("llm_empty")
    return revised

What matters most here (in plain words)

  • Each stage returns a JSON contract, not free text.
  • llm_invalid_json and llm_invalid_schema are separated for clean diagnostics.
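The two failure modes in the second bullet can be reproduced without any network call; parse_contract is a hypothetical helper that applies the same two-stage check as _chat_json plus the schema guard in generate_draft:

PYTHON
```python
import json

class LLMInvalidJSON(Exception):
    """Raised when the response is not JSON at all."""

class LLMInvalidSchema(Exception):
    """Raised when the JSON does not match the expected contract."""

def parse_contract(text: str, required_key: str) -> str:
    # Stage 1: the response must be valid JSON at all.
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalidJSON("llm_invalid_json") from exc
    # Stage 2: the JSON must match the expected contract shape.
    if not isinstance(data, dict) or not isinstance(data.get(required_key), str):
        raise LLMInvalidSchema("llm_invalid_schema")
    return data[required_key].strip()
```

Keeping the two errors distinct means a stop_reason immediately tells you whether the model failed at JSON mode or at the contract.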

main.py — Reflection flow orchestration

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from context import build_incident_context
from gateway import Budget, ReflectionGateway, StopRun, text_hash, validate_draft, validate_review
from llm import LLMEmpty, LLMInvalid, LLMTimeout, generate_draft, review_draft, revise_once

GOAL = (
    "Draft a customer-facing payment incident update for US enterprise customers. "
    "Keep it accurate, avoid overconfident language, and include next actions."
)
INCIDENT_CONTEXT = build_incident_context(report_date="2026-03-05", region="US")

BUDGET = Budget(
    max_seconds=30,
    max_draft_chars=900,
    max_review_issues=4,
    max_fix_items=4,
    max_answer_chars=900,
    min_patch_similarity=0.45,
)

ALLOWED_REVIEW_DECISIONS_POLICY = {"approve", "revise", "escalate"}
AUTO_REVISION_ENABLED = True
ALLOWED_REVIEW_DECISIONS_EXECUTION = (
    ALLOWED_REVIEW_DECISIONS_POLICY if AUTO_REVISION_ENABLED else {"approve", "escalate"}
)

ALLOWED_ISSUE_TYPES_POLICY = {
    "overconfidence",
    "missing_uncertainty",
    "contradiction",
    "scope_leak",
    "policy_violation",
    "legal_risk",
}


def run_reflection_agent(*, goal: str, incident_context: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = ReflectionGateway(
        allow_execution_decisions=ALLOWED_REVIEW_DECISIONS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    try:
        draft_raw = generate_draft(goal=goal, incident_context=incident_context)
        draft = validate_draft(draft_raw, max_chars=BUDGET.max_draft_chars)
    except LLMTimeout:
        return stopped("llm_timeout", phase="draft")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="draft")
    except LLMEmpty:
        return stopped("llm_empty", phase="draft")
    except StopRun as exc:
        return stopped(exc.reason, phase="draft")

    trace.append(
        {
            "step": 1,
            "phase": "draft",
            "draft_hash": text_hash(draft),
            "chars": len(draft),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 1,
            "action": "draft_once",
            "draft": draft,
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="review")

    try:
        raw_review = review_draft(
            goal=goal,
            incident_context=incident_context,
            draft=draft,
            allowed_issue_types=sorted(ALLOWED_ISSUE_TYPES_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="review")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="review")

    try:
        review = validate_review(
            raw_review,
            allowed_decisions_policy=ALLOWED_REVIEW_DECISIONS_POLICY,
            allowed_issue_types_policy=ALLOWED_ISSUE_TYPES_POLICY,
            max_review_issues=BUDGET.max_review_issues,
            max_fix_items=BUDGET.max_fix_items,
        )
        gateway.enforce_execution_decision(review["decision"])
    except StopRun as exc:
        return stopped(exc.reason, phase="review", raw_review=raw_review)

    trace.append(
        {
            "step": 2,
            "phase": "review",
            "decision": review["decision"],
            "issues": len(review["issues"]),
            "fix_items": len(review["fix_plan"]),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 2,
            "action": "review_once",
            "review": review,
        }
    )

    if review["decision"] == "escalate":
        escalation_reason = str(review.get("reason", "")).strip()
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "policy_escalation",
            "escalation_reason": escalation_reason[:120],
            "phase": "review",
            "review": review,
            "trace": trace,
            "history": history,
        }

    final_answer = draft
    revised = False

    if review["decision"] == "revise":
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return stopped("max_seconds", phase="revise")

        try:
            revised_raw = revise_once(
                goal=goal,
                incident_context=incident_context,
                draft=draft,
                fix_plan=review["fix_plan"],
            )
            revised_payload = gateway.validate_revision(
                original=draft,
                revised=revised_raw,
                context=incident_context,
                fix_plan=review["fix_plan"],
            )
        except LLMTimeout:
            return stopped("llm_timeout", phase="revise")
        except LLMInvalid as exc:
            return stopped(exc.args[0], phase="revise")
        except LLMEmpty:
            return stopped("llm_empty", phase="revise")
        except StopRun as exc:
            return stopped(exc.reason, phase="revise")

        final_answer = revised_payload["answer"]
        revised = True

        trace.append(
            {
                "step": 3,
                "phase": "revise",
                "patch_similarity": revised_payload["patch_similarity"],
                "fix_plan_quoted_checks": revised_payload["fix_plan_quoted_checks"],
                "revised_hash": text_hash(final_answer),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "revise_once",
                "fix_plan": review["fix_plan"],
                "revised_answer": final_answer,
            }
        )

    try:
        final_answer = gateway.validate_final(final_answer)
    except StopRun as exc:
        return stopped(exc.reason, phase="finalize")

    trace.append(
        {
            "step": 4 if revised else 3,
            "phase": "finalize",
            "final_hash": text_hash(final_answer),
            "ok": True,
        }
    )
    history.append(
        {
            "step": 4 if revised else 3,
            "action": "finalize",
            "status": "final",
        }
    )

    return {
        "run_id": run_id,
        "status": "ok",
        "stop_reason": "success",
        "outcome": "revised_once" if revised else "approved_direct",
        "answer": final_answer,
        "review_decision": review["decision"],
        "issues": review["issues"],
        "fix_plan": review["fix_plan"],
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_reflection_agent(goal=GOAL, incident_context=INCIDENT_CONTEXT)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain words)

  • There is a clear separation: the policy allowlist defines what is conceptually allowed, while the execution allowlist defines what the runtime is actually allowed to do.
  • The run ends predictably: either success or a controlled stop_reason.
  • trace and history make every review/revise step easy to audit.
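The policy/execution split from the first bullet can be exercised in isolation; this sketch mirrors the AUTO_REVISION_ENABLED switch in main.py, but returns stop reasons instead of raising StopRun:

PYTHON
```python
POLICY_DECISIONS = {"approve", "revise", "escalate"}

def execution_decisions(auto_revision_enabled: bool) -> set:
    # The runtime allowlist may be stricter than policy, never looser.
    return set(POLICY_DECISIONS) if auto_revision_enabled else {"approve", "escalate"}

def enforce(decision: str, allowed: set) -> str:
    if decision not in POLICY_DECISIONS:
        return f"review_decision_not_allowed_policy:{decision}"
    if decision not in allowed:
        return f"review_decision_denied_execution:{decision}"
    return "ok"
```

With auto revision disabled, a revise decision passes policy validation but is denied at the execution boundary.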

Example output

JSON
{
  "run_id": "f67eaad4-1c3e-4160-9d8b-1e2e9af42c82",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "revised_once",
  "answer": "We are currently experiencing payment service degradation affecting ~27% of US enterprise checkouts, with failed payments at 3.4% and 5 chargeback alerts. We expect recovery in approximately 45 minutes, but this may change. Next actions: monitor failures every 15 minutes, publish status page updates every 15 minutes, and support users with workaround macros.",
  "review_decision": "revise",
  "issues": [
    {
      "type": "overconfidence",
      "note": "ETA wording sounded too certain and could be read as a guarantee."
    }
  ],
  "fix_plan": [
    "Add uncertainty wording for ETA (for example: 'recovery in approximately 45 minutes, but this may change').",
    "State explicitly that status page updates will be published every 15 minutes.",
    "Keep a clear 'Next actions' section with monitoring, updates, and support macros."
  ],
  "trace": [
    {
      "step": 1,
      "phase": "draft",
      "draft_hash": "f4b3f386c80a",
      "chars": 547,
      "ok": true
    },
    {
      "step": 2,
      "phase": "review",
      "decision": "revise",
      "issues": 1,
      "fix_items": 3,
      "ok": true
    },
    {
      "step": 3,
      "phase": "revise",
      "patch_similarity": 0.564,
      "fix_plan_quoted_checks": 1,
      "revised_hash": "2a5ac4952ae0",
      "ok": true
    },
    {
      "step": 4,
      "phase": "finalize",
      "final_hash": "2a5ac4952ae0",
      "ok": true
    }
  ],
  "history": [{...}]
}

Typical stop_reason values

  • success — run completed correctly
  • llm_timeout — LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty — empty LLM response in draft/revise
  • llm_invalid_json — LLM returned invalid JSON
  • llm_invalid_schema — JSON does not match the contract
  • invalid_draft:* — draft failed basic validation
  • invalid_review:* — review failed the policy-layer contract
  • review_decision_not_allowed_policy:* — review decision is outside the policy allowlist
  • review_decision_denied_execution:* — runtime denied the execution decision
  • patch_violation:no_new_facts — revision introduced new numeric facts
  • patch_violation:new_incident_id|new_severity_label|new_region — revision introduced new critical identifiers
  • patch_violation:restricted_claims — revision introduced restricted claims (resolved, fully recovered, etc.) out of context
  • patch_violation:fix_plan_not_applied — revision did not apply quoted hints from fix_plan
  • patch_violation:too_large_edit — revision exceeded the patch-only boundary
  • policy_escalation — review returned escalate as the final decision; details in escalation_reason
  • max_seconds — total run time budget exceeded
  • invalid_answer:* — final answer failed validation
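Callers usually branch on stop_reason families rather than exact strings. A small routing sketch; the bucket names (deliver, human_review, retry_or_alert, audit) are hypothetical, not part of the example:

PYTHON
```python
def route_stop_reason(stop_reason: str) -> str:
    """Map a stop_reason to a coarse handling bucket (names are illustrative)."""
    if stop_reason == "success":
        return "deliver"
    if stop_reason == "policy_escalation":
        return "human_review"
    if stop_reason.startswith(("llm_", "max_seconds")):
        return "retry_or_alert"   # transient: timeout, empty response, bad JSON
    if stop_reason.startswith(("patch_violation:", "invalid_")):
        return "audit"            # contract/guardrail failures: inspect trace
    return "audit"                # includes review_decision_* denials
```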

What is NOT shown

  • persisted storage for trace/history (example keeps everything in one run)
  • retry/backoff for LLM calls
  • human-in-the-loop queue for escalate
  • automatic patch diff render for UI

What to try next

  1. Disable AUTO_REVISION_ENABLED and observe review_decision_denied_execution:revise.
  2. Add strict allowed-changes checks by fix_plan (line-level patch contract).
  3. Emit policy_escalation to an external queue for manual review.
⏱️ 16 min read • Updated Mar 2026 • Difficulty: ★★☆
Integrated: production control (OnceOnly)
Add guardrails to tool-calling agents
Ship this pattern with governance:
  • Budgets (steps / spend caps)
  • Tool permissions (allowlist / blocklist)
  • Kill switch & incident stop
  • Idempotency & dedupe
  • Audit logs & traceability
Integrated mention: OnceOnly is a control layer for production agent systems.
Author

This documentation is curated and maintained by engineers who ship AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

Patterns and recommendations are grounded in post-mortems, failure modes, and operational incidents in deployed systems, including during the development and operation of governance infrastructure for agents at OnceOnly.