Fallback-Recovery Agent in Python: A Complete Example

A runnable, production-style example of a Fallback-Recovery agent in Python with retries, a fallback chain, checkpoint resume, and a transparent stop_reason/trace.
On this page
  1. Pattern essence (brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. Running the project
  6. Task
  7. Code
  8. tools.py — primary/fallback tools
  9. context.py — building the request envelope
  10. checkpoint_store.py — checkpointing after safe progress
  11. gateway.py — recovery policy boundary
  12. llm.py — short final brief
  13. main.py — Detect -> Retry -> Fallback -> Resume -> Finalize
  14. Example output
  15. Typical stop_reason values
  16. What is NOT shown here
  17. What you can try next

Pattern essence (brief)

The Fallback-Recovery Agent is a pattern where a failed step does not mean an immediate stop.

The agent follows a controlled loop:

  1. detect the error
  2. classify it
  3. retry within policy limits
  4. switch to the fallback path
  5. resume from the checkpoint, or stop with an explicit stop_reason
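
The loop above can be sketched in a few lines of Python. This is an illustrative skeleton only; `run_with_recovery`, `flaky_primary`, and the budget numbers are hypothetical names, not part of the example project:

```python
def run_with_recovery(primary, fallbacks, max_retries=1):
    """Detect -> classify -> retry -> fallback -> explicit stop_reason."""
    for attempt in range(max_retries + 1):
        try:
            return {"source": "primary", "data": primary()}
        except TimeoutError:            # steps 1-2: detect and classify the error
            if attempt < max_retries:   # step 3: retry within the policy budget
                continue
    for name, fn in fallbacks:          # step 4: switch to the fallback path
        try:
            return {"source": name, "data": fn()}
        except Exception:
            continue
    return {"source": "none", "stop_reason": "recovery_exhausted"}  # step 5

calls = {"n": 0}

def flaky_primary():
    calls["n"] += 1
    raise TimeoutError("primary_timeout")  # this sketch's primary always times out

result = run_with_recovery(flaky_primary, [("replica", lambda: {"ok": True})])
```

With max_retries=1, the primary is attempted twice before the chain falls back to the replica.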

What this example demonstrates

  ‱ retries for retriable timeouts (bounded by the policy budget)
  ‱ a fallback chain: primary -> replica -> cache
  ‱ a runtime execution allowlist for tool-level control
  ‱ checkpointing after each successful step
  ‱ an optional checkpoint TTL for safe resume
  ‱ resume from a checkpoint without re-executing the step
  ‱ explicit trace/history/stop_reason for production monitoring

Architecture

  1. main.py runs the payments_health step through RecoveryGateway.
  2. The primary tool deliberately times out, and the gateway retries.
  3. Once the retry budget is exhausted, the gateway falls back to payments_replica_api.
  4. The successful result is stored in CheckpointStore.
  5. The next step, demand_signal, runs and is checkpointed as well.
  6. The LLM composes the short final operations brief from aggregated facts only.

The LLM decides only the text of the final brief. The recovery policy (retries/fallbacks/budgets/allowlist) is controlled entirely by the execution layer. Checkpoint resume can be gated by a TTL to avoid reusing stale snapshots.


Project structure

TEXT
agent-patterns/
└── fallback-recovery-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── checkpoint_store.py
        ├── context.py
        ├── llm.py
        ├── README.md
        └── requirements.txt

Running the project

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/fallback-recovery-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py

Option via .env (optional)

BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows, it is simpler to use set variables or, if preferred, python-dotenv.
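
If you prefer not to add python-dotenv, a minimal stdlib loader for simple KEY=VALUE files can look like this. It is a sketch under stated assumptions: it ignores the quoting, expansion, and multiline values that full dotenv parsers handle:

```python
import os

def load_env_file(path: str = ".env") -> None:
    """Load simple KEY=VALUE lines into os.environ without overriding existing vars."""
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue  # skip blanks, comments, and malformed lines
            key, _, value = line.partition("=")
            os.environ.setdefault(key.strip(), value.strip())
```

Using setdefault means variables already exported in the shell win over the file, which matches the usual dotenv convention.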


Task

Production scenario:

"Prepare a short update on a payment incident for US enterprise customers. If the primary API is unstable, do not fail; recover via the fallback."


Code

tools.py — primary/fallback tools

PYTHON
from __future__ import annotations

import time
from typing import Any


_PRIMARY_ATTEMPTS: dict[str, int] = {}
# Demo-only process-global state for deterministic primary failures.
# In production, keep attempt counters request-scoped to avoid cross-run collisions.


def _request_key(report_date: str, region: str, request_id: str, suffix: str) -> str:
    return f"{request_id}:{report_date}:{region.upper()}:{suffix}"


def payments_primary_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    key = _request_key(report_date, region, request_id, "payments_primary")
    _PRIMARY_ATTEMPTS[key] = _PRIMARY_ATTEMPTS.get(key, 0) + 1
    attempt = _PRIMARY_ATTEMPTS[key]

    # Simulate primary instability: first two attempts timeout.
    if attempt <= 2:
        time.sleep(1.4)
        raise TimeoutError("primary_payments_timeout")

    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "incident_severity": "P1",
            "eta_minutes": 45,
            "source": "payments_primary_api",
        },
    }


def payments_replica_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del report_date, request_id
    time.sleep(0.2)
    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "incident_severity": "P1",
            "eta_minutes": 45,
            "source": f"payments_replica_api:{region.upper()}",
        },
    }


def payments_cached_snapshot(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.05)
    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.031,
            "chargeback_alerts": 4,
            "incident_severity": "P1",
            "eta_minutes": 50,
            "stale_minutes": 18,
            "source": f"payments_cache:{report_date}:{region.upper()}",
        },
    }


def demand_primary_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.15)
    return {
        "status": "ok",
        "data": {
            "affected_checkout_share": 0.27,
            "avg_orders_per_minute": 182,
            "priority_segment": f"{region.upper()}_enterprise",
            "source": f"demand_primary_api:{report_date}",
        },
    }


def demand_cached_snapshot(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.03)
    return {
        "status": "ok",
        "data": {
            "affected_checkout_share": 0.25,
            "avg_orders_per_minute": 176,
            "priority_segment": f"{region.upper()}_enterprise",
            "stale_minutes": 22,
            "source": f"demand_cache:{report_date}:{region.upper()}",
        },
    }

_PRIMARY_ATTEMPTS est utilisĂ© ici comme mock de dĂ©mo pour un comportement flaky primary contrĂŽlĂ©. En production, cet Ă©tat doit ĂȘtre isolĂ© par request (ou en request-scoped storage), pour Ă©viter les conflits entre runs parallĂšles.

context.py — building the request envelope

PYTHON
from __future__ import annotations

from typing import Any


def build_operations_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
        },
        "policy_hints": {
            "channel": "status_page",
            "audience": "enterprise_customers",
        },
    }

checkpoint_store.py — checkpoint aprĂšs progression sĂ»re

PYTHON
from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Any


@dataclass
class CheckpointRow:
    run_id: str
    step_id: str
    source: str
    tool: str
    result: dict[str, Any]
    saved_at: float
    ttl_seconds: float | None = None


class CheckpointStore:
    def __init__(self) -> None:
        self._rows: dict[tuple[str, str], CheckpointRow] = {}

    def get_step(self, *, run_id: str, step_id: str, now: float | None = None) -> CheckpointRow | None:
        row = self._rows.get((run_id, step_id))
        if row is None:
            return None
        if row.ttl_seconds is None:
            return row
        now_ts = time.time() if now is None else float(now)
        if (now_ts - row.saved_at) > float(row.ttl_seconds):
            # Expired checkpoint: treat as missing.
            self._rows.pop((run_id, step_id), None)
            return None
        return row

    def save_step(
        self,
        *,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
    ) -> None:
        self._rows[(run_id, step_id)] = CheckpointRow(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
            saved_at=time.time(),
            ttl_seconds=None,
        )

    def save_step_with_ttl(
        self,
        *,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
        ttl_seconds: float,
    ) -> None:
        self._rows[(run_id, step_id)] = CheckpointRow(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
            saved_at=time.time(),
            ttl_seconds=float(ttl_seconds),
        )

    def dump_run(self, *, run_id: str) -> list[dict[str, Any]]:
        out: list[dict[str, Any]] = []
        for row in self._rows.values():
            if row.run_id != run_id:
                continue
            out.append(
                {
                    "step_id": row.step_id,
                    "source": row.source,
                    "tool": row.tool,
                    "result_keys": sorted(row.result.keys()),
                    "saved_at": row.saved_at,
                }
            )
        out.sort(key=lambda item: item["saved_at"])
        return out
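
The TTL rule in get_step can be exercised in isolation. This compact sketch reproduces the same expiry behavior with an injectable clock; `TtlCache` is an illustration, not the project's class:

```python
from __future__ import annotations

import time

class TtlCache:
    """Entries expire once (now - saved_at) exceeds their ttl_seconds."""

    def __init__(self) -> None:
        self._rows = {}

    def save(self, key: str, value: dict, ttl_seconds: float | None = None) -> None:
        self._rows[key] = (value, time.time(), ttl_seconds)

    def get(self, key: str, now: float | None = None):
        row = self._rows.get(key)
        if row is None:
            return None
        value, saved_at, ttl = row
        if ttl is None:
            return value  # no TTL: the entry never expires
        now_ts = time.time() if now is None else now
        if (now_ts - saved_at) > ttl:
            self._rows.pop(key, None)  # expired checkpoint: treat as missing
            return None
        return value

cache = TtlCache()
cache.save("payments_health", {"ok": True}, ttl_seconds=300.0)
```

Accepting an explicit `now` makes the expiry path testable without sleeping, which is why the original get_step takes the same parameter.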

gateway.py — recovery policy boundary

PYTHON
from __future__ import annotations

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from dataclasses import dataclass
from typing import Any, Callable

from checkpoint_store import CheckpointStore


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 120
    step_timeout_seconds: float = 1.0
    max_retries: int = 1
    max_fallbacks: int = 2
    checkpoint_ttl_seconds: float = 900.0


def classify_exception(exc: Exception) -> str:
    if isinstance(exc, TimeoutError):
        msg = str(exc)[:80]
        return f"timeout:{msg}" if msg else "timeout"
    if isinstance(exc, ValueError):
        return "invalid_output"
    if isinstance(exc, RuntimeError):
        if "unavailable" in str(exc).lower():
            return "tool_unavailable"
        return "runtime_error"
    return "non_retriable"


def validate_tool_observation(raw: Any) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise ValueError("tool_observation_not_object")
    if raw.get("status") != "ok":
        raise ValueError("tool_observation_status_not_ok")
    data = raw.get("data")
    if not isinstance(data, dict):
        raise ValueError("tool_observation_data_not_object")
    return data


class RecoveryGateway:
    def __init__(
        self,
        *,
        allowed_steps_policy: set[str],
        allowed_tools_policy: set[str],
        allowed_tools_execution: set[str],
        budget: Budget,
    ):
        self.allowed_steps_policy = set(allowed_steps_policy)
        self.allowed_tools_policy = set(allowed_tools_policy)
        self.allowed_tools_execution = set(allowed_tools_execution)
        self.budget = budget
        self._pool = ThreadPoolExecutor(max_workers=4)

    def close(self) -> None:
        self._pool.shutdown(wait=False, cancel_futures=True)

    def _ensure_run_budget(self, *, started_monotonic: float) -> None:
        elapsed = time.monotonic() - started_monotonic
        if elapsed > self.budget.max_seconds:
            raise StopRun("max_seconds")

    def _dispatch(
        self,
        *,
        tool_name: str,
        tool_fn: Callable[..., dict[str, Any]],
        args: dict[str, Any],
    ) -> dict[str, Any]:
        future = self._pool.submit(tool_fn, **args)
        try:
            raw = future.result(timeout=self.budget.step_timeout_seconds)
        except FuturesTimeoutError as exc:
            raise TimeoutError(f"tool_timeout:{tool_name}") from exc
        return validate_tool_observation(raw)

    def _save_checkpoint(
        self,
        *,
        checkpoint: CheckpointStore,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
        checkpoint_ttl_seconds: float | None = None,
    ) -> None:
        ttl_value = self.budget.checkpoint_ttl_seconds if checkpoint_ttl_seconds is None else checkpoint_ttl_seconds
        ttl = float(ttl_value)
        if ttl > 0:
            checkpoint.save_step_with_ttl(
                run_id=run_id,
                step_id=step_id,
                source=source,
                tool=tool,
                result=result,
                ttl_seconds=ttl,
            )
            return
        checkpoint.save_step(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
        )

    def run_step_with_recovery(
        self,
        *,
        run_id: str,
        step_id: str,
        primary_tool_name: str,
        primary_tool_fn: Callable[..., dict[str, Any]],
        fallback_chain: list[tuple[str, Callable[..., dict[str, Any]]]],
        args: dict[str, Any],
        checkpoint: CheckpointStore,
        started_monotonic: float,
        critical: bool = True,
        checkpoint_ttl_seconds: float | None = None,
    ) -> dict[str, Any]:
        self._ensure_run_budget(started_monotonic=started_monotonic)
        if step_id not in self.allowed_steps_policy:
            raise StopRun(f"step_not_allowed_policy:{step_id}")

        cached = checkpoint.get_step(run_id=run_id, step_id=step_id)
        if cached is not None:
            return {
                "status": "done",
                "step_id": step_id,
                "source": "checkpoint",
                "tool": cached.tool,
                "result": cached.result,
                "attempts_used": 0,
                "primary_attempts": 0,
                "fallback_attempts": 0,
                "retried": False,
                "fallbacks_used": 0,
                "events": [{"kind": "checkpoint_resume", "step_id": step_id}],
            }

        events: list[dict[str, Any]] = []
        primary_attempts = 0
        fallback_attempts = 0
        last_reason = "unknown"

        if primary_tool_name not in self.allowed_tools_policy:
            events.append({"kind": "primary_denied_policy", "tool": primary_tool_name})
            last_reason = "tool_denied_policy"
        elif primary_tool_name not in self.allowed_tools_execution:
            events.append({"kind": "primary_denied", "tool": primary_tool_name})
            last_reason = "tool_denied"
        else:
            for retry_idx in range(self.budget.max_retries + 1):
                self._ensure_run_budget(started_monotonic=started_monotonic)
                primary_attempts += 1
                try:
                    data = self._dispatch(
                        tool_name=primary_tool_name,
                        tool_fn=primary_tool_fn,
                        args=args,
                    )
                    self._save_checkpoint(
                        checkpoint=checkpoint,
                        run_id=run_id,
                        step_id=step_id,
                        source="primary",
                        tool=primary_tool_name,
                        result=data,
                        checkpoint_ttl_seconds=checkpoint_ttl_seconds,
                    )
                    return {
                        "status": "done",
                        "step_id": step_id,
                        "source": "primary",
                        "tool": primary_tool_name,
                        "result": data,
                        "attempts_used": primary_attempts,
                        "primary_attempts": primary_attempts,
                        "fallback_attempts": 0,
                        "retried": retry_idx > 0,
                        "fallbacks_used": 0,
                        "events": events,
                    }
                except Exception as exc:  # noqa: BLE001
                    is_timeout = isinstance(exc, TimeoutError)
                    reason = classify_exception(exc)
                    last_reason = reason
                    events.append(
                        {
                            "kind": "primary_failure",
                            "tool": primary_tool_name,
                            "attempt": retry_idx + 1,
                            "reason": reason,
                            "message": str(exc)[:120],
                        }
                    )
                    if is_timeout and retry_idx < self.budget.max_retries:
                        backoff = 0.25 * float(retry_idx + 1)
                        events.append({"kind": "retry_backoff", "seconds": backoff})
                        time.sleep(backoff)
                        continue
                    break

        fallbacks_used = 0
        for fallback_name, fallback_fn in fallback_chain:
            if fallbacks_used >= self.budget.max_fallbacks:
                break
            fallbacks_used += 1
            self._ensure_run_budget(started_monotonic=started_monotonic)
            if fallback_name not in self.allowed_tools_policy:
                events.append({"kind": "fallback_denied_policy", "tool": fallback_name})
                continue
            if fallback_name not in self.allowed_tools_execution:
                events.append({"kind": "fallback_denied", "tool": fallback_name})
                continue
            try:
                fallback_attempts += 1
                data = self._dispatch(
                    tool_name=fallback_name,
                    tool_fn=fallback_fn,
                    args=args,
                )
                events.append({"kind": "fallback_success", "tool": fallback_name})
                self._save_checkpoint(
                    checkpoint=checkpoint,
                    run_id=run_id,
                    step_id=step_id,
                    source="fallback",
                    tool=fallback_name,
                    result=data,
                    checkpoint_ttl_seconds=checkpoint_ttl_seconds,
                )
                return {
                    "status": "done",
                    "step_id": step_id,
                    "source": "fallback",
                    "tool": fallback_name,
                    "result": data,
                    "attempts_used": primary_attempts + fallback_attempts,
                    "primary_attempts": primary_attempts,
                    "fallback_attempts": fallback_attempts,
                    "retried": primary_attempts > 1,
                    "fallbacks_used": fallbacks_used,
                    "events": events,
                }
            except Exception as exc:  # noqa: BLE001
                reason = classify_exception(exc)
                last_reason = reason
                events.append(
                    {
                        "kind": "fallback_failure",
                        "tool": fallback_name,
                        "reason": reason,
                        "message": str(exc)[:120],
                    }
                )

        if critical:
            if last_reason in {"tool_denied_policy", "tool_denied"}:
                raise StopRun(f"{last_reason}:{step_id}")
            raise StopRun(f"recovery_exhausted:{step_id}")
        return {
            "status": "failed",
            "step_id": step_id,
            "source": "none",
            "tool": "none",
            "error": last_reason,
            "attempts_used": primary_attempts + fallback_attempts,
            "primary_attempts": primary_attempts,
            "fallback_attempts": fallback_attempts,
            "retried": primary_attempts > 1,
            "fallbacks_used": fallbacks_used,
            "events": events,
        }
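
The step-timeout mechanism in _dispatch hinges on future.result(timeout=...); its behavior can be verified standalone. A small sketch of the same mechanism, with hypothetical names:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError

def dispatch_with_timeout(fn, timeout_seconds: float):
    """Run fn in a worker thread; convert a deadline miss into TimeoutError."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(fn)
        try:
            return future.result(timeout=timeout_seconds)
        except FuturesTimeoutError as exc:
            raise TimeoutError("tool_timeout") from exc

fast = dispatch_with_timeout(lambda: "ok", timeout_seconds=0.5)

try:
    dispatch_with_timeout(lambda: time.sleep(0.3) or "late", timeout_seconds=0.05)
    outcome = "no_timeout"
except TimeoutError:
    outcome = "timeout"
```

One caveat the gateway shares: result(timeout=...) abandons the future, but the worker thread keeps running until the function returns, so the pool must be shut down explicitly (which gateway.close() does with cancel_futures=True).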

llm.py — short final brief

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


FINAL_SYSTEM_PROMPT = """
You are an operations editor.
Return exactly one JSON object:
{
  "answer": "short operations brief"
}

Rules:
- Use only provided aggregate facts.
- Mention if fallback recovery was used.
- Keep it concise and actionable.
- If ETA is present, phrase it as an estimate that may change. Never guarantee timelines.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")
    return data


def compose_operations_brief(
    *,
    goal: str,
    aggregate: dict[str, Any],
    recovery_summary: dict[str, Any],
) -> str:
    payload = {
        "goal": goal,
        "aggregate": aggregate,
        "recovery_summary": recovery_summary,
    }
    data = _chat_json(system_prompt=FINAL_SYSTEM_PROMPT, payload=payload)
    answer = data.get("answer")
    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")
    answer = answer.strip()
    if not answer:
        raise LLMEmpty("llm_empty")
    return answer
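
The answer-extraction contract at the end of compose_operations_brief can be captured as a small standalone validator. This sketch reuses the module's error strings but is an illustration, not the project's code:

```python
def extract_answer(data: object) -> str:
    """Enforce the {'answer': non-empty string} contract on a decoded LLM reply."""
    if not isinstance(data, dict):
        raise ValueError("llm_invalid_json")
    answer = data.get("answer")
    if not isinstance(answer, str):
        raise ValueError("llm_invalid_schema")
    answer = answer.strip()
    if not answer:
        raise ValueError("llm_empty")
    return answer

ok = extract_answer({"answer": "  US payments incident update.  "})
```

Distinguishing invalid_json, invalid_schema, and empty keeps the eventual stop_reason precise instead of collapsing every bad reply into one generic failure.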

main.py — Detect -> Retry -> Fallback -> Resume -> Finalize

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from checkpoint_store import CheckpointStore
from context import build_operations_context
from gateway import Budget, RecoveryGateway, StopRun
from llm import LLMEmpty, LLMInvalid, LLMTimeout, compose_operations_brief
from tools import (
    demand_cached_snapshot,
    demand_primary_api,
    payments_cached_snapshot,
    payments_primary_api,
    payments_replica_api,
)

GOAL = (
    "Prepare a concise operations brief for the US payments incident. "
    "Recover from tool failures safely and keep the update customer-safe."
)
REQUEST = build_operations_context(report_date="2026-03-06", region="US")

BUDGET = Budget(
    max_seconds=120,
    step_timeout_seconds=1.0,
    max_retries=1,
    max_fallbacks=2,
    checkpoint_ttl_seconds=900.0,
)

ALLOWED_STEPS_POLICY = {"payments_health", "demand_signal"}
ALLOWED_TOOLS_POLICY = {
    "payments_primary_api",
    "payments_replica_api",
    "payments_cached_snapshot",
    "demand_primary_api",
    "demand_cached_snapshot",
}
ALLOWED_TOOLS_EXECUTION = ALLOWED_TOOLS_POLICY


def run_fallback_recovery_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    checkpoint = CheckpointStore()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = RecoveryGateway(
        allowed_steps_policy=ALLOWED_STEPS_POLICY,
        allowed_tools_policy=ALLOWED_TOOLS_POLICY,
        allowed_tools_execution=ALLOWED_TOOLS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    try:
        req = request["request"]
        common_args = {
            "report_date": req["report_date"],
            "region": req["region"],
            "request_id": run_id,
        }

        try:
            payments_step = gateway.run_step_with_recovery(
                run_id=run_id,
                step_id="payments_health",
                primary_tool_name="payments_primary_api",
                primary_tool_fn=payments_primary_api,
                fallback_chain=[
                    ("payments_replica_api", payments_replica_api),
                    ("payments_cached_snapshot", payments_cached_snapshot),
                ],
                args=common_args,
                checkpoint=checkpoint,
                started_monotonic=started,
                critical=True,
                checkpoint_ttl_seconds=300.0,
            )
        except StopRun as exc:
            return stopped(exc.reason, phase="payments_health")

        trace.append(
            {
                "step": 1,
                "phase": "payments_health",
                "source": payments_step["source"],
                "tool": payments_step["tool"],
                "attempts_used": payments_step["attempts_used"],
                "primary_attempts": payments_step["primary_attempts"],
                "fallback_attempts": payments_step["fallback_attempts"],
                "retried": payments_step["retried"],
                "fallbacks_used": payments_step["fallbacks_used"],
                "ok": payments_step["status"] == "done",
            }
        )
        history.append(
            {
                "step": 1,
                "action": "run_step_with_recovery",
                "step_id": "payments_health",
                "events": payments_step["events"],
                "result": payments_step.get("result"),
            }
        )

        try:
            demand_step = gateway.run_step_with_recovery(
                run_id=run_id,
                step_id="demand_signal",
                primary_tool_name="demand_primary_api",
                primary_tool_fn=demand_primary_api,
                fallback_chain=[("demand_cached_snapshot", demand_cached_snapshot)],
                args=common_args,
                checkpoint=checkpoint,
                started_monotonic=started,
                critical=True,
                checkpoint_ttl_seconds=900.0,
            )
        except StopRun as exc:
            return stopped(exc.reason, phase="demand_signal")

        trace.append(
            {
                "step": 2,
                "phase": "demand_signal",
                "source": demand_step["source"],
                "tool": demand_step["tool"],
                "attempts_used": demand_step["attempts_used"],
                "primary_attempts": demand_step["primary_attempts"],
                "fallback_attempts": demand_step["fallback_attempts"],
                "retried": demand_step["retried"],
                "fallbacks_used": demand_step["fallbacks_used"],
                "ok": demand_step["status"] == "done",
            }
        )
        history.append(
            {
                "step": 2,
                "action": "run_step_with_recovery",
                "step_id": "demand_signal",
                "events": demand_step["events"],
                "result": demand_step.get("result"),
            }
        )

        aggregate = {
            "report_date": req["report_date"],
            "region": req["region"],
            "payments": payments_step["result"],
            "demand": demand_step["result"],
        }
        fallback_steps = [
            step["step_id"]
            for step in (payments_step, demand_step)
            if step["source"] == "fallback"
        ]
        checkpoint_saved_steps = sorted(
            set(item["step_id"] for item in checkpoint.dump_run(run_id=run_id))
        )
        checkpoint_resumed_steps = []
        for h in history:
            for ev in (h.get("events") or []):
                if ev.get("kind") == "checkpoint_resume":
                    checkpoint_resumed_steps.append(ev.get("step_id"))

        recovery_summary = {
            "fallback_used": bool(fallback_steps),
            "fallback_steps": fallback_steps,
            "checkpoint_saved_steps": checkpoint_saved_steps,
            "checkpoint_resumed_steps": sorted(set(checkpoint_resumed_steps)),
        }

        if (time.monotonic() - started) > BUDGET.max_seconds:
            return stopped("max_seconds", phase="finalize")

        try:
            answer = compose_operations_brief(
                goal=goal,
                aggregate=aggregate,
                recovery_summary=recovery_summary,
            )
        except LLMTimeout:
            return stopped("llm_timeout", phase="finalize")
        except LLMInvalid as exc:
            return stopped(exc.args[0], phase="finalize")
        except LLMEmpty:
            return stopped("llm_empty", phase="finalize")

        trace.append(
            {
                "step": 3,
                "phase": "finalize",
                "fallback_used": recovery_summary["fallback_used"],
                "ok": True,
            }
        )
        history.append({"step": 3, "action": "compose_operations_brief"})

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "recovered_with_fallback" if fallback_steps else "direct_success",
            "answer": answer,
            "aggregate": aggregate,
            "recovery": recovery_summary,
            "checkpoint": checkpoint.dump_run(run_id=run_id),
            "trace": trace,
            "history": history,
        }
    finally:
        gateway.close()


def main() -> None:
    result = run_fallback_recovery_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (plainly)

  ‱ Retries and fallbacks are driven by policy budgets, not by the LLM.
  ‱ _PRIMARY_ATTEMPTS in tools.py is a demo-only mock; in a real service, that state must be request-scoped.
  ‱ After each successful step, a checkpoint is taken so progress is not lost.
  ‱ The primary_attempts/fallback_attempts metrics show the actual recovery attempts.
  ‱ checkpoint_saved_steps and checkpoint_resumed_steps separate saving from an actual resume.
  ‱ In the runnable code, the ThreadPoolExecutor is closed via gateway.close() (in a finally block) to avoid resource leaks.
  ‱ trace/history make the recovery behavior transparent.
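
For monitoring, the per-step events can be rolled up into simple counters. The field names below match the history structure this example emits; the aggregation itself is an illustrative sketch:

```python
from collections import Counter

def count_recovery_events(history: list) -> Counter:
    """Tally event kinds (primary_failure, retry_backoff, fallback_success, ...)."""
    counts = Counter()
    for entry in history:
        for event in entry.get("events") or []:
            counts[event.get("kind", "unknown")] += 1
    return counts

sample_history = [
    {"step_id": "payments_health", "events": [
        {"kind": "primary_failure"}, {"kind": "retry_backoff"},
        {"kind": "primary_failure"}, {"kind": "fallback_success"},
    ]},
    {"step_id": "demand_signal", "events": []},
]

counts = count_recovery_events(sample_history)
```

Feeding such counters into dashboards turns "fallback_success per hour" into a cheap early-warning signal for primary degradation.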

Example output

JSON
{
  "run_id": "af48d033-1f5a-4c3f-b31e-050d7777fd9a",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "recovered_with_fallback",
  "answer": "A P1 incident is impacting US payments with a 3.4% failed payment rate and 5 chargeback alerts. Approximately 27% of checkouts are affected, primarily in the US enterprise segment, with an average of 182 orders per minute. Recovery has involved safe fallback to payments health tools; no checkpoint resumptions yet. Estimated recovery time is around 45 minutes but may change. Continue monitoring and prioritize customer safety in updates.",
  "aggregate": {
    "report_date": "2026-03-06",
    "region": "US",
    "payments": {
      "failed_payment_rate": 0.034,
      "chargeback_alerts": 5,
      "incident_severity": "P1",
      "eta_minutes": 45,
      "source": "payments_replica_api:US"
    },
    "demand": {
      "affected_checkout_share": 0.27,
      "avg_orders_per_minute": 182,
      "priority_segment": "US_enterprise",
      "source": "demand_primary_api:2026-03-06"
    }
  },
  "recovery": {
    "fallback_used": true,
    "fallback_steps": [
      "payments_health"
    ],
    "checkpoint_saved_steps": [
      "demand_signal",
      "payments_health"
    ],
    "checkpoint_resumed_steps": []
  },
  "checkpoint": [
    {
      "step_id": "payments_health",
      "source": "fallback",
      "tool": "payments_replica_api",
      "result_keys": [
        "chargeback_alerts",
        "eta_minutes",
        "failed_payment_rate",
        "incident_severity",
        "source"
      ],
      "saved_at": 1772475669.9678888
    },
    {
      "step_id": "demand_signal",
      "source": "primary",
      "tool": "demand_primary_api",
      "result_keys": [
        "affected_checkout_share",
        "avg_orders_per_minute",
        "priority_segment",
        "source"
      ],
      "saved_at": 1772475670.123298
    }
  ],
  "trace": [
    {
      "step": 1,
      "phase": "payments_health",
      "source": "fallback",
      "tool": "payments_replica_api",
      "attempts_used": 3,
      "primary_attempts": 2,
      "fallback_attempts": 1,
      "retried": true,
      "fallbacks_used": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "demand_signal",
      "source": "primary",
      "tool": "demand_primary_api",
      "attempts_used": 1,
      "primary_attempts": 1,
      "fallback_attempts": 0,
      "retried": false,
      "fallbacks_used": 0,
      "ok": true
    },
    {
      "step": 3,
      "phase": "finalize",
      "fallback_used": true,
      "ok": true
    }
  ],
  "history": [{...}]
}

Typical stop_reason values

  ‱ success — the run completed correctly
  ‱ max_seconds — total time budget for the run exhausted
  ‱ step_not_allowed_policy:* — step outside the policy allowlist
  ‱ tool_denied_policy:* — tool forbidden by the policy allowlist
  ‱ tool_denied:* — tool forbidden by the runtime execution allowlist
  ‱ recovery_exhausted:* — retry/fallback exhausted for a critical step
  ‱ llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  ‱ llm_empty — empty final brief
  ‱ llm_invalid_json — the LLM returned invalid JSON
  ‱ llm_invalid_schema — the JSON does not match the contract
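
These values are deliberately machine-routable. As a hedged sketch (the routing buckets and the classify_stop_reason function are illustrative, not part of the example project), a monitoring layer could fan them out like this:

```python
# Hypothetical sketch: routing stop_reason values to monitoring actions.
# Prefix matching covers the parameterized reasons like
# "recovery_exhausted:<step>" and "tool_denied:<tool>".
ALERTABLE_PREFIXES = (
    "recovery_exhausted:",
    "tool_denied",            # covers tool_denied:* and tool_denied_policy:*
    "step_not_allowed_policy:",
)
LLM_REASONS = {"llm_timeout", "llm_empty", "llm_invalid_json", "llm_invalid_schema"}

def classify_stop_reason(stop_reason: str) -> str:
    if stop_reason == "success":
        return "ok"
    if stop_reason.startswith(ALERTABLE_PREFIXES):
        return "page_oncall"
    if stop_reason in LLM_REASONS:
        return "llm_degraded"
    return "investigate"

print(classify_stop_reason("recovery_exhausted:payments_health"))  # page_oncall
print(classify_stop_reason("max_seconds"))                         # investigate
```

Keeping the classification outside the agent means alerting policy can change without redeploying the runtime.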

What is NOT shown here

  ‱ persisted checkpoint storage (Redis/DB)
  ‱ adaptive retry strategy (jitter, circuit breaker)
  ‱ tenant-specific execution allowlists
  ‱ human escalation queue for critical recovery failures
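
For the adaptive retry strategy that is deliberately left out, a minimal sketch of exponential backoff with full jitter might look like the following. Function names and defaults are illustrative, and attempts must be at least 1:

```python
import random
import time

# Hypothetical sketch: exponential backoff with full jitter.
# Each delay is drawn uniformly from [0, min(cap, base * 2**i)].
def backoff_delays(base: float = 0.5, cap: float = 8.0, attempts: int = 3) -> list[float]:
    return [random.uniform(0, min(cap, base * (2 ** i))) for i in range(attempts)]

def retry_with_jitter(fn, attempts: int = 3, base: float = 0.5):
    last_exc: TimeoutError | None = None
    for delay in backoff_delays(base=base, attempts=attempts):
        try:
            return fn()
        except TimeoutError as exc:
            last_exc = exc
            time.sleep(delay)  # jittered pause before the next attempt
    raise last_exc  # budget exhausted: surface the last timeout
```

Full jitter spreads retries out in time, so a fleet of agents hitting the same degraded dependency does not retry in lockstep.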

What you can try next

  1. Block payments_replica_api in ALLOWED_TOOLS_EXECUTION and verify the degrade/stop behavior.
  2. Lower step_timeout_seconds and watch how the recovery trace changes.
  3. Add a post-incident summary step that uses the checkpoint without re-collecting the data.
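
The first experiment boils down to a runtime allowlist check. Here is a minimal standalone sketch of the idea; the set contents and the execute_tool helper are illustrative, while the real ALLOWED_TOOLS_EXECUTION lives in tools.py:

```python
# Hypothetical sketch: runtime execution allowlist with the replica removed,
# mirroring experiment 1 above.
ALLOWED_TOOLS_EXECUTION = {"payments_primary_api", "demand_primary_api"}

def execute_tool(tool_name: str) -> dict:
    if tool_name not in ALLOWED_TOOLS_EXECUTION:
        # The gateway would surface this as stop_reason "tool_denied:<tool>".
        raise PermissionError(f"tool_denied:{tool_name}")
    return {"tool": tool_name, "ok": True}

try:
    execute_tool("payments_replica_api")
except PermissionError as exc:
    print(exc)  # tool_denied:payments_replica_api
```

With the replica denied, the fallback chain for payments_health can only reach the cache tier, so the run either degrades further or stops with an explicit tool_denied stop_reason.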
⏱ 15 min read ‱ Updated March 6, 2026 ‱ Difficulty: ★★☆
Embedded: production control with OnceOnly
Add guardrails to tool-calling agents
Ship this pattern with governance:
  ‱ Budgets (steps / cost caps)
  ‱ Tool permissions (allowlist / blocklist)
  ‱ Kill switch & incident stop
  ‱ Idempotency & deduplication
  ‱ Audit logs & traceability
Embedded mention: OnceOnly is a control layer for production agent systems.

Author

Nick — an engineer building infrastructure for production AI agents.

Focus: agent patterns, failure modes, runtime control, and system reliability.

🔗 GitHub: https://github.com/mykolademyanov


Editorial note

This documentation is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The content draws on real failures, post-mortems, and operational incidents from deployed AI agent systems.