Essence of the pattern (brief)
Fallback-Recovery Agent is a pattern where the failure of a step does not mean an immediate stop.
The agent follows a controlled loop:
- detect the error
- classify it
- retry within the limits of the policy
- switch to the fallback path
- continue from the checkpoint, or stop with an explicit `stop_reason`
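The loop above can be sketched as plain control flow. This is a minimal illustration, not the project's actual gateway; the names `run_with_recovery` and `classify` are hypothetical stand-ins for the real policy machinery:

```python
def run_with_recovery(primary, fallbacks, max_retries, classify):
    """Minimal sketch of detect -> classify -> retry -> fallback -> stop."""
    for attempt in range(max_retries + 1):
        try:
            return primary()                      # happy path
        except Exception as exc:
            if classify(exc) != "retriable":      # detect + classify
                break                             # non-retriable: skip retries
    for fallback in fallbacks:                    # fallback chain, in order
        try:
            return fallback()
        except Exception:
            continue
    raise RuntimeError("recovery_exhausted")      # explicit stop_reason
```

The real gateway adds budgets, allowlists, backoff, and checkpointing on top of this skeleton.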
What this example demonstrates
- retry for a retriable timeout (bounded by the policy budget)
- fallback chain: primary -> replica -> cache
- runtime execution allowlist for tool-level control
- checkpoint after a successful step
- checkpoint TTL (optional) for safe resume
- resume from checkpoint without re-running the step
- explicit `trace`/`history`/`stop_reason` for production monitoring
Architecture
- `main.py` launches the `payments_health` step via `RecoveryGateway`.
- The primary tool deliberately times out; the gateway retries.
- After retries are exhausted, the gateway switches to the fallback (`payments_replica_api`).
- The successful result is stored in `CheckpointStore`.
- The next step, `demand_signal`, runs and is also checkpointed.
- The LLM composes the short final operations brief from aggregated facts only.

The LLM decides only the text of the final brief. The recovery policy (retry/fallback/budgets/allowlist) is entirely controlled by the execution layer. Checkpoint resume can be gated by a TTL to avoid using stale snapshots.
Project structure
agent-patterns/
└── fallback-recovery-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── checkpoint_store.py
        ├── context.py
        ├── llm.py
        ├── README.md
        └── requirements.txt
Run the project
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd agent-patterns/fallback-recovery-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Option via export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option via .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows, it is simpler to use `set` environment variables or, if preferred, python-dotenv.
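For reference, the cmd.exe equivalent of the export above might look like this (a sketch; PowerShell would use `$env:OPENAI_API_KEY` instead):

```
set OPENAI_API_KEY=sk-...
set OPENAI_MODEL=gpt-4.1-mini
set OPENAI_TIMEOUT_SECONDS=60
python main.py
```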
Task
Production case:
"Prepare a short update on a payment incident for US enterprise customers. If the primary API is unstable, do not fail; recover via fallback."
Code
tools.py: primary/fallback tools
from __future__ import annotations
import time
from typing import Any
_PRIMARY_ATTEMPTS: dict[str, int] = {}
# Demo-only process-global state for deterministic primary failures.
# In production, keep attempt counters request-scoped to avoid cross-run collisions.
def _request_key(report_date: str, region: str, request_id: str, suffix: str) -> str:
return f"{request_id}:{report_date}:{region.upper()}:{suffix}"
def payments_primary_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
key = _request_key(report_date, region, request_id, "payments_primary")
_PRIMARY_ATTEMPTS[key] = _PRIMARY_ATTEMPTS.get(key, 0) + 1
attempt = _PRIMARY_ATTEMPTS[key]
# Simulate primary instability: first two attempts timeout.
if attempt <= 2:
time.sleep(1.4)
raise TimeoutError("primary_payments_timeout")
return {
"status": "ok",
"data": {
"failed_payment_rate": 0.034,
"chargeback_alerts": 5,
"incident_severity": "P1",
"eta_minutes": 45,
"source": "payments_primary_api",
},
}
def payments_replica_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
del report_date, request_id
time.sleep(0.2)
return {
"status": "ok",
"data": {
"failed_payment_rate": 0.034,
"chargeback_alerts": 5,
"incident_severity": "P1",
"eta_minutes": 45,
"source": f"payments_replica_api:{region.upper()}",
},
}
def payments_cached_snapshot(report_date: str, region: str, request_id: str) -> dict[str, Any]:
del request_id
time.sleep(0.05)
return {
"status": "ok",
"data": {
"failed_payment_rate": 0.031,
"chargeback_alerts": 4,
"incident_severity": "P1",
"eta_minutes": 50,
"stale_minutes": 18,
"source": f"payments_cache:{report_date}:{region.upper()}",
},
}
def demand_primary_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
del request_id
time.sleep(0.15)
return {
"status": "ok",
"data": {
"affected_checkout_share": 0.27,
"avg_orders_per_minute": 182,
"priority_segment": f"{region.upper()}_enterprise",
"source": f"demand_primary_api:{report_date}",
},
}
def demand_cached_snapshot(report_date: str, region: str, request_id: str) -> dict[str, Any]:
del request_id
time.sleep(0.03)
return {
"status": "ok",
"data": {
"affected_checkout_share": 0.25,
"avg_orders_per_minute": 176,
"priority_segment": f"{region.upper()}_enterprise",
"stale_minutes": 22,
"source": f"demand_cache:{report_date}:{region.upper()}",
},
}
`_PRIMARY_ATTEMPTS` is used here as a demo mock for controlled flaky-primary behavior.
In production, this state must be isolated per request (or kept in request-scoped storage) to avoid collisions between parallel runs.
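One way to make the counter request-scoped is to move it onto an object that each run constructs for itself. This is a sketch, not part of the project; the `FlakyPrimary` name is illustrative:

```python
class FlakyPrimary:
    """Per-request wrapper: the attempt counter lives on the instance,
    so parallel runs cannot collide on shared module-level state."""

    def __init__(self, fail_first_n: int = 2) -> None:
        self.attempts = 0
        self.fail_first_n = fail_first_n

    def __call__(self, **kwargs):
        self.attempts += 1
        if self.attempts <= self.fail_first_n:
            # Same deterministic instability as the demo, without global state.
            raise TimeoutError("primary_payments_timeout")
        return {"status": "ok", "data": {"source": "payments_primary_api"}}
```

Each run would instantiate its own `FlakyPrimary` and pass it as the primary tool callable.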
context.py: building the request envelope
from __future__ import annotations
from typing import Any
def build_operations_context(*, report_date: str, region: str) -> dict[str, Any]:
return {
"request": {
"report_date": report_date,
"region": region.upper(),
},
"policy_hints": {
"channel": "status_page",
"audience": "enterprise_customers",
},
}
checkpoint_store.py: checkpoint after safe progress
from __future__ import annotations
import time
from dataclasses import dataclass
from typing import Any
@dataclass
class CheckpointRow:
run_id: str
step_id: str
source: str
tool: str
result: dict[str, Any]
saved_at: float
ttl_seconds: float | None = None
class CheckpointStore:
def __init__(self) -> None:
self._rows: dict[tuple[str, str], CheckpointRow] = {}
def get_step(self, *, run_id: str, step_id: str, now: float | None = None) -> CheckpointRow | None:
row = self._rows.get((run_id, step_id))
if row is None:
return None
if row.ttl_seconds is None:
return row
now_ts = time.time() if now is None else float(now)
if (now_ts - row.saved_at) > float(row.ttl_seconds):
# Expired checkpoint: treat as missing.
self._rows.pop((run_id, step_id), None)
return None
return row
def save_step(
self,
*,
run_id: str,
step_id: str,
source: str,
tool: str,
result: dict[str, Any],
) -> None:
self._rows[(run_id, step_id)] = CheckpointRow(
run_id=run_id,
step_id=step_id,
source=source,
tool=tool,
result=result,
saved_at=time.time(),
ttl_seconds=None,
)
def save_step_with_ttl(
self,
*,
run_id: str,
step_id: str,
source: str,
tool: str,
result: dict[str, Any],
ttl_seconds: float,
) -> None:
self._rows[(run_id, step_id)] = CheckpointRow(
run_id=run_id,
step_id=step_id,
source=source,
tool=tool,
result=result,
saved_at=time.time(),
ttl_seconds=float(ttl_seconds),
)
def dump_run(self, *, run_id: str) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
for row in self._rows.values():
if row.run_id != run_id:
continue
out.append(
{
"step_id": row.step_id,
"source": row.source,
"tool": row.tool,
"result_keys": sorted(row.result.keys()),
"saved_at": row.saved_at,
}
)
out.sort(key=lambda item: item["saved_at"])
return out
gateway.py: recovery policy boundary
from __future__ import annotations
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from dataclasses import dataclass
from typing import Any, Callable
from checkpoint_store import CheckpointStore
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_seconds: int = 120
step_timeout_seconds: float = 1.0
max_retries: int = 1
max_fallbacks: int = 2
checkpoint_ttl_seconds: float = 900.0
def classify_exception(exc: Exception) -> str:
if isinstance(exc, TimeoutError):
msg = str(exc)[:80]
return f"timeout:{msg}" if msg else "timeout"
if isinstance(exc, ValueError):
return "invalid_output"
if isinstance(exc, RuntimeError):
if "unavailable" in str(exc).lower():
return "tool_unavailable"
return "runtime_error"
return "non_retriable"
def validate_tool_observation(raw: Any) -> dict[str, Any]:
if not isinstance(raw, dict):
raise ValueError("tool_observation_not_object")
if raw.get("status") != "ok":
raise ValueError("tool_observation_status_not_ok")
data = raw.get("data")
if not isinstance(data, dict):
raise ValueError("tool_observation_data_not_object")
return data
class RecoveryGateway:
def __init__(
self,
*,
allowed_steps_policy: set[str],
allowed_tools_policy: set[str],
allowed_tools_execution: set[str],
budget: Budget,
):
self.allowed_steps_policy = set(allowed_steps_policy)
self.allowed_tools_policy = set(allowed_tools_policy)
self.allowed_tools_execution = set(allowed_tools_execution)
self.budget = budget
self._pool = ThreadPoolExecutor(max_workers=4)
def close(self) -> None:
self._pool.shutdown(wait=False, cancel_futures=True)
def _ensure_run_budget(self, *, started_monotonic: float) -> None:
elapsed = time.monotonic() - started_monotonic
if elapsed > self.budget.max_seconds:
raise StopRun("max_seconds")
def _dispatch(
self,
*,
tool_name: str,
tool_fn: Callable[..., dict[str, Any]],
args: dict[str, Any],
) -> dict[str, Any]:
future = self._pool.submit(tool_fn, **args)
try:
raw = future.result(timeout=self.budget.step_timeout_seconds)
except FuturesTimeoutError as exc:
raise TimeoutError(f"tool_timeout:{tool_name}") from exc
return validate_tool_observation(raw)
def _save_checkpoint(
self,
*,
checkpoint: CheckpointStore,
run_id: str,
step_id: str,
source: str,
tool: str,
result: dict[str, Any],
checkpoint_ttl_seconds: float | None = None,
) -> None:
ttl_value = self.budget.checkpoint_ttl_seconds if checkpoint_ttl_seconds is None else checkpoint_ttl_seconds
ttl = float(ttl_value)
if ttl > 0:
checkpoint.save_step_with_ttl(
run_id=run_id,
step_id=step_id,
source=source,
tool=tool,
result=result,
ttl_seconds=ttl,
)
return
checkpoint.save_step(
run_id=run_id,
step_id=step_id,
source=source,
tool=tool,
result=result,
)
def run_step_with_recovery(
self,
*,
run_id: str,
step_id: str,
primary_tool_name: str,
primary_tool_fn: Callable[..., dict[str, Any]],
fallback_chain: list[tuple[str, Callable[..., dict[str, Any]]]],
args: dict[str, Any],
checkpoint: CheckpointStore,
started_monotonic: float,
critical: bool = True,
checkpoint_ttl_seconds: float | None = None,
) -> dict[str, Any]:
self._ensure_run_budget(started_monotonic=started_monotonic)
if step_id not in self.allowed_steps_policy:
raise StopRun(f"step_not_allowed_policy:{step_id}")
cached = checkpoint.get_step(run_id=run_id, step_id=step_id)
if cached is not None:
return {
"status": "done",
"step_id": step_id,
"source": "checkpoint",
"tool": cached.tool,
"result": cached.result,
"attempts_used": 0,
"primary_attempts": 0,
"fallback_attempts": 0,
"retried": False,
"fallbacks_used": 0,
"events": [{"kind": "checkpoint_resume", "step_id": step_id}],
}
events: list[dict[str, Any]] = []
primary_attempts = 0
fallback_attempts = 0
last_reason = "unknown"
if primary_tool_name not in self.allowed_tools_policy:
events.append({"kind": "primary_denied_policy", "tool": primary_tool_name})
last_reason = "tool_denied_policy"
elif primary_tool_name not in self.allowed_tools_execution:
events.append({"kind": "primary_denied", "tool": primary_tool_name})
last_reason = "tool_denied"
else:
for retry_idx in range(self.budget.max_retries + 1):
self._ensure_run_budget(started_monotonic=started_monotonic)
primary_attempts += 1
try:
data = self._dispatch(
tool_name=primary_tool_name,
tool_fn=primary_tool_fn,
args=args,
)
self._save_checkpoint(
checkpoint=checkpoint,
run_id=run_id,
step_id=step_id,
source="primary",
tool=primary_tool_name,
result=data,
checkpoint_ttl_seconds=checkpoint_ttl_seconds,
)
return {
"status": "done",
"step_id": step_id,
"source": "primary",
"tool": primary_tool_name,
"result": data,
"attempts_used": primary_attempts,
"primary_attempts": primary_attempts,
"fallback_attempts": 0,
"retried": retry_idx > 0,
"fallbacks_used": 0,
"events": events,
}
except Exception as exc: # noqa: BLE001
is_timeout = isinstance(exc, TimeoutError)
reason = classify_exception(exc)
last_reason = reason
events.append(
{
"kind": "primary_failure",
"tool": primary_tool_name,
"attempt": retry_idx + 1,
"reason": reason,
"message": str(exc)[:120],
}
)
if is_timeout and retry_idx < self.budget.max_retries:
backoff = 0.25 * float(retry_idx + 1)
events.append({"kind": "retry_backoff", "seconds": backoff})
time.sleep(backoff)
continue
break
fallbacks_used = 0
for fallback_name, fallback_fn in fallback_chain:
if fallbacks_used >= self.budget.max_fallbacks:
break
fallbacks_used += 1
self._ensure_run_budget(started_monotonic=started_monotonic)
if fallback_name not in self.allowed_tools_policy:
events.append({"kind": "fallback_denied_policy", "tool": fallback_name})
continue
if fallback_name not in self.allowed_tools_execution:
events.append({"kind": "fallback_denied", "tool": fallback_name})
continue
try:
fallback_attempts += 1
data = self._dispatch(
tool_name=fallback_name,
tool_fn=fallback_fn,
args=args,
)
events.append({"kind": "fallback_success", "tool": fallback_name})
self._save_checkpoint(
checkpoint=checkpoint,
run_id=run_id,
step_id=step_id,
source="fallback",
tool=fallback_name,
result=data,
checkpoint_ttl_seconds=checkpoint_ttl_seconds,
)
return {
"status": "done",
"step_id": step_id,
"source": "fallback",
"tool": fallback_name,
"result": data,
"attempts_used": primary_attempts + fallback_attempts,
"primary_attempts": primary_attempts,
"fallback_attempts": fallback_attempts,
"retried": primary_attempts > 1,
"fallbacks_used": fallbacks_used,
"events": events,
}
except Exception as exc: # noqa: BLE001
reason = classify_exception(exc)
last_reason = reason
events.append(
{
"kind": "fallback_failure",
"tool": fallback_name,
"reason": reason,
"message": str(exc)[:120],
}
)
if critical:
if last_reason in {"tool_denied_policy", "tool_denied"}:
raise StopRun(f"{last_reason}:{step_id}")
raise StopRun(f"recovery_exhausted:{step_id}")
return {
"status": "failed",
"step_id": step_id,
"source": "none",
"tool": "none",
"error": last_reason,
"attempts_used": primary_attempts + fallback_attempts,
"primary_attempts": primary_attempts,
"fallback_attempts": fallback_attempts,
"retried": primary_attempts > 1,
"fallbacks_used": fallbacks_used,
"events": events,
}
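The gateway's `_dispatch` enforces the step timeout by running the tool in a worker thread. That pattern can be reproduced in a few lines (a standalone sketch; `call_with_timeout` is not part of the project):

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError


def call_with_timeout(fn, timeout_seconds: float):
    """Run fn in a worker thread and convert a future timeout into a
    TimeoutError that a recovery policy can classify as retriable.
    Caveat: the worker thread keeps running past the deadline, so real
    deployments need cancellable I/O or process isolation for hard cutoffs."""
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        future = pool.submit(fn)
        try:
            return future.result(timeout=timeout_seconds)
        except FuturesTimeoutError as exc:
            raise TimeoutError("tool_timeout") from exc
    finally:
        pool.shutdown(wait=False, cancel_futures=True)
```

This is also why the gateway exposes `close()`: the pool must be shut down explicitly rather than leaking worker threads.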
llm.py: short final brief
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
class LLMInvalid(Exception):
pass
FINAL_SYSTEM_PROMPT = """
You are an operations editor.
Return exactly one JSON object:
{
"answer": "short operations brief"
}
Rules:
- Use only provided aggregate facts.
- Mention if fallback recovery was used.
- Keep it concise and actionable.
- If ETA is present, phrase it as an estimate that may change. Never guarantee timelines.
- Do not output markdown or extra keys.
""".strip()
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
data = json.loads(text)
except json.JSONDecodeError as exc:
raise LLMInvalid("llm_invalid_json") from exc
if not isinstance(data, dict):
raise LLMInvalid("llm_invalid_json")
return data
def compose_operations_brief(
*,
goal: str,
aggregate: dict[str, Any],
recovery_summary: dict[str, Any],
) -> str:
payload = {
"goal": goal,
"aggregate": aggregate,
"recovery_summary": recovery_summary,
}
data = _chat_json(system_prompt=FINAL_SYSTEM_PROMPT, payload=payload)
answer = data.get("answer")
if not isinstance(answer, str):
raise LLMInvalid("llm_invalid_schema")
answer = answer.strip()
if not answer:
raise LLMEmpty("llm_empty")
return answer
main.py: Detect -> Retry -> Fallback -> Resume -> Finalize
from __future__ import annotations
import json
import time
import uuid
from typing import Any
from checkpoint_store import CheckpointStore
from context import build_operations_context
from gateway import Budget, RecoveryGateway, StopRun
from llm import LLMEmpty, LLMInvalid, LLMTimeout, compose_operations_brief
from tools import (
demand_cached_snapshot,
demand_primary_api,
payments_cached_snapshot,
payments_primary_api,
payments_replica_api,
)
GOAL = (
"Prepare a concise operations brief for the US payments incident. "
"Recover from tool failures safely and keep the update customer-safe."
)
REQUEST = build_operations_context(report_date="2026-03-06", region="US")
BUDGET = Budget(
max_seconds=120,
step_timeout_seconds=1.0,
max_retries=1,
max_fallbacks=2,
checkpoint_ttl_seconds=900.0,
)
ALLOWED_STEPS_POLICY = {"payments_health", "demand_signal"}
ALLOWED_TOOLS_POLICY = {
"payments_primary_api",
"payments_replica_api",
"payments_cached_snapshot",
"demand_primary_api",
"demand_cached_snapshot",
}
ALLOWED_TOOLS_EXECUTION = ALLOWED_TOOLS_POLICY
def run_fallback_recovery_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
run_id = str(uuid.uuid4())
started = time.monotonic()
checkpoint = CheckpointStore()
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
gateway = RecoveryGateway(
allowed_steps_policy=ALLOWED_STEPS_POLICY,
allowed_tools_policy=ALLOWED_TOOLS_POLICY,
allowed_tools_execution=ALLOWED_TOOLS_EXECUTION,
budget=BUDGET,
)
def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
payload = {
"run_id": run_id,
"status": "stopped",
"stop_reason": stop_reason,
"phase": phase,
"trace": trace,
"history": history,
}
payload.update(extra)
return payload
try:
req = request["request"]
common_args = {
"report_date": req["report_date"],
"region": req["region"],
"request_id": run_id,
}
try:
payments_step = gateway.run_step_with_recovery(
run_id=run_id,
step_id="payments_health",
primary_tool_name="payments_primary_api",
primary_tool_fn=payments_primary_api,
fallback_chain=[
("payments_replica_api", payments_replica_api),
("payments_cached_snapshot", payments_cached_snapshot),
],
args=common_args,
checkpoint=checkpoint,
started_monotonic=started,
critical=True,
checkpoint_ttl_seconds=300.0,
)
except StopRun as exc:
return stopped(exc.reason, phase="payments_health")
trace.append(
{
"step": 1,
"phase": "payments_health",
"source": payments_step["source"],
"tool": payments_step["tool"],
"attempts_used": payments_step["attempts_used"],
"primary_attempts": payments_step["primary_attempts"],
"fallback_attempts": payments_step["fallback_attempts"],
"retried": payments_step["retried"],
"fallbacks_used": payments_step["fallbacks_used"],
"ok": payments_step["status"] == "done",
}
)
history.append(
{
"step": 1,
"action": "run_step_with_recovery",
"step_id": "payments_health",
"events": payments_step["events"],
"result": payments_step.get("result"),
}
)
try:
demand_step = gateway.run_step_with_recovery(
run_id=run_id,
step_id="demand_signal",
primary_tool_name="demand_primary_api",
primary_tool_fn=demand_primary_api,
fallback_chain=[("demand_cached_snapshot", demand_cached_snapshot)],
args=common_args,
checkpoint=checkpoint,
started_monotonic=started,
critical=True,
checkpoint_ttl_seconds=900.0,
)
except StopRun as exc:
return stopped(exc.reason, phase="demand_signal")
trace.append(
{
"step": 2,
"phase": "demand_signal",
"source": demand_step["source"],
"tool": demand_step["tool"],
"attempts_used": demand_step["attempts_used"],
"primary_attempts": demand_step["primary_attempts"],
"fallback_attempts": demand_step["fallback_attempts"],
"retried": demand_step["retried"],
"fallbacks_used": demand_step["fallbacks_used"],
"ok": demand_step["status"] == "done",
}
)
history.append(
{
"step": 2,
"action": "run_step_with_recovery",
"step_id": "demand_signal",
"events": demand_step["events"],
"result": demand_step.get("result"),
}
)
aggregate = {
"report_date": req["report_date"],
"region": req["region"],
"payments": payments_step["result"],
"demand": demand_step["result"],
}
fallback_steps = [
step["step_id"]
for step in (payments_step, demand_step)
if step["source"] == "fallback"
]
checkpoint_saved_steps = sorted(
set(item["step_id"] for item in checkpoint.dump_run(run_id=run_id))
)
checkpoint_resumed_steps = []
for h in history:
for ev in (h.get("events") or []):
if ev.get("kind") == "checkpoint_resume":
checkpoint_resumed_steps.append(ev.get("step_id"))
recovery_summary = {
"fallback_used": bool(fallback_steps),
"fallback_steps": fallback_steps,
"checkpoint_saved_steps": checkpoint_saved_steps,
"checkpoint_resumed_steps": sorted(set(checkpoint_resumed_steps)),
}
if (time.monotonic() - started) > BUDGET.max_seconds:
return stopped("max_seconds", phase="finalize")
try:
answer = compose_operations_brief(
goal=goal,
aggregate=aggregate,
recovery_summary=recovery_summary,
)
except LLMTimeout:
return stopped("llm_timeout", phase="finalize")
except LLMInvalid as exc:
return stopped(exc.args[0], phase="finalize")
except LLMEmpty:
return stopped("llm_empty", phase="finalize")
trace.append(
{
"step": 3,
"phase": "finalize",
"fallback_used": recovery_summary["fallback_used"],
"ok": True,
}
)
history.append({"step": 3, "action": "compose_operations_brief"})
return {
"run_id": run_id,
"status": "ok",
"stop_reason": "success",
"outcome": "recovered_with_fallback" if fallback_steps else "direct_success",
"answer": answer,
"aggregate": aggregate,
"recovery": recovery_summary,
"checkpoint": checkpoint.dump_run(run_id=run_id),
"trace": trace,
"history": history,
}
finally:
gateway.close()
def main() -> None:
result = run_fallback_recovery_agent(goal=GOAL, request=REQUEST)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
What matters most here (plainly)
- Retry and fallback are driven by policy budgets, not by the LLM.
- `_PRIMARY_ATTEMPTS` in `tools.py` is a demo-only mock; in a real service the state must be request-scoped.
- After a successful step, a checkpoint is taken so progress is not lost.
- The `primary_attempts`/`fallback_attempts` metrics show the actual recovery attempts.
- `checkpoint_saved_steps` and `checkpoint_resumed_steps` separate saving from actual resume.
- In the runnable code, the `ThreadPoolExecutor` is closed via `gateway.close()` (in `finally`) to avoid resource leaks.
- `trace`/`history` make the recovery behavior transparent.
Example output
{
"run_id": "af48d033-1f5a-4c3f-b31e-050d7777fd9a",
"status": "ok",
"stop_reason": "success",
"outcome": "recovered_with_fallback",
"answer": "A P1 incident is impacting US payments with a 3.4% failed payment rate and 5 chargeback alerts. Approximately 27% of checkouts are affected, primarily in the US enterprise segment, with an average of 182 orders per minute. Recovery has involved safe fallback to payments health tools; no checkpoint resumptions yet. Estimated recovery time is around 45 minutes but may change. Continue monitoring and prioritize customer safety in updates.",
"aggregate": {
"report_date": "2026-03-06",
"region": "US",
"payments": {
"failed_payment_rate": 0.034,
"chargeback_alerts": 5,
"incident_severity": "P1",
"eta_minutes": 45,
"source": "payments_replica_api:US"
},
"demand": {
"affected_checkout_share": 0.27,
"avg_orders_per_minute": 182,
"priority_segment": "US_enterprise",
"source": "demand_primary_api:2026-03-06"
}
},
"recovery": {
"fallback_used": true,
"fallback_steps": [
"payments_health"
],
"checkpoint_saved_steps": [
"demand_signal",
"payments_health"
],
"checkpoint_resumed_steps": []
},
"checkpoint": [
{
"step_id": "payments_health",
"source": "fallback",
"tool": "payments_replica_api",
"result_keys": [
"chargeback_alerts",
"eta_minutes",
"failed_payment_rate",
"incident_severity",
"source"
],
"saved_at": 1772475669.9678888
},
{
"step_id": "demand_signal",
"source": "primary",
"tool": "demand_primary_api",
"result_keys": [
"affected_checkout_share",
"avg_orders_per_minute",
"priority_segment",
"source"
],
"saved_at": 1772475670.123298
}
],
"trace": [
{
"step": 1,
"phase": "payments_health",
"source": "fallback",
"tool": "payments_replica_api",
"attempts_used": 3,
"primary_attempts": 2,
"fallback_attempts": 1,
"retried": true,
"fallbacks_used": 1,
"ok": true
},
{
"step": 2,
"phase": "demand_signal",
"source": "primary",
"tool": "demand_primary_api",
"attempts_used": 1,
"primary_attempts": 1,
"fallback_attempts": 0,
"retried": false,
"fallbacks_used": 0,
"ok": true
},
{
"step": 3,
"phase": "finalize",
"fallback_used": true,
"ok": true
}
],
"history": [{...}]
}
Typical stop_reason values
- `success`: the run completed correctly
- `max_seconds`: total run time budget exhausted
- `step_not_allowed_policy:*`: step outside the policy allowlist
- `tool_denied_policy:*`: tool denied by the policy allowlist
- `tool_denied:*`: tool denied by the runtime execution allowlist
- `recovery_exhausted:*`: retry/fallback exhausted for a critical step
- `llm_timeout`: the LLM did not answer within `OPENAI_TIMEOUT_SECONDS`
- `llm_empty`: empty final brief
- `llm_invalid_json`: the LLM returned invalid JSON
- `llm_invalid_schema`: the JSON does not match the contract
What is NOT shown here
- persisted checkpoint storage (Redis/DB)
- adaptive retry strategy (jitter, circuit breaker)
- tenant-specific execution allowlists
- human escalation queue for critical recovery failures
What you can try next
- Remove `payments_replica_api` from `ALLOWED_TOOLS_EXECUTION` and check the degrade/stop behavior.
- Lower `step_timeout_seconds` and observe how the recovery trace changes.
- Add a post-incident summary step that reuses the checkpoint without re-collecting the data.