Pattern Essence (Brief)
Fallback-Recovery Agent is a pattern in which a step failure does not mean an immediate stop.
The agent follows a controlled loop:
- detects an error
- classifies it
- attempts retry within policy
- switches to fallback path
- continues from a checkpoint, or stops with an explicit `stop_reason`
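In skeleton form, the control loop above looks roughly like this (a minimal illustrative sketch, not the project's actual gateway; all names here are hypothetical):

```python
import time

def run_with_recovery(primary, fallbacks, *, max_retries=1, backoff=0.25):
    """Minimal sketch of the detect -> classify -> retry -> fallback loop."""
    # Detect + classify: retry the primary only on timeouts, within the budget.
    for attempt in range(max_retries + 1):
        try:
            return {"source": "primary", "result": primary()}
        except TimeoutError:
            if attempt < max_retries:
                time.sleep(backoff * (attempt + 1))  # bounded linear backoff
                continue
    # Retry budget exhausted: walk the fallback chain.
    for name, fn in fallbacks:
        try:
            return {"source": name, "result": fn()}
        except Exception:
            continue
    # Nothing worked: stop with an explicit reason, not an opaque crash.
    return {"source": "none", "stop_reason": "recovery_exhausted"}
```

The real gateway below adds allowlists, budgets, checkpoints, and event traces on top of this same skeleton.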
What this example demonstrates
- retry for retriable timeout (bounded by policy budget)
- fallback chain: `primary -> replica -> cache`
- runtime execution allowlist for tool-level control
- checkpoint after a successful step
- checkpoint TTL (optional) for safe resume
- resume from checkpoint without re-executing the step
- explicit `trace` / `history` / `stop_reason` for production monitoring
Architecture
- `main.py` runs step `payments_health` through `RecoveryGateway`.
- The primary tool intentionally times out; the gateway performs a retry.
- After the retry budget is exhausted, the gateway switches to fallback (`payments_replica_api`).
- The successful result is saved into `CheckpointStore`.
- The next step, `demand_signal`, runs and is checkpointed too.
- The LLM composes the final short operations brief only from aggregated facts.
The LLM decides only the text of the final brief. The recovery policy (retry/fallback/budgets/allowlist) is fully controlled by the execution layer. Checkpoint resume may be TTL-controlled to avoid using stale snapshots.
Project structure
```
agent-patterns/
└── fallback-recovery-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── checkpoint_store.py
        ├── context.py
        ├── llm.py
        ├── README.md
        └── requirements.txt
```
How to run
```bash
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd agent-patterns/fallback-recovery-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
Python 3.11+ is required.
Option via `export`:

```bash
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
```
Option via `.env` (optional):

```bash
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
```
This is the shell variant (macOS/Linux). On Windows, it is easier to set environment variables with `set` (cmd) or `$env:` (PowerShell), or, if desired, to use python-dotenv.
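If you would rather avoid shell-specific steps entirely, the simple `KEY=VALUE` format above can be loaded with a few lines of Python (an illustrative sketch; `load_env_file` is not part of this repo, and python-dotenv handles quoting and edge cases more robustly):

```python
import os

def load_env_file(path: str = ".env") -> None:
    """Minimal .env loader: KEY=VALUE lines, '#' comments, no quoting rules."""
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            # setdefault: never override variables already set in the shell.
            os.environ.setdefault(key.strip(), value.strip())
```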
Task
Production case:
"Prepare a short update about a payment incident for US enterprise customers. If the primary API is unstable, do not fail, recover via fallback."
Code
tools.py — primary/fallback tools
```python
from __future__ import annotations

import time
from typing import Any

# Demo-only process-global state for deterministic primary failures.
# In production, keep attempt counters request-scoped to avoid cross-run collisions.
_PRIMARY_ATTEMPTS: dict[str, int] = {}


def _request_key(report_date: str, region: str, request_id: str, suffix: str) -> str:
    return f"{request_id}:{report_date}:{region.upper()}:{suffix}"


def payments_primary_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    key = _request_key(report_date, region, request_id, "payments_primary")
    _PRIMARY_ATTEMPTS[key] = _PRIMARY_ATTEMPTS.get(key, 0) + 1
    attempt = _PRIMARY_ATTEMPTS[key]
    # Simulate primary instability: the first two attempts time out.
    if attempt <= 2:
        time.sleep(1.4)
        raise TimeoutError("primary_payments_timeout")
    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "incident_severity": "P1",
            "eta_minutes": 45,
            "source": "payments_primary_api",
        },
    }


def payments_replica_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del report_date, request_id
    time.sleep(0.2)
    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "incident_severity": "P1",
            "eta_minutes": 45,
            "source": f"payments_replica_api:{region.upper()}",
        },
    }


def payments_cached_snapshot(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.05)
    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.031,
            "chargeback_alerts": 4,
            "incident_severity": "P1",
            "eta_minutes": 50,
            "stale_minutes": 18,
            "source": f"payments_cache:{report_date}:{region.upper()}",
        },
    }


def demand_primary_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.15)
    return {
        "status": "ok",
        "data": {
            "affected_checkout_share": 0.27,
            "avg_orders_per_minute": 182,
            "priority_segment": f"{region.upper()}_enterprise",
            "source": f"demand_primary_api:{report_date}",
        },
    }


def demand_cached_snapshot(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.03)
    return {
        "status": "ok",
        "data": {
            "affected_checkout_share": 0.25,
            "avg_orders_per_minute": 176,
            "priority_segment": f"{region.upper()}_enterprise",
            "stale_minutes": 22,
            "source": f"demand_cache:{report_date}:{region.upper()}",
        },
    }
```
`_PRIMARY_ATTEMPTS` in this example is a demo mock for controlled flaky primary behavior. In production, this state should be isolated per request (or kept in request-scoped storage) so that parallel runs do not conflict.
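One way to make the attempt counter request-scoped is to carry it in a per-request object instead of module-global state. A hedged sketch (these names are hypothetical, not part of this repo):

```python
from dataclasses import dataclass, field


@dataclass
class RequestState:
    """Per-request attempt counters: no cross-run collisions, trivially GC'd."""
    attempts: dict[str, int] = field(default_factory=dict)

    def next_attempt(self, tool: str) -> int:
        self.attempts[tool] = self.attempts.get(tool, 0) + 1
        return self.attempts[tool]


def flaky_primary(state: RequestState) -> dict:
    # Same demo behavior as payments_primary_api, but without global state:
    # the first two attempts per request fail, the third succeeds.
    if state.next_attempt("payments_primary") <= 2:
        raise TimeoutError("primary_payments_timeout")
    return {"status": "ok"}
```

Each run constructs its own `RequestState`, so concurrent runs never see each other's counters.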
context.py — request envelope construction
```python
from __future__ import annotations

from typing import Any


def build_operations_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
        },
        "policy_hints": {
            "channel": "status_page",
            "audience": "enterprise_customers",
        },
    }
```
checkpoint_store.py — checkpoint after safe progress
```python
from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Any


@dataclass
class CheckpointRow:
    run_id: str
    step_id: str
    source: str
    tool: str
    result: dict[str, Any]
    saved_at: float
    ttl_seconds: float | None = None


class CheckpointStore:
    def __init__(self) -> None:
        self._rows: dict[tuple[str, str], CheckpointRow] = {}

    def get_step(self, *, run_id: str, step_id: str, now: float | None = None) -> CheckpointRow | None:
        row = self._rows.get((run_id, step_id))
        if row is None:
            return None
        if row.ttl_seconds is None:
            return row
        now_ts = time.time() if now is None else float(now)
        if (now_ts - row.saved_at) > float(row.ttl_seconds):
            # Expired checkpoint: treat as missing.
            self._rows.pop((run_id, step_id), None)
            return None
        return row

    def save_step(
        self,
        *,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
    ) -> None:
        self._rows[(run_id, step_id)] = CheckpointRow(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
            saved_at=time.time(),
            ttl_seconds=None,
        )

    def save_step_with_ttl(
        self,
        *,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
        ttl_seconds: float,
    ) -> None:
        self._rows[(run_id, step_id)] = CheckpointRow(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
            saved_at=time.time(),
            ttl_seconds=float(ttl_seconds),
        )

    def dump_run(self, *, run_id: str) -> list[dict[str, Any]]:
        out: list[dict[str, Any]] = []
        for row in self._rows.values():
            if row.run_id != run_id:
                continue
            out.append(
                {
                    "step_id": row.step_id,
                    "source": row.source,
                    "tool": row.tool,
                    "result_keys": sorted(row.result.keys()),
                    "saved_at": row.saved_at,
                }
            )
        out.sort(key=lambda item: item["saved_at"])
        return out
```
gateway.py — recovery policy boundary
```python
from __future__ import annotations

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from dataclasses import dataclass
from typing import Any, Callable

from checkpoint_store import CheckpointStore


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 120
    step_timeout_seconds: float = 1.0
    max_retries: int = 1
    max_fallbacks: int = 2
    checkpoint_ttl_seconds: float = 900.0


def classify_exception(exc: Exception) -> str:
    if isinstance(exc, TimeoutError):
        msg = str(exc)[:80]
        return f"timeout:{msg}" if msg else "timeout"
    if isinstance(exc, ValueError):
        return "invalid_output"
    if isinstance(exc, RuntimeError):
        if "unavailable" in str(exc).lower():
            return "tool_unavailable"
        return "runtime_error"
    return "non_retriable"


def validate_tool_observation(raw: Any) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise ValueError("tool_observation_not_object")
    if raw.get("status") != "ok":
        raise ValueError("tool_observation_status_not_ok")
    data = raw.get("data")
    if not isinstance(data, dict):
        raise ValueError("tool_observation_data_not_object")
    return data


class RecoveryGateway:
    def __init__(
        self,
        *,
        allowed_steps_policy: set[str],
        allowed_tools_policy: set[str],
        allowed_tools_execution: set[str],
        budget: Budget,
    ):
        self.allowed_steps_policy = set(allowed_steps_policy)
        self.allowed_tools_policy = set(allowed_tools_policy)
        self.allowed_tools_execution = set(allowed_tools_execution)
        self.budget = budget
        self._pool = ThreadPoolExecutor(max_workers=4)

    def close(self) -> None:
        self._pool.shutdown(wait=False, cancel_futures=True)

    def _ensure_run_budget(self, *, started_monotonic: float) -> None:
        elapsed = time.monotonic() - started_monotonic
        if elapsed > self.budget.max_seconds:
            raise StopRun("max_seconds")

    def _dispatch(
        self,
        *,
        tool_name: str,
        tool_fn: Callable[..., dict[str, Any]],
        args: dict[str, Any],
    ) -> dict[str, Any]:
        future = self._pool.submit(tool_fn, **args)
        try:
            raw = future.result(timeout=self.budget.step_timeout_seconds)
        except FuturesTimeoutError as exc:
            raise TimeoutError(f"tool_timeout:{tool_name}") from exc
        return validate_tool_observation(raw)

    def _save_checkpoint(
        self,
        *,
        checkpoint: CheckpointStore,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
        checkpoint_ttl_seconds: float | None = None,
    ) -> None:
        ttl_value = self.budget.checkpoint_ttl_seconds if checkpoint_ttl_seconds is None else checkpoint_ttl_seconds
        ttl = float(ttl_value)
        if ttl > 0:
            checkpoint.save_step_with_ttl(
                run_id=run_id,
                step_id=step_id,
                source=source,
                tool=tool,
                result=result,
                ttl_seconds=ttl,
            )
            return
        checkpoint.save_step(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
        )

    def run_step_with_recovery(
        self,
        *,
        run_id: str,
        step_id: str,
        primary_tool_name: str,
        primary_tool_fn: Callable[..., dict[str, Any]],
        fallback_chain: list[tuple[str, Callable[..., dict[str, Any]]]],
        args: dict[str, Any],
        checkpoint: CheckpointStore,
        started_monotonic: float,
        critical: bool = True,
        checkpoint_ttl_seconds: float | None = None,
    ) -> dict[str, Any]:
        self._ensure_run_budget(started_monotonic=started_monotonic)
        if step_id not in self.allowed_steps_policy:
            raise StopRun(f"step_not_allowed_policy:{step_id}")

        cached = checkpoint.get_step(run_id=run_id, step_id=step_id)
        if cached is not None:
            return {
                "status": "done",
                "step_id": step_id,
                "source": "checkpoint",
                "tool": cached.tool,
                "result": cached.result,
                "attempts_used": 0,
                "primary_attempts": 0,
                "fallback_attempts": 0,
                "retried": False,
                "fallbacks_used": 0,
                "events": [{"kind": "checkpoint_resume", "step_id": step_id}],
            }

        events: list[dict[str, Any]] = []
        primary_attempts = 0
        fallback_attempts = 0
        last_reason = "unknown"

        if primary_tool_name not in self.allowed_tools_policy:
            events.append({"kind": "primary_denied_policy", "tool": primary_tool_name})
            last_reason = "tool_denied_policy"
        elif primary_tool_name not in self.allowed_tools_execution:
            events.append({"kind": "primary_denied", "tool": primary_tool_name})
            last_reason = "tool_denied"
        else:
            for retry_idx in range(self.budget.max_retries + 1):
                self._ensure_run_budget(started_monotonic=started_monotonic)
                primary_attempts += 1
                try:
                    data = self._dispatch(
                        tool_name=primary_tool_name,
                        tool_fn=primary_tool_fn,
                        args=args,
                    )
                    self._save_checkpoint(
                        checkpoint=checkpoint,
                        run_id=run_id,
                        step_id=step_id,
                        source="primary",
                        tool=primary_tool_name,
                        result=data,
                        checkpoint_ttl_seconds=checkpoint_ttl_seconds,
                    )
                    return {
                        "status": "done",
                        "step_id": step_id,
                        "source": "primary",
                        "tool": primary_tool_name,
                        "result": data,
                        "attempts_used": primary_attempts,
                        "primary_attempts": primary_attempts,
                        "fallback_attempts": 0,
                        "retried": retry_idx > 0,
                        "fallbacks_used": 0,
                        "events": events,
                    }
                except Exception as exc:  # noqa: BLE001
                    is_timeout = isinstance(exc, TimeoutError)
                    reason = classify_exception(exc)
                    last_reason = reason
                    events.append(
                        {
                            "kind": "primary_failure",
                            "tool": primary_tool_name,
                            "attempt": retry_idx + 1,
                            "reason": reason,
                            "message": str(exc)[:120],
                        }
                    )
                    if is_timeout and retry_idx < self.budget.max_retries:
                        backoff = 0.25 * float(retry_idx + 1)
                        events.append({"kind": "retry_backoff", "seconds": backoff})
                        time.sleep(backoff)
                        continue
                    break

        fallbacks_used = 0
        for fallback_name, fallback_fn in fallback_chain:
            if fallbacks_used >= self.budget.max_fallbacks:
                break
            fallbacks_used += 1
            self._ensure_run_budget(started_monotonic=started_monotonic)
            if fallback_name not in self.allowed_tools_policy:
                events.append({"kind": "fallback_denied_policy", "tool": fallback_name})
                continue
            if fallback_name not in self.allowed_tools_execution:
                events.append({"kind": "fallback_denied", "tool": fallback_name})
                continue
            try:
                fallback_attempts += 1
                data = self._dispatch(
                    tool_name=fallback_name,
                    tool_fn=fallback_fn,
                    args=args,
                )
                events.append({"kind": "fallback_success", "tool": fallback_name})
                self._save_checkpoint(
                    checkpoint=checkpoint,
                    run_id=run_id,
                    step_id=step_id,
                    source="fallback",
                    tool=fallback_name,
                    result=data,
                    checkpoint_ttl_seconds=checkpoint_ttl_seconds,
                )
                return {
                    "status": "done",
                    "step_id": step_id,
                    "source": "fallback",
                    "tool": fallback_name,
                    "result": data,
                    "attempts_used": primary_attempts + fallback_attempts,
                    "primary_attempts": primary_attempts,
                    "fallback_attempts": fallback_attempts,
                    "retried": primary_attempts > 1,
                    "fallbacks_used": fallbacks_used,
                    "events": events,
                }
            except Exception as exc:  # noqa: BLE001
                reason = classify_exception(exc)
                last_reason = reason
                events.append(
                    {
                        "kind": "fallback_failure",
                        "tool": fallback_name,
                        "reason": reason,
                        "message": str(exc)[:120],
                    }
                )

        if critical:
            if last_reason in {"tool_denied_policy", "tool_denied"}:
                raise StopRun(f"{last_reason}:{step_id}")
            raise StopRun(f"recovery_exhausted:{step_id}")

        return {
            "status": "failed",
            "step_id": step_id,
            "source": "none",
            "tool": "none",
            "error": last_reason,
            "attempts_used": primary_attempts + fallback_attempts,
            "primary_attempts": primary_attempts,
            "fallback_attempts": fallback_attempts,
            "retried": primary_attempts > 1,
            "fallbacks_used": fallbacks_used,
            "events": events,
        }
```
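One detail of `_dispatch` worth noting: `future.result(timeout=...)` stops *waiting*, but the worker thread keeps running until the tool returns on its own (which is why `close()` uses `cancel_futures=True`). The timeout pattern can be demonstrated in isolation (a self-contained sketch; `dispatch_with_timeout` is a hypothetical helper, not part of the gateway):

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError


def dispatch_with_timeout(fn, timeout_seconds: float):
    """Run fn in a worker thread; convert a future timeout into TimeoutError.

    Caveat: the worker thread is not killed on timeout; the caller merely
    stops waiting. True cancellation needs cooperative tools.
    """
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(fn)
        try:
            return future.result(timeout=timeout_seconds)
        except FuturesTimeoutError as exc:
            raise TimeoutError("tool_timeout") from exc
```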
llm.py — final short brief
```python
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


FINAL_SYSTEM_PROMPT = """
You are an operations editor.
Return exactly one JSON object:
{
  "answer": "short operations brief"
}
Rules:
- Use only provided aggregate facts.
- Mention if fallback recovery was used.
- Keep it concise and actionable.
- If ETA is present, phrase it as an estimate that may change. Never guarantee timelines.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc
    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc
    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")
    return data


def compose_operations_brief(
    *,
    goal: str,
    aggregate: dict[str, Any],
    recovery_summary: dict[str, Any],
) -> str:
    payload = {
        "goal": goal,
        "aggregate": aggregate,
        "recovery_summary": recovery_summary,
    }
    data = _chat_json(system_prompt=FINAL_SYSTEM_PROMPT, payload=payload)
    answer = data.get("answer")
    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")
    answer = answer.strip()
    if not answer:
        raise LLMEmpty("llm_empty")
    return answer
```
main.py — Detect -> Retry -> Fallback -> Resume -> Finalize
```python
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from checkpoint_store import CheckpointStore
from context import build_operations_context
from gateway import Budget, RecoveryGateway, StopRun
from llm import LLMEmpty, LLMInvalid, LLMTimeout, compose_operations_brief
from tools import (
    demand_cached_snapshot,
    demand_primary_api,
    payments_cached_snapshot,
    payments_primary_api,
    payments_replica_api,
)

GOAL = (
    "Prepare a concise operations brief for the US payments incident. "
    "Recover from tool failures safely and keep the update customer-safe."
)

REQUEST = build_operations_context(report_date="2026-03-06", region="US")

BUDGET = Budget(
    max_seconds=120,
    step_timeout_seconds=1.0,
    max_retries=1,
    max_fallbacks=2,
    checkpoint_ttl_seconds=900.0,
)

ALLOWED_STEPS_POLICY = {"payments_health", "demand_signal"}
ALLOWED_TOOLS_POLICY = {
    "payments_primary_api",
    "payments_replica_api",
    "payments_cached_snapshot",
    "demand_primary_api",
    "demand_cached_snapshot",
}
ALLOWED_TOOLS_EXECUTION = ALLOWED_TOOLS_POLICY


def run_fallback_recovery_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    checkpoint = CheckpointStore()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []
    gateway = RecoveryGateway(
        allowed_steps_policy=ALLOWED_STEPS_POLICY,
        allowed_tools_policy=ALLOWED_TOOLS_POLICY,
        allowed_tools_execution=ALLOWED_TOOLS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    try:
        req = request["request"]
        common_args = {
            "report_date": req["report_date"],
            "region": req["region"],
            "request_id": run_id,
        }

        try:
            payments_step = gateway.run_step_with_recovery(
                run_id=run_id,
                step_id="payments_health",
                primary_tool_name="payments_primary_api",
                primary_tool_fn=payments_primary_api,
                fallback_chain=[
                    ("payments_replica_api", payments_replica_api),
                    ("payments_cached_snapshot", payments_cached_snapshot),
                ],
                args=common_args,
                checkpoint=checkpoint,
                started_monotonic=started,
                critical=True,
                checkpoint_ttl_seconds=300.0,
            )
        except StopRun as exc:
            return stopped(exc.reason, phase="payments_health")

        trace.append(
            {
                "step": 1,
                "phase": "payments_health",
                "source": payments_step["source"],
                "tool": payments_step["tool"],
                "attempts_used": payments_step["attempts_used"],
                "primary_attempts": payments_step["primary_attempts"],
                "fallback_attempts": payments_step["fallback_attempts"],
                "retried": payments_step["retried"],
                "fallbacks_used": payments_step["fallbacks_used"],
                "ok": payments_step["status"] == "done",
            }
        )
        history.append(
            {
                "step": 1,
                "action": "run_step_with_recovery",
                "step_id": "payments_health",
                "events": payments_step["events"],
                "result": payments_step.get("result"),
            }
        )

        try:
            demand_step = gateway.run_step_with_recovery(
                run_id=run_id,
                step_id="demand_signal",
                primary_tool_name="demand_primary_api",
                primary_tool_fn=demand_primary_api,
                fallback_chain=[("demand_cached_snapshot", demand_cached_snapshot)],
                args=common_args,
                checkpoint=checkpoint,
                started_monotonic=started,
                critical=True,
                checkpoint_ttl_seconds=900.0,
            )
        except StopRun as exc:
            return stopped(exc.reason, phase="demand_signal")

        trace.append(
            {
                "step": 2,
                "phase": "demand_signal",
                "source": demand_step["source"],
                "tool": demand_step["tool"],
                "attempts_used": demand_step["attempts_used"],
                "primary_attempts": demand_step["primary_attempts"],
                "fallback_attempts": demand_step["fallback_attempts"],
                "retried": demand_step["retried"],
                "fallbacks_used": demand_step["fallbacks_used"],
                "ok": demand_step["status"] == "done",
            }
        )
        history.append(
            {
                "step": 2,
                "action": "run_step_with_recovery",
                "step_id": "demand_signal",
                "events": demand_step["events"],
                "result": demand_step.get("result"),
            }
        )

        aggregate = {
            "report_date": req["report_date"],
            "region": req["region"],
            "payments": payments_step["result"],
            "demand": demand_step["result"],
        }
        fallback_steps = [
            step["step_id"]
            for step in (payments_step, demand_step)
            if step["source"] == "fallback"
        ]
        checkpoint_saved_steps = sorted(
            set(item["step_id"] for item in checkpoint.dump_run(run_id=run_id))
        )
        checkpoint_resumed_steps = []
        for h in history:
            for ev in (h.get("events") or []):
                if ev.get("kind") == "checkpoint_resume":
                    checkpoint_resumed_steps.append(ev.get("step_id"))
        recovery_summary = {
            "fallback_used": bool(fallback_steps),
            "fallback_steps": fallback_steps,
            "checkpoint_saved_steps": checkpoint_saved_steps,
            "checkpoint_resumed_steps": sorted(set(checkpoint_resumed_steps)),
        }

        if (time.monotonic() - started) > BUDGET.max_seconds:
            return stopped("max_seconds", phase="finalize")

        try:
            answer = compose_operations_brief(
                goal=goal,
                aggregate=aggregate,
                recovery_summary=recovery_summary,
            )
        except LLMTimeout:
            return stopped("llm_timeout", phase="finalize")
        except LLMInvalid as exc:
            return stopped(exc.args[0], phase="finalize")
        except LLMEmpty:
            return stopped("llm_empty", phase="finalize")

        trace.append(
            {
                "step": 3,
                "phase": "finalize",
                "fallback_used": recovery_summary["fallback_used"],
                "ok": True,
            }
        )
        history.append({"step": 3, "action": "compose_operations_brief"})

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "recovered_with_fallback" if fallback_steps else "direct_success",
            "answer": answer,
            "aggregate": aggregate,
            "recovery": recovery_summary,
            "checkpoint": checkpoint.dump_run(run_id=run_id),
            "trace": trace,
            "history": history,
        }
    finally:
        gateway.close()


def main() -> None:
    result = run_fallback_recovery_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()
```
What matters most here (in plain words)
- Retry and fallback are controlled by policy budgets, not by the LLM.
- `_PRIMARY_ATTEMPTS` in `tools.py` is a demo-only mock; in a real service, state should be request-scoped.
- After a successful step, a checkpoint is created to avoid losing progress.
- Metrics `primary_attempts` / `fallback_attempts` show real recovery attempts.
- `checkpoint_saved_steps` and `checkpoint_resumed_steps` separate saving from actual resume.
- In runnable code, the `ThreadPoolExecutor` is closed via `gateway.close()` (in `finally`) to avoid resource leaks.
- `trace` / `history` make recovery behavior transparent.
Example output
```json
{
  "run_id": "af48d033-1f5a-4c3f-b31e-050d7777fd9a",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "recovered_with_fallback",
  "answer": "A P1 incident is impacting US payments with a 3.4% failed payment rate and 5 chargeback alerts. Approximately 27% of checkouts are affected, primarily in the US enterprise segment, with an average of 182 orders per minute. Recovery has involved safe fallback to payments health tools; no checkpoint resumptions yet. Estimated recovery time is around 45 minutes but may change. Continue monitoring and prioritize customer safety in updates.",
  "aggregate": {
    "report_date": "2026-03-06",
    "region": "US",
    "payments": {
      "failed_payment_rate": 0.034,
      "chargeback_alerts": 5,
      "incident_severity": "P1",
      "eta_minutes": 45,
      "source": "payments_replica_api:US"
    },
    "demand": {
      "affected_checkout_share": 0.27,
      "avg_orders_per_minute": 182,
      "priority_segment": "US_enterprise",
      "source": "demand_primary_api:2026-03-06"
    }
  },
  "recovery": {
    "fallback_used": true,
    "fallback_steps": [
      "payments_health"
    ],
    "checkpoint_saved_steps": [
      "demand_signal",
      "payments_health"
    ],
    "checkpoint_resumed_steps": []
  },
  "checkpoint": [
    {
      "step_id": "payments_health",
      "source": "fallback",
      "tool": "payments_replica_api",
      "result_keys": [
        "chargeback_alerts",
        "eta_minutes",
        "failed_payment_rate",
        "incident_severity",
        "source"
      ],
      "saved_at": 1772475669.9678888
    },
    {
      "step_id": "demand_signal",
      "source": "primary",
      "tool": "demand_primary_api",
      "result_keys": [
        "affected_checkout_share",
        "avg_orders_per_minute",
        "priority_segment",
        "source"
      ],
      "saved_at": 1772475670.123298
    }
  ],
  "trace": [
    {
      "step": 1,
      "phase": "payments_health",
      "source": "fallback",
      "tool": "payments_replica_api",
      "attempts_used": 3,
      "primary_attempts": 2,
      "fallback_attempts": 1,
      "retried": true,
      "fallbacks_used": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "demand_signal",
      "source": "primary",
      "tool": "demand_primary_api",
      "attempts_used": 1,
      "primary_attempts": 1,
      "fallback_attempts": 0,
      "retried": false,
      "fallbacks_used": 0,
      "ok": true
    },
    {
      "step": 3,
      "phase": "finalize",
      "fallback_used": true,
      "ok": true
    }
  ],
  "history": [{...}]
}
```
Typical stop_reason values
- `success` — run completed correctly
- `max_seconds` — overall run time budget exhausted
- `step_not_allowed_policy:*` — step is outside the policy allowlist
- `tool_denied_policy:*` — tool is denied by the policy allowlist
- `tool_denied:*` — tool is denied by the runtime execution allowlist
- `recovery_exhausted:*` — retry/fallback exhausted for a critical step
- `llm_timeout` — LLM did not respond within `OPENAI_TIMEOUT_SECONDS`
- `llm_empty` — final brief is empty
- `llm_invalid_json` — LLM returned invalid JSON
- `llm_invalid_schema` — JSON does not match the contract
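Since several `stop_reason` values carry a `:suffix`, monitoring code typically matches on the prefix. A minimal sketch (the routing categories are hypothetical, not part of this repo):

```python
def route_stop_reason(stop_reason: str) -> str:
    """Map a stop_reason to a coarse monitoring category by prefix."""
    prefix = stop_reason.split(":", 1)[0]
    policy = {"step_not_allowed_policy", "tool_denied_policy", "tool_denied"}
    if prefix == "success":
        return "ok"
    if prefix in policy:
        return "policy_alert"      # configuration or allowlist problem
    if prefix in {"recovery_exhausted", "max_seconds"}:
        return "page_oncall"       # recovery machinery itself gave up
    if prefix.startswith("llm_"):
        return "llm_alert"         # finalize phase failed, facts were collected
    return "unknown"
```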
What is NOT shown here
- persisted checkpoint storage (Redis/DB)
- adaptive retry strategy (jitter, circuit breaker)
- tenant-specific execution allowlists
- human escalation queue for critical recovery failures
What to try next
- Block `payments_replica_api` in `ALLOWED_TOOLS_EXECUTION` and verify degrade/stop behavior.
- Reduce `step_timeout_seconds` and observe how the recovery trace changes.
- Add a post-incident summary step that uses the checkpoint without re-collecting data.