Fallback-Recovery Agent in Python: a complete example

A production-style, runnable example of a Fallback-Recovery agent in Python with retry, a fallback chain, checkpoint resume, and transparent stop_reason/trace.
On this page
  1. The pattern in a nutshell
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. How to run
  6. The task
  7. Code
  8. tools.py — primary/fallback tools
  9. context.py — building the request envelope
  10. checkpoint_store.py — checkpointing after safe progress
  11. gateway.py — recovery policy boundary
  12. llm.py — the final short brief
  13. main.py — Detect -> Retry -> Fallback -> Resume -> Finalize
  14. Example output
  15. Typical stop_reason values
  16. What is NOT shown here
  17. What to try next

The pattern in a nutshell

A Fallback-Recovery Agent is a pattern in which a step failure does not mean an immediate stop.

The agent moves through a controlled loop:

  1. detect the error
  2. classify it
  3. retry within the policy
  4. switch to a fallback path
  5. resume from a checkpoint, or stop with an explicit stop_reason
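The loop above can be sketched as a minimal, standalone function. This is an illustrative sketch only — `run_with_recovery` and its parameters are not part of the example project below, which implements the same idea with stricter policies and checkpoints:

```python
import time


def run_with_recovery(primary, fallbacks, max_retries=1, backoff=0.25):
    """Minimal sketch of the detect -> classify -> retry -> fallback loop."""
    for attempt in range(max_retries + 1):
        try:
            return {"source": "primary", "result": primary()}
        except TimeoutError:
            # Retriable class: back off and try again within the budget.
            if attempt < max_retries:
                time.sleep(backoff * (attempt + 1))
                continue
        except Exception:
            # Non-retriable: skip remaining retries, go straight to fallback.
            break
    for name, fn in fallbacks:
        try:
            return {"source": f"fallback:{name}", "result": fn()}
        except Exception:
            continue
    return {"source": "none", "stop_reason": "recovery_exhausted"}
```

The key property is that every exit path is explicit: either a result with its source, or a `recovery_exhausted` stop reason.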

What this example demonstrates

  • retry for retriable timeouts (bounded by a policy budget)
  • fallback chain: primary -> replica -> cache
  • a runtime execution allowlist for tool-level control
  • a checkpoint after every successful step
  • optional checkpoint TTL for safe resume
  • resume from a checkpoint without re-executing the step
  • explicit trace/history/stop_reason for production monitoring

Architecture

  1. main.py runs the payments_health step through the RecoveryGateway.
  2. The primary tool deliberately times out; the gateway retries.
  3. Once retries are exhausted, the gateway switches to the fallback (payments_replica_api).
  4. The successful result is saved to the CheckpointStore.
  5. The next step, demand_signal, runs and is checkpointed as well.
  6. The LLM composes the final short operations brief from aggregated facts only.

The LLM decides only the text of the final brief. The recovery policy (retry/fallback/budgets/allowlist) is fully controlled by the execution layer. Checkpoint resume can be TTL-bounded so that stale snapshots are never reused.


Project structure

TEXT
agent-patterns/
└── fallback-recovery-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── checkpoint_store.py
        ├── context.py
        ├── llm.py
        ├── README.md
        └── requirements.txt

How to run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/fallback-recovery-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Requires Python 3.11+.

Option via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py

Option via .env (optional):
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows it is simpler to set the variables directly or, if you prefer, use python-dotenv.


The task

Production case:

"Prepare a short update on the payments incident for US enterprise customers. If the primary API is unstable, do not fail — recover through a fallback."


Code

tools.py — primary/fallback tools

PYTHON
from __future__ import annotations

import time
from typing import Any


_PRIMARY_ATTEMPTS: dict[str, int] = {}
# Demo-only process-global state for deterministic primary failures.
# In production, keep attempt counters request-scoped to avoid cross-run collisions.


def _request_key(report_date: str, region: str, request_id: str, suffix: str) -> str:
    return f"{request_id}:{report_date}:{region.upper()}:{suffix}"


def payments_primary_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    key = _request_key(report_date, region, request_id, "payments_primary")
    _PRIMARY_ATTEMPTS[key] = _PRIMARY_ATTEMPTS.get(key, 0) + 1
    attempt = _PRIMARY_ATTEMPTS[key]

    # Simulate primary instability: first two attempts timeout.
    if attempt <= 2:
        time.sleep(1.4)
        raise TimeoutError("primary_payments_timeout")

    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "incident_severity": "P1",
            "eta_minutes": 45,
            "source": "payments_primary_api",
        },
    }


def payments_replica_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del report_date, request_id
    time.sleep(0.2)
    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "incident_severity": "P1",
            "eta_minutes": 45,
            "source": f"payments_replica_api:{region.upper()}",
        },
    }


def payments_cached_snapshot(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.05)
    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.031,
            "chargeback_alerts": 4,
            "incident_severity": "P1",
            "eta_minutes": 50,
            "stale_minutes": 18,
            "source": f"payments_cache:{report_date}:{region.upper()}",
        },
    }


def demand_primary_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.15)
    return {
        "status": "ok",
        "data": {
            "affected_checkout_share": 0.27,
            "avg_orders_per_minute": 182,
            "priority_segment": f"{region.upper()}_enterprise",
            "source": f"demand_primary_api:{report_date}",
        },
    }


def demand_cached_snapshot(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.03)
    return {
        "status": "ok",
        "data": {
            "affected_checkout_share": 0.25,
            "avg_orders_per_minute": 176,
            "priority_segment": f"{region.upper()}_enterprise",
            "stale_minutes": 22,
            "source": f"demand_cache:{report_date}:{region.upper()}",
        },
    }

In this example, _PRIMARY_ATTEMPTS is a demo mock for a controlled flaky primary. In production, this state must be isolated per request (or kept in request-scoped storage) so that parallel runs do not collide.
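One way to make the counters request-scoped is to own them in a per-run object instead of a module-level dict. `RequestScopedAttempts` is a hypothetical sketch, not part of the example code:

```python
class RequestScopedAttempts:
    """Per-request attempt counters (hypothetical sketch).

    Unlike a module-level dict, each run owns its own instance,
    so parallel runs cannot collide on the same keys.
    """

    def __init__(self) -> None:
        self._attempts: dict[str, int] = {}

    def bump(self, tool: str) -> int:
        # Increment and return the attempt number for this tool in this run.
        self._attempts[tool] = self._attempts.get(tool, 0) + 1
        return self._attempts[tool]
```

The instance would be created once per run (e.g., alongside run_id) and passed to tools, replacing the global `_PRIMARY_ATTEMPTS` lookup.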

context.py — building the request envelope

PYTHON
from __future__ import annotations

from typing import Any


def build_operations_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
        },
        "policy_hints": {
            "channel": "status_page",
            "audience": "enterprise_customers",
        },
    }

checkpoint_store.py — checkpointing after safe progress

PYTHON
from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Any


@dataclass
class CheckpointRow:
    run_id: str
    step_id: str
    source: str
    tool: str
    result: dict[str, Any]
    saved_at: float
    ttl_seconds: float | None = None


class CheckpointStore:
    def __init__(self) -> None:
        self._rows: dict[tuple[str, str], CheckpointRow] = {}

    def get_step(self, *, run_id: str, step_id: str, now: float | None = None) -> CheckpointRow | None:
        row = self._rows.get((run_id, step_id))
        if row is None:
            return None
        if row.ttl_seconds is None:
            return row
        now_ts = time.time() if now is None else float(now)
        if (now_ts - row.saved_at) > float(row.ttl_seconds):
            # Expired checkpoint: treat as missing.
            self._rows.pop((run_id, step_id), None)
            return None
        return row

    def save_step(
        self,
        *,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
    ) -> None:
        self._rows[(run_id, step_id)] = CheckpointRow(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
            saved_at=time.time(),
            ttl_seconds=None,
        )

    def save_step_with_ttl(
        self,
        *,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
        ttl_seconds: float,
    ) -> None:
        self._rows[(run_id, step_id)] = CheckpointRow(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
            saved_at=time.time(),
            ttl_seconds=float(ttl_seconds),
        )

    def dump_run(self, *, run_id: str) -> list[dict[str, Any]]:
        out: list[dict[str, Any]] = []
        for row in self._rows.values():
            if row.run_id != run_id:
                continue
            out.append(
                {
                    "step_id": row.step_id,
                    "source": row.source,
                    "tool": row.tool,
                    "result_keys": sorted(row.result.keys()),
                    "saved_at": row.saved_at,
                }
            )
        out.sort(key=lambda item: item["saved_at"])
        return out

gateway.py — recovery policy boundary

PYTHON
from __future__ import annotations

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from dataclasses import dataclass
from typing import Any, Callable

from checkpoint_store import CheckpointStore


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 120
    step_timeout_seconds: float = 1.0
    max_retries: int = 1
    max_fallbacks: int = 2
    checkpoint_ttl_seconds: float = 900.0


def classify_exception(exc: Exception) -> str:
    if isinstance(exc, TimeoutError):
        msg = str(exc)[:80]
        return f"timeout:{msg}" if msg else "timeout"
    if isinstance(exc, ValueError):
        return "invalid_output"
    if isinstance(exc, RuntimeError):
        if "unavailable" in str(exc).lower():
            return "tool_unavailable"
        return "runtime_error"
    return "non_retriable"


def validate_tool_observation(raw: Any) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise ValueError("tool_observation_not_object")
    if raw.get("status") != "ok":
        raise ValueError("tool_observation_status_not_ok")
    data = raw.get("data")
    if not isinstance(data, dict):
        raise ValueError("tool_observation_data_not_object")
    return data


class RecoveryGateway:
    def __init__(
        self,
        *,
        allowed_steps_policy: set[str],
        allowed_tools_policy: set[str],
        allowed_tools_execution: set[str],
        budget: Budget,
    ):
        self.allowed_steps_policy = set(allowed_steps_policy)
        self.allowed_tools_policy = set(allowed_tools_policy)
        self.allowed_tools_execution = set(allowed_tools_execution)
        self.budget = budget
        self._pool = ThreadPoolExecutor(max_workers=4)

    def close(self) -> None:
        self._pool.shutdown(wait=False, cancel_futures=True)

    def _ensure_run_budget(self, *, started_monotonic: float) -> None:
        elapsed = time.monotonic() - started_monotonic
        if elapsed > self.budget.max_seconds:
            raise StopRun("max_seconds")

    def _dispatch(
        self,
        *,
        tool_name: str,
        tool_fn: Callable[..., dict[str, Any]],
        args: dict[str, Any],
    ) -> dict[str, Any]:
        future = self._pool.submit(tool_fn, **args)
        try:
            raw = future.result(timeout=self.budget.step_timeout_seconds)
        except FuturesTimeoutError as exc:
            raise TimeoutError(f"tool_timeout:{tool_name}") from exc
        return validate_tool_observation(raw)

    def _save_checkpoint(
        self,
        *,
        checkpoint: CheckpointStore,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
        checkpoint_ttl_seconds: float | None = None,
    ) -> None:
        ttl_value = self.budget.checkpoint_ttl_seconds if checkpoint_ttl_seconds is None else checkpoint_ttl_seconds
        ttl = float(ttl_value)
        if ttl > 0:
            checkpoint.save_step_with_ttl(
                run_id=run_id,
                step_id=step_id,
                source=source,
                tool=tool,
                result=result,
                ttl_seconds=ttl,
            )
            return
        checkpoint.save_step(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
        )

    def run_step_with_recovery(
        self,
        *,
        run_id: str,
        step_id: str,
        primary_tool_name: str,
        primary_tool_fn: Callable[..., dict[str, Any]],
        fallback_chain: list[tuple[str, Callable[..., dict[str, Any]]]],
        args: dict[str, Any],
        checkpoint: CheckpointStore,
        started_monotonic: float,
        critical: bool = True,
        checkpoint_ttl_seconds: float | None = None,
    ) -> dict[str, Any]:
        self._ensure_run_budget(started_monotonic=started_monotonic)
        if step_id not in self.allowed_steps_policy:
            raise StopRun(f"step_not_allowed_policy:{step_id}")

        cached = checkpoint.get_step(run_id=run_id, step_id=step_id)
        if cached is not None:
            return {
                "status": "done",
                "step_id": step_id,
                "source": "checkpoint",
                "tool": cached.tool,
                "result": cached.result,
                "attempts_used": 0,
                "primary_attempts": 0,
                "fallback_attempts": 0,
                "retried": False,
                "fallbacks_used": 0,
                "events": [{"kind": "checkpoint_resume", "step_id": step_id}],
            }

        events: list[dict[str, Any]] = []
        primary_attempts = 0
        fallback_attempts = 0
        last_reason = "unknown"

        if primary_tool_name not in self.allowed_tools_policy:
            events.append({"kind": "primary_denied_policy", "tool": primary_tool_name})
            last_reason = "tool_denied_policy"
        elif primary_tool_name not in self.allowed_tools_execution:
            events.append({"kind": "primary_denied", "tool": primary_tool_name})
            last_reason = "tool_denied"
        else:
            for retry_idx in range(self.budget.max_retries + 1):
                self._ensure_run_budget(started_monotonic=started_monotonic)
                primary_attempts += 1
                try:
                    data = self._dispatch(
                        tool_name=primary_tool_name,
                        tool_fn=primary_tool_fn,
                        args=args,
                    )
                    self._save_checkpoint(
                        checkpoint=checkpoint,
                        run_id=run_id,
                        step_id=step_id,
                        source="primary",
                        tool=primary_tool_name,
                        result=data,
                        checkpoint_ttl_seconds=checkpoint_ttl_seconds,
                    )
                    return {
                        "status": "done",
                        "step_id": step_id,
                        "source": "primary",
                        "tool": primary_tool_name,
                        "result": data,
                        "attempts_used": primary_attempts,
                        "primary_attempts": primary_attempts,
                        "fallback_attempts": 0,
                        "retried": retry_idx > 0,
                        "fallbacks_used": 0,
                        "events": events,
                    }
                except Exception as exc:  # noqa: BLE001
                    is_timeout = isinstance(exc, TimeoutError)
                    reason = classify_exception(exc)
                    last_reason = reason
                    events.append(
                        {
                            "kind": "primary_failure",
                            "tool": primary_tool_name,
                            "attempt": retry_idx + 1,
                            "reason": reason,
                            "message": str(exc)[:120],
                        }
                    )
                    if is_timeout and retry_idx < self.budget.max_retries:
                        backoff = 0.25 * float(retry_idx + 1)
                        events.append({"kind": "retry_backoff", "seconds": backoff})
                        time.sleep(backoff)
                        continue
                    break

        fallbacks_used = 0
        for fallback_name, fallback_fn in fallback_chain:
            if fallbacks_used >= self.budget.max_fallbacks:
                break
            fallbacks_used += 1
            self._ensure_run_budget(started_monotonic=started_monotonic)
            if fallback_name not in self.allowed_tools_policy:
                events.append({"kind": "fallback_denied_policy", "tool": fallback_name})
                continue
            if fallback_name not in self.allowed_tools_execution:
                events.append({"kind": "fallback_denied", "tool": fallback_name})
                continue
            try:
                fallback_attempts += 1
                data = self._dispatch(
                    tool_name=fallback_name,
                    tool_fn=fallback_fn,
                    args=args,
                )
                events.append({"kind": "fallback_success", "tool": fallback_name})
                self._save_checkpoint(
                    checkpoint=checkpoint,
                    run_id=run_id,
                    step_id=step_id,
                    source="fallback",
                    tool=fallback_name,
                    result=data,
                    checkpoint_ttl_seconds=checkpoint_ttl_seconds,
                )
                return {
                    "status": "done",
                    "step_id": step_id,
                    "source": "fallback",
                    "tool": fallback_name,
                    "result": data,
                    "attempts_used": primary_attempts + fallback_attempts,
                    "primary_attempts": primary_attempts,
                    "fallback_attempts": fallback_attempts,
                    "retried": primary_attempts > 1,
                    "fallbacks_used": fallbacks_used,
                    "events": events,
                }
            except Exception as exc:  # noqa: BLE001
                reason = classify_exception(exc)
                last_reason = reason
                events.append(
                    {
                        "kind": "fallback_failure",
                        "tool": fallback_name,
                        "reason": reason,
                        "message": str(exc)[:120],
                    }
                )

        if critical:
            if last_reason in {"tool_denied_policy", "tool_denied"}:
                raise StopRun(f"{last_reason}:{step_id}")
            raise StopRun(f"recovery_exhausted:{step_id}")
        return {
            "status": "failed",
            "step_id": step_id,
            "source": "none",
            "tool": "none",
            "error": last_reason,
            "attempts_used": primary_attempts + fallback_attempts,
            "primary_attempts": primary_attempts,
            "fallback_attempts": fallback_attempts,
            "retried": primary_attempts > 1,
            "fallbacks_used": fallbacks_used,
            "events": events,
        }
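
The error taxonomy drives every retry/fallback decision, so it is worth seeing in action. `classify_exception` below is copied verbatim from gateway.py above and exercised standalone:

```python
def classify_exception(exc: Exception) -> str:
    # Copied from gateway.py above: maps exceptions to recovery reasons.
    if isinstance(exc, TimeoutError):
        msg = str(exc)[:80]
        return f"timeout:{msg}" if msg else "timeout"
    if isinstance(exc, ValueError):
        return "invalid_output"
    if isinstance(exc, RuntimeError):
        if "unavailable" in str(exc).lower():
            return "tool_unavailable"
        return "runtime_error"
    return "non_retriable"


print(classify_exception(TimeoutError("primary_payments_timeout")))
# timeout:primary_payments_timeout
print(classify_exception(RuntimeError("replica unavailable")))
# tool_unavailable
print(classify_exception(KeyError("missing")))
# non_retriable
```

Only the timeout class is retried in the gateway; everything else goes straight to the fallback chain.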

llm.py — the final short brief

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


FINAL_SYSTEM_PROMPT = """
You are an operations editor.
Return exactly one JSON object:
{
  "answer": "short operations brief"
}

Rules:
- Use only provided aggregate facts.
- Mention if fallback recovery was used.
- Keep it concise and actionable.
- If ETA is present, phrase it as an estimate that may change. Never guarantee timelines.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")
    return data


def compose_operations_brief(
    *,
    goal: str,
    aggregate: dict[str, Any],
    recovery_summary: dict[str, Any],
) -> str:
    payload = {
        "goal": goal,
        "aggregate": aggregate,
        "recovery_summary": recovery_summary,
    }
    data = _chat_json(system_prompt=FINAL_SYSTEM_PROMPT, payload=payload)
    answer = data.get("answer")
    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")
    answer = answer.strip()
    if not answer:
        raise LLMEmpty("llm_empty")
    return answer

main.py — Detect -> Retry -> Fallback -> Resume -> Finalize

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from checkpoint_store import CheckpointStore
from context import build_operations_context
from gateway import Budget, RecoveryGateway, StopRun
from llm import LLMEmpty, LLMInvalid, LLMTimeout, compose_operations_brief
from tools import (
    demand_cached_snapshot,
    demand_primary_api,
    payments_cached_snapshot,
    payments_primary_api,
    payments_replica_api,
)

GOAL = (
    "Prepare a concise operations brief for the US payments incident. "
    "Recover from tool failures safely and keep the update customer-safe."
)
REQUEST = build_operations_context(report_date="2026-03-06", region="US")

BUDGET = Budget(
    max_seconds=120,
    step_timeout_seconds=1.0,
    max_retries=1,
    max_fallbacks=2,
    checkpoint_ttl_seconds=900.0,
)

ALLOWED_STEPS_POLICY = {"payments_health", "demand_signal"}
ALLOWED_TOOLS_POLICY = {
    "payments_primary_api",
    "payments_replica_api",
    "payments_cached_snapshot",
    "demand_primary_api",
    "demand_cached_snapshot",
}
ALLOWED_TOOLS_EXECUTION = ALLOWED_TOOLS_POLICY


def run_fallback_recovery_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    checkpoint = CheckpointStore()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = RecoveryGateway(
        allowed_steps_policy=ALLOWED_STEPS_POLICY,
        allowed_tools_policy=ALLOWED_TOOLS_POLICY,
        allowed_tools_execution=ALLOWED_TOOLS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    try:
        req = request["request"]
        common_args = {
            "report_date": req["report_date"],
            "region": req["region"],
            "request_id": run_id,
        }

        try:
            payments_step = gateway.run_step_with_recovery(
                run_id=run_id,
                step_id="payments_health",
                primary_tool_name="payments_primary_api",
                primary_tool_fn=payments_primary_api,
                fallback_chain=[
                    ("payments_replica_api", payments_replica_api),
                    ("payments_cached_snapshot", payments_cached_snapshot),
                ],
                args=common_args,
                checkpoint=checkpoint,
                started_monotonic=started,
                critical=True,
                checkpoint_ttl_seconds=300.0,
            )
        except StopRun as exc:
            return stopped(exc.reason, phase="payments_health")

        trace.append(
            {
                "step": 1,
                "phase": "payments_health",
                "source": payments_step["source"],
                "tool": payments_step["tool"],
                "attempts_used": payments_step["attempts_used"],
                "primary_attempts": payments_step["primary_attempts"],
                "fallback_attempts": payments_step["fallback_attempts"],
                "retried": payments_step["retried"],
                "fallbacks_used": payments_step["fallbacks_used"],
                "ok": payments_step["status"] == "done",
            }
        )
        history.append(
            {
                "step": 1,
                "action": "run_step_with_recovery",
                "step_id": "payments_health",
                "events": payments_step["events"],
                "result": payments_step.get("result"),
            }
        )

        try:
            demand_step = gateway.run_step_with_recovery(
                run_id=run_id,
                step_id="demand_signal",
                primary_tool_name="demand_primary_api",
                primary_tool_fn=demand_primary_api,
                fallback_chain=[("demand_cached_snapshot", demand_cached_snapshot)],
                args=common_args,
                checkpoint=checkpoint,
                started_monotonic=started,
                critical=True,
                checkpoint_ttl_seconds=900.0,
            )
        except StopRun as exc:
            return stopped(exc.reason, phase="demand_signal")

        trace.append(
            {
                "step": 2,
                "phase": "demand_signal",
                "source": demand_step["source"],
                "tool": demand_step["tool"],
                "attempts_used": demand_step["attempts_used"],
                "primary_attempts": demand_step["primary_attempts"],
                "fallback_attempts": demand_step["fallback_attempts"],
                "retried": demand_step["retried"],
                "fallbacks_used": demand_step["fallbacks_used"],
                "ok": demand_step["status"] == "done",
            }
        )
        history.append(
            {
                "step": 2,
                "action": "run_step_with_recovery",
                "step_id": "demand_signal",
                "events": demand_step["events"],
                "result": demand_step.get("result"),
            }
        )

        aggregate = {
            "report_date": req["report_date"],
            "region": req["region"],
            "payments": payments_step["result"],
            "demand": demand_step["result"],
        }
        fallback_steps = [
            step["step_id"]
            for step in (payments_step, demand_step)
            if step["source"] == "fallback"
        ]
        checkpoint_saved_steps = sorted(
            set(item["step_id"] for item in checkpoint.dump_run(run_id=run_id))
        )
        checkpoint_resumed_steps = []
        for h in history:
            for ev in (h.get("events") or []):
                if ev.get("kind") == "checkpoint_resume":
                    checkpoint_resumed_steps.append(ev.get("step_id"))

        recovery_summary = {
            "fallback_used": bool(fallback_steps),
            "fallback_steps": fallback_steps,
            "checkpoint_saved_steps": checkpoint_saved_steps,
            "checkpoint_resumed_steps": sorted(set(checkpoint_resumed_steps)),
        }

        if (time.monotonic() - started) > BUDGET.max_seconds:
            return stopped("max_seconds", phase="finalize")

        try:
            answer = compose_operations_brief(
                goal=goal,
                aggregate=aggregate,
                recovery_summary=recovery_summary,
            )
        except LLMTimeout:
            return stopped("llm_timeout", phase="finalize")
        except LLMInvalid as exc:
            return stopped(exc.args[0], phase="finalize")
        except LLMEmpty:
            return stopped("llm_empty", phase="finalize")

        trace.append(
            {
                "step": 3,
                "phase": "finalize",
                "fallback_used": recovery_summary["fallback_used"],
                "ok": True,
            }
        )
        history.append({"step": 3, "action": "compose_operations_brief"})

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "recovered_with_fallback" if fallback_steps else "direct_success",
            "answer": answer,
            "aggregate": aggregate,
            "recovery": recovery_summary,
            "checkpoint": checkpoint.dump_run(run_id=run_id),
            "trace": trace,
            "history": history,
        }
    finally:
        gateway.close()


def main() -> None:
    result = run_fallback_recovery_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain terms)

  • Retry and fallback are controlled by policy budgets, not by the LLM.
  • _PRIMARY_ATTEMPTS in tools.py is a demo-only mock; in a real service this state must be request-scoped.
  • Every successful step is checkpointed so progress is never lost.
  • The primary_attempts/fallback_attempts metrics show the actual recovery attempts.
  • checkpoint_saved_steps and checkpoint_resumed_steps separate saving from an actual resume.
  • In the runnable code the ThreadPoolExecutor is shut down via gateway.close() (in a finally block) to avoid resource leaks.
  • trace/history make the recovery behavior transparent.

Example output

JSON
{
  "run_id": "af48d033-1f5a-4c3f-b31e-050d7777fd9a",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "recovered_with_fallback",
  "answer": "A P1 incident is impacting US payments with a 3.4% failed payment rate and 5 chargeback alerts. Approximately 27% of checkouts are affected, primarily in the US enterprise segment, with an average of 182 orders per minute. Recovery has involved safe fallback to payments health tools; no checkpoint resumptions yet. Estimated recovery time is around 45 minutes but may change. Continue monitoring and prioritize customer safety in updates.",
  "aggregate": {
    "report_date": "2026-03-06",
    "region": "US",
    "payments": {
      "failed_payment_rate": 0.034,
      "chargeback_alerts": 5,
      "incident_severity": "P1",
      "eta_minutes": 45,
      "source": "payments_replica_api:US"
    },
    "demand": {
      "affected_checkout_share": 0.27,
      "avg_orders_per_minute": 182,
      "priority_segment": "US_enterprise",
      "source": "demand_primary_api:2026-03-06"
    }
  },
  "recovery": {
    "fallback_used": true,
    "fallback_steps": [
      "payments_health"
    ],
    "checkpoint_saved_steps": [
      "demand_signal",
      "payments_health"
    ],
    "checkpoint_resumed_steps": []
  },
  "checkpoint": [
    {
      "step_id": "payments_health",
      "source": "fallback",
      "tool": "payments_replica_api",
      "result_keys": [
        "chargeback_alerts",
        "eta_minutes",
        "failed_payment_rate",
        "incident_severity",
        "source"
      ],
      "saved_at": 1772475669.9678888
    },
    {
      "step_id": "demand_signal",
      "source": "primary",
      "tool": "demand_primary_api",
      "result_keys": [
        "affected_checkout_share",
        "avg_orders_per_minute",
        "priority_segment",
        "source"
      ],
      "saved_at": 1772475670.123298
    }
  ],
  "trace": [
    {
      "step": 1,
      "phase": "payments_health",
      "source": "fallback",
      "tool": "payments_replica_api",
      "attempts_used": 3,
      "primary_attempts": 2,
      "fallback_attempts": 1,
      "retried": true,
      "fallbacks_used": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "demand_signal",
      "source": "primary",
      "tool": "demand_primary_api",
      "attempts_used": 1,
      "primary_attempts": 1,
      "fallback_attempts": 0,
      "retried": false,
      "fallbacks_used": 0,
      "ok": true
    },
    {
      "step": 3,
      "phase": "finalize",
      "fallback_used": true,
      "ok": true
    }
  ],
  "history": [{...}]
}

Typical stop_reason values

  • success — the run completed correctly
  • max_seconds — the run's overall time budget is exhausted
  • step_not_allowed_policy:* — the step is outside the policy allowlist
  • tool_denied_policy:* — the tool is denied by the policy allowlist
  • tool_denied:* — the tool is denied by the runtime execution allowlist
  • recovery_exhausted:* — retry/fallback exhausted for a critical step
  • llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty — the final brief is empty
  • llm_invalid_json — the LLM returned invalid JSON
  • llm_invalid_schema — the JSON does not match the contract
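In monitoring, one natural consumer of this taxonomy is alert routing. `route_stop_reason` and the channel names below are a hypothetical sketch, not part of the example project:

```python
def route_stop_reason(stop_reason: str) -> str:
    """Hypothetical alert-routing sketch over the stop_reason taxonomy above."""
    if stop_reason == "success":
        return "none"
    # Reasons like "recovery_exhausted:payments_health" carry a step suffix.
    prefix = stop_reason.split(":", 1)[0]
    if prefix in {"recovery_exhausted", "max_seconds"}:
        return "page_oncall"      # availability problem
    if prefix in {"tool_denied", "tool_denied_policy", "step_not_allowed_policy"}:
        return "notify_platform"  # policy/config problem
    if prefix.startswith("llm_"):
        return "notify_llm_owner" # model/contract problem
    return "notify_platform"
```

Because every stop_reason is a stable string with an optional `:suffix`, routing stays a pure function of the result payload.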

What is NOT shown here

  • persisted checkpoint storage (Redis/DB)
  • adaptive retry strategy (jitter, circuit breaker)
  • tenant-specific execution allowlists
  • a human escalation queue for critical recovery failures

What to try next

  1. Remove payments_replica_api from ALLOWED_TOOLS_EXECUTION and observe the degrade/stop behavior.
  2. Reduce step_timeout_seconds and see how the recovery trace changes.
  3. Add a post-incident summary step that reuses the checkpoint without re-collecting the data.

Author

Mykola is an engineer who builds infrastructure for production AI agents.

Focus: agent patterns, failure modes, runtime control, and system reliability.

🔗 GitHub: https://github.com/mykolademyanov


Editorial note

This documentation was prepared with the help of AI, with human editorial responsibility for accuracy, clarity, and production relevance.

The content is based on real failures, postmortems, and operational incidents in deployed AI agent systems.