Fallback-Recovery Agent in Python: Complete Example

Production-style, runnable example of a fallback-recovery agent in Python with retry, a fallback chain, checkpoint resume, and a transparent stop_reason/trace.
On this page
  1. Core of the pattern (brief)
  2. What this example shows
  3. Architecture
  4. Project structure
  5. Running it
  6. Task
  7. Code
  8. tools.py — Primary/Fallback tools
  9. context.py — Building the request envelope
  10. checkpoint_store.py — Checkpoint after safe progress
  11. gateway.py — recovery policy boundary
  12. llm.py — Final short brief
  13. main.py — Detect -> Retry -> Fallback -> Resume -> Finalize
  14. What matters most here (simply explained)
  15. Example output
  16. Typical stop_reason values
  17. What is NOT shown here
  18. What you can try next

Core of the pattern (brief)

A fallback-recovery agent is a pattern in which a step failure does not mean an immediate stop.

The agent runs through a controlled sequence:

  1. detect the failure
  2. classify it
  3. retry within the policy budget
  4. switch to the fallback path
  5. resume from the checkpoint, or stop with an explicit stop_reason
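
The five steps above can be condensed into a minimal, self-contained control loop. This is a sketch only (the names `run_with_recovery`, `primary`, and `fallbacks` are illustrative, not part of the example project); the full production-style implementation follows below:

```python
import time


def run_with_recovery(primary, fallbacks, *, max_retries=1, checkpoint=None):
    """Sketch: detect -> classify -> retry -> fallback -> checkpoint/stop."""
    if checkpoint is not None:
        # Resume from a prior checkpoint without re-running the step.
        return {"source": "checkpoint", "result": checkpoint}
    for attempt in range(max_retries + 1):
        try:
            return {"source": "primary", "result": primary()}
        except TimeoutError:
            # Classified as retriable; back off while budget remains.
            if attempt < max_retries:
                time.sleep(0.1 * (attempt + 1))
    for name, fn in fallbacks:
        # Retry budget exhausted: switch to the fallback chain.
        try:
            return {"source": "fallback", "tool": name, "result": fn()}
        except Exception:
            continue
    return {"source": "none", "stop_reason": "recovery_exhausted"}
```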

What this example shows

  • retry for retriable timeouts (bounded by the policy budget)
  • fallback chain: primary -> replica -> cache
  • runtime execution allowlist for tool-level control
  • checkpoint after each successful step
  • optional checkpoint TTL for safe resume
  • resume from a checkpoint without re-running the step
  • explicit trace/history/stop_reason for production monitoring

Architecture

  1. main.py starts the payments_health step via RecoveryGateway.
  2. The primary tool deliberately times out; the gateway retries.
  3. After the retry budget is exhausted, the gateway switches to the fallback (payments_replica_api).
  4. The successful result is stored in CheckpointStore.
  5. The next step, demand_signal, runs and is checkpointed as well.
  6. The LLM composes the final short operations brief from aggregated facts only.

The LLM decides only the wording of the final brief. The recovery policy (retry/fallback/budgets/allowlist) is controlled entirely by the execution layer. Checkpoint resume can be TTL-controlled so that stale snapshots are not reused.


Project structure

TEXT
agent-patterns/
└── fallback-recovery-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── checkpoint_store.py
        ├── context.py
        ├── llm.py
        ├── README.md
        └── requirements.txt

Running it

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/fallback-recovery-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Variant using export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py

Variant using .env (optional)

BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows it is easier to use set variables, or optionally python-dotenv.
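
As a cross-platform alternative to sourcing the file from a shell, the .env file can be read from Python itself. A stdlib-only sketch (the helper `load_env_file` is hypothetical and not part of the example project; in practice the python-dotenv package covers this):

```python
import os
from pathlib import Path


def load_env_file(path: str = ".env") -> None:
    """Minimal .env loader: copies KEY=VALUE lines into os.environ.

    Skips blank lines, comments, and lines without '='. Existing
    environment variables are not overwritten (setdefault).
    """
    env_path = Path(path)
    if not env_path.exists():
        return
    for raw_line in env_path.read_text().splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        os.environ.setdefault(key.strip(), value.strip())
```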


Task

Production scenario:

"Create a short update about a payment incident for US enterprise customers. If the primary API is unstable, do not fail; recover via fallback."


Code

tools.py — Primary/Fallback tools

PYTHON
from __future__ import annotations

import time
from typing import Any


_PRIMARY_ATTEMPTS: dict[str, int] = {}
# Demo-only process-global state for deterministic primary failures.
# In production, keep attempt counters request-scoped to avoid cross-run collisions.


def _request_key(report_date: str, region: str, request_id: str, suffix: str) -> str:
    return f"{request_id}:{report_date}:{region.upper()}:{suffix}"


def payments_primary_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    key = _request_key(report_date, region, request_id, "payments_primary")
    _PRIMARY_ATTEMPTS[key] = _PRIMARY_ATTEMPTS.get(key, 0) + 1
    attempt = _PRIMARY_ATTEMPTS[key]

    # Simulate primary instability: first two attempts timeout.
    if attempt <= 2:
        time.sleep(1.4)
        raise TimeoutError("primary_payments_timeout")

    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "incident_severity": "P1",
            "eta_minutes": 45,
            "source": "payments_primary_api",
        },
    }


def payments_replica_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del report_date, request_id
    time.sleep(0.2)
    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "incident_severity": "P1",
            "eta_minutes": 45,
            "source": f"payments_replica_api:{region.upper()}",
        },
    }


def payments_cached_snapshot(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.05)
    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.031,
            "chargeback_alerts": 4,
            "incident_severity": "P1",
            "eta_minutes": 50,
            "stale_minutes": 18,
            "source": f"payments_cache:{report_date}:{region.upper()}",
        },
    }


def demand_primary_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.15)
    return {
        "status": "ok",
        "data": {
            "affected_checkout_share": 0.27,
            "avg_orders_per_minute": 182,
            "priority_segment": f"{region.upper()}_enterprise",
            "source": f"demand_primary_api:{report_date}",
        },
    }


def demand_cached_snapshot(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.03)
    return {
        "status": "ok",
        "data": {
            "affected_checkout_share": 0.25,
            "avg_orders_per_minute": 176,
            "priority_segment": f"{region.upper()}_enterprise",
            "stale_minutes": 22,
            "source": f"demand_cache:{report_date}:{region.upper()}",
        },
    }

_PRIMARY_ATTEMPTS is used in this example as a demo mock for controlled flaky primary behavior. In production, this state should be isolated per request (or kept in request-scoped storage) so that parallel runs do not collide.
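
One way to make that state request-scoped is to carry the counter in an object created per request instead of a module-global dict. A sketch (the class `RequestScopedAttempts` is hypothetical, not part of the demo code):

```python
from dataclasses import dataclass, field


@dataclass
class RequestScopedAttempts:
    """Per-request attempt counter: create one instance per incoming
    request so parallel runs cannot collide on shared state."""

    _counts: dict[str, int] = field(default_factory=dict)

    def next_attempt(self, tool: str) -> int:
        # Increment and return this request's attempt number for the tool.
        self._counts[tool] = self._counts.get(tool, 0) + 1
        return self._counts[tool]
```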

context.py — Building the request envelope

PYTHON
from __future__ import annotations

from typing import Any


def build_operations_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
        },
        "policy_hints": {
            "channel": "status_page",
            "audience": "enterprise_customers",
        },
    }

checkpoint_store.py — Checkpoint after safe progress

PYTHON
from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Any


@dataclass
class CheckpointRow:
    run_id: str
    step_id: str
    source: str
    tool: str
    result: dict[str, Any]
    saved_at: float
    ttl_seconds: float | None = None


class CheckpointStore:
    def __init__(self) -> None:
        self._rows: dict[tuple[str, str], CheckpointRow] = {}

    def get_step(self, *, run_id: str, step_id: str, now: float | None = None) -> CheckpointRow | None:
        row = self._rows.get((run_id, step_id))
        if row is None:
            return None
        if row.ttl_seconds is None:
            return row
        now_ts = time.time() if now is None else float(now)
        if (now_ts - row.saved_at) > float(row.ttl_seconds):
            # Expired checkpoint: treat as missing.
            self._rows.pop((run_id, step_id), None)
            return None
        return row

    def save_step(
        self,
        *,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
    ) -> None:
        self._rows[(run_id, step_id)] = CheckpointRow(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
            saved_at=time.time(),
            ttl_seconds=None,
        )

    def save_step_with_ttl(
        self,
        *,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
        ttl_seconds: float,
    ) -> None:
        self._rows[(run_id, step_id)] = CheckpointRow(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
            saved_at=time.time(),
            ttl_seconds=float(ttl_seconds),
        )

    def dump_run(self, *, run_id: str) -> list[dict[str, Any]]:
        out: list[dict[str, Any]] = []
        for row in self._rows.values():
            if row.run_id != run_id:
                continue
            out.append(
                {
                    "step_id": row.step_id,
                    "source": row.source,
                    "tool": row.tool,
                    "result_keys": sorted(row.result.keys()),
                    "saved_at": row.saved_at,
                }
            )
        out.sort(key=lambda item: item["saved_at"])
        return out

gateway.py — recovery policy boundary

PYTHON
from __future__ import annotations

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from dataclasses import dataclass
from typing import Any, Callable

from checkpoint_store import CheckpointStore


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 120
    step_timeout_seconds: float = 1.0
    max_retries: int = 1
    max_fallbacks: int = 2
    checkpoint_ttl_seconds: float = 900.0


def classify_exception(exc: Exception) -> str:
    if isinstance(exc, TimeoutError):
        msg = str(exc)[:80]
        return f"timeout:{msg}" if msg else "timeout"
    if isinstance(exc, ValueError):
        return "invalid_output"
    if isinstance(exc, RuntimeError):
        if "unavailable" in str(exc).lower():
            return "tool_unavailable"
        return "runtime_error"
    return "non_retriable"


def validate_tool_observation(raw: Any) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise ValueError("tool_observation_not_object")
    if raw.get("status") != "ok":
        raise ValueError("tool_observation_status_not_ok")
    data = raw.get("data")
    if not isinstance(data, dict):
        raise ValueError("tool_observation_data_not_object")
    return data


class RecoveryGateway:
    def __init__(
        self,
        *,
        allowed_steps_policy: set[str],
        allowed_tools_policy: set[str],
        allowed_tools_execution: set[str],
        budget: Budget,
    ):
        self.allowed_steps_policy = set(allowed_steps_policy)
        self.allowed_tools_policy = set(allowed_tools_policy)
        self.allowed_tools_execution = set(allowed_tools_execution)
        self.budget = budget
        self._pool = ThreadPoolExecutor(max_workers=4)

    def close(self) -> None:
        self._pool.shutdown(wait=False, cancel_futures=True)

    def _ensure_run_budget(self, *, started_monotonic: float) -> None:
        elapsed = time.monotonic() - started_monotonic
        if elapsed > self.budget.max_seconds:
            raise StopRun("max_seconds")

    def _dispatch(
        self,
        *,
        tool_name: str,
        tool_fn: Callable[..., dict[str, Any]],
        args: dict[str, Any],
    ) -> dict[str, Any]:
        future = self._pool.submit(tool_fn, **args)
        try:
            raw = future.result(timeout=self.budget.step_timeout_seconds)
        except FuturesTimeoutError as exc:
            raise TimeoutError(f"tool_timeout:{tool_name}") from exc
        return validate_tool_observation(raw)

    def _save_checkpoint(
        self,
        *,
        checkpoint: CheckpointStore,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
        checkpoint_ttl_seconds: float | None = None,
    ) -> None:
        ttl_value = self.budget.checkpoint_ttl_seconds if checkpoint_ttl_seconds is None else checkpoint_ttl_seconds
        ttl = float(ttl_value)
        if ttl > 0:
            checkpoint.save_step_with_ttl(
                run_id=run_id,
                step_id=step_id,
                source=source,
                tool=tool,
                result=result,
                ttl_seconds=ttl,
            )
            return
        checkpoint.save_step(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
        )

    def run_step_with_recovery(
        self,
        *,
        run_id: str,
        step_id: str,
        primary_tool_name: str,
        primary_tool_fn: Callable[..., dict[str, Any]],
        fallback_chain: list[tuple[str, Callable[..., dict[str, Any]]]],
        args: dict[str, Any],
        checkpoint: CheckpointStore,
        started_monotonic: float,
        critical: bool = True,
        checkpoint_ttl_seconds: float | None = None,
    ) -> dict[str, Any]:
        self._ensure_run_budget(started_monotonic=started_monotonic)
        if step_id not in self.allowed_steps_policy:
            raise StopRun(f"step_not_allowed_policy:{step_id}")

        cached = checkpoint.get_step(run_id=run_id, step_id=step_id)
        if cached is not None:
            return {
                "status": "done",
                "step_id": step_id,
                "source": "checkpoint",
                "tool": cached.tool,
                "result": cached.result,
                "attempts_used": 0,
                "primary_attempts": 0,
                "fallback_attempts": 0,
                "retried": False,
                "fallbacks_used": 0,
                "events": [{"kind": "checkpoint_resume", "step_id": step_id}],
            }

        events: list[dict[str, Any]] = []
        primary_attempts = 0
        fallback_attempts = 0
        last_reason = "unknown"

        if primary_tool_name not in self.allowed_tools_policy:
            events.append({"kind": "primary_denied_policy", "tool": primary_tool_name})
            last_reason = "tool_denied_policy"
        elif primary_tool_name not in self.allowed_tools_execution:
            events.append({"kind": "primary_denied", "tool": primary_tool_name})
            last_reason = "tool_denied"
        else:
            for retry_idx in range(self.budget.max_retries + 1):
                self._ensure_run_budget(started_monotonic=started_monotonic)
                primary_attempts += 1
                try:
                    data = self._dispatch(
                        tool_name=primary_tool_name,
                        tool_fn=primary_tool_fn,
                        args=args,
                    )
                    self._save_checkpoint(
                        checkpoint=checkpoint,
                        run_id=run_id,
                        step_id=step_id,
                        source="primary",
                        tool=primary_tool_name,
                        result=data,
                        checkpoint_ttl_seconds=checkpoint_ttl_seconds,
                    )
                    return {
                        "status": "done",
                        "step_id": step_id,
                        "source": "primary",
                        "tool": primary_tool_name,
                        "result": data,
                        "attempts_used": primary_attempts,
                        "primary_attempts": primary_attempts,
                        "fallback_attempts": 0,
                        "retried": retry_idx > 0,
                        "fallbacks_used": 0,
                        "events": events,
                    }
                except Exception as exc:  # noqa: BLE001
                    is_timeout = isinstance(exc, TimeoutError)
                    reason = classify_exception(exc)
                    last_reason = reason
                    events.append(
                        {
                            "kind": "primary_failure",
                            "tool": primary_tool_name,
                            "attempt": retry_idx + 1,
                            "reason": reason,
                            "message": str(exc)[:120],
                        }
                    )
                    if is_timeout and retry_idx < self.budget.max_retries:
                        backoff = 0.25 * float(retry_idx + 1)
                        events.append({"kind": "retry_backoff", "seconds": backoff})
                        time.sleep(backoff)
                        continue
                    break

        fallbacks_used = 0
        for fallback_name, fallback_fn in fallback_chain:
            if fallbacks_used >= self.budget.max_fallbacks:
                break
            fallbacks_used += 1
            self._ensure_run_budget(started_monotonic=started_monotonic)
            if fallback_name not in self.allowed_tools_policy:
                events.append({"kind": "fallback_denied_policy", "tool": fallback_name})
                continue
            if fallback_name not in self.allowed_tools_execution:
                events.append({"kind": "fallback_denied", "tool": fallback_name})
                continue
            try:
                fallback_attempts += 1
                data = self._dispatch(
                    tool_name=fallback_name,
                    tool_fn=fallback_fn,
                    args=args,
                )
                events.append({"kind": "fallback_success", "tool": fallback_name})
                self._save_checkpoint(
                    checkpoint=checkpoint,
                    run_id=run_id,
                    step_id=step_id,
                    source="fallback",
                    tool=fallback_name,
                    result=data,
                    checkpoint_ttl_seconds=checkpoint_ttl_seconds,
                )
                return {
                    "status": "done",
                    "step_id": step_id,
                    "source": "fallback",
                    "tool": fallback_name,
                    "result": data,
                    "attempts_used": primary_attempts + fallback_attempts,
                    "primary_attempts": primary_attempts,
                    "fallback_attempts": fallback_attempts,
                    "retried": primary_attempts > 1,
                    "fallbacks_used": fallbacks_used,
                    "events": events,
                }
            except Exception as exc:  # noqa: BLE001
                reason = classify_exception(exc)
                last_reason = reason
                events.append(
                    {
                        "kind": "fallback_failure",
                        "tool": fallback_name,
                        "reason": reason,
                        "message": str(exc)[:120],
                    }
                )

        if critical:
            if last_reason in {"tool_denied_policy", "tool_denied"}:
                raise StopRun(f"{last_reason}:{step_id}")
            raise StopRun(f"recovery_exhausted:{step_id}")
        return {
            "status": "failed",
            "step_id": step_id,
            "source": "none",
            "tool": "none",
            "error": last_reason,
            "attempts_used": primary_attempts + fallback_attempts,
            "primary_attempts": primary_attempts,
            "fallback_attempts": fallback_attempts,
            "retried": primary_attempts > 1,
            "fallbacks_used": fallbacks_used,
            "events": events,
        }

llm.py — Final short brief

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


FINAL_SYSTEM_PROMPT = """
You are an operations editor.
Return exactly one JSON object:
{
  "answer": "short operations brief"
}

Rules:
- Use only provided aggregate facts.
- Mention if fallback recovery was used.
- Keep it concise and actionable.
- If ETA is present, phrase it as an estimate that may change. Never guarantee timelines.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")
    return data


def compose_operations_brief(
    *,
    goal: str,
    aggregate: dict[str, Any],
    recovery_summary: dict[str, Any],
) -> str:
    payload = {
        "goal": goal,
        "aggregate": aggregate,
        "recovery_summary": recovery_summary,
    }
    data = _chat_json(system_prompt=FINAL_SYSTEM_PROMPT, payload=payload)
    answer = data.get("answer")
    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")
    answer = answer.strip()
    if not answer:
        raise LLMEmpty("llm_empty")
    return answer

main.py — Detect -> Retry -> Fallback -> Resume -> Finalize

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from checkpoint_store import CheckpointStore
from context import build_operations_context
from gateway import Budget, RecoveryGateway, StopRun
from llm import LLMEmpty, LLMInvalid, LLMTimeout, compose_operations_brief
from tools import (
    demand_cached_snapshot,
    demand_primary_api,
    payments_cached_snapshot,
    payments_primary_api,
    payments_replica_api,
)

GOAL = (
    "Prepare a concise operations brief for the US payments incident. "
    "Recover from tool failures safely and keep the update customer-safe."
)
REQUEST = build_operations_context(report_date="2026-03-06", region="US")

BUDGET = Budget(
    max_seconds=120,
    step_timeout_seconds=1.0,
    max_retries=1,
    max_fallbacks=2,
    checkpoint_ttl_seconds=900.0,
)

ALLOWED_STEPS_POLICY = {"payments_health", "demand_signal"}
ALLOWED_TOOLS_POLICY = {
    "payments_primary_api",
    "payments_replica_api",
    "payments_cached_snapshot",
    "demand_primary_api",
    "demand_cached_snapshot",
}
ALLOWED_TOOLS_EXECUTION = ALLOWED_TOOLS_POLICY


def run_fallback_recovery_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    checkpoint = CheckpointStore()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = RecoveryGateway(
        allowed_steps_policy=ALLOWED_STEPS_POLICY,
        allowed_tools_policy=ALLOWED_TOOLS_POLICY,
        allowed_tools_execution=ALLOWED_TOOLS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    try:
        req = request["request"]
        common_args = {
            "report_date": req["report_date"],
            "region": req["region"],
            "request_id": run_id,
        }

        try:
            payments_step = gateway.run_step_with_recovery(
                run_id=run_id,
                step_id="payments_health",
                primary_tool_name="payments_primary_api",
                primary_tool_fn=payments_primary_api,
                fallback_chain=[
                    ("payments_replica_api", payments_replica_api),
                    ("payments_cached_snapshot", payments_cached_snapshot),
                ],
                args=common_args,
                checkpoint=checkpoint,
                started_monotonic=started,
                critical=True,
                checkpoint_ttl_seconds=300.0,
            )
        except StopRun as exc:
            return stopped(exc.reason, phase="payments_health")

        trace.append(
            {
                "step": 1,
                "phase": "payments_health",
                "source": payments_step["source"],
                "tool": payments_step["tool"],
                "attempts_used": payments_step["attempts_used"],
                "primary_attempts": payments_step["primary_attempts"],
                "fallback_attempts": payments_step["fallback_attempts"],
                "retried": payments_step["retried"],
                "fallbacks_used": payments_step["fallbacks_used"],
                "ok": payments_step["status"] == "done",
            }
        )
        history.append(
            {
                "step": 1,
                "action": "run_step_with_recovery",
                "step_id": "payments_health",
                "events": payments_step["events"],
                "result": payments_step.get("result"),
            }
        )

        try:
            demand_step = gateway.run_step_with_recovery(
                run_id=run_id,
                step_id="demand_signal",
                primary_tool_name="demand_primary_api",
                primary_tool_fn=demand_primary_api,
                fallback_chain=[("demand_cached_snapshot", demand_cached_snapshot)],
                args=common_args,
                checkpoint=checkpoint,
                started_monotonic=started,
                critical=True,
                checkpoint_ttl_seconds=900.0,
            )
        except StopRun as exc:
            return stopped(exc.reason, phase="demand_signal")

        trace.append(
            {
                "step": 2,
                "phase": "demand_signal",
                "source": demand_step["source"],
                "tool": demand_step["tool"],
                "attempts_used": demand_step["attempts_used"],
                "primary_attempts": demand_step["primary_attempts"],
                "fallback_attempts": demand_step["fallback_attempts"],
                "retried": demand_step["retried"],
                "fallbacks_used": demand_step["fallbacks_used"],
                "ok": demand_step["status"] == "done",
            }
        )
        history.append(
            {
                "step": 2,
                "action": "run_step_with_recovery",
                "step_id": "demand_signal",
                "events": demand_step["events"],
                "result": demand_step.get("result"),
            }
        )

        aggregate = {
            "report_date": req["report_date"],
            "region": req["region"],
            "payments": payments_step["result"],
            "demand": demand_step["result"],
        }
        fallback_steps = [
            step["step_id"]
            for step in (payments_step, demand_step)
            if step["source"] == "fallback"
        ]
        checkpoint_saved_steps = sorted(
            set(item["step_id"] for item in checkpoint.dump_run(run_id=run_id))
        )
        checkpoint_resumed_steps = []
        for h in history:
            for ev in (h.get("events") or []):
                if ev.get("kind") == "checkpoint_resume":
                    checkpoint_resumed_steps.append(ev.get("step_id"))

        recovery_summary = {
            "fallback_used": bool(fallback_steps),
            "fallback_steps": fallback_steps,
            "checkpoint_saved_steps": checkpoint_saved_steps,
            "checkpoint_resumed_steps": sorted(set(checkpoint_resumed_steps)),
        }

        if (time.monotonic() - started) > BUDGET.max_seconds:
            return stopped("max_seconds", phase="finalize")

        try:
            answer = compose_operations_brief(
                goal=goal,
                aggregate=aggregate,
                recovery_summary=recovery_summary,
            )
        except LLMTimeout:
            return stopped("llm_timeout", phase="finalize")
        except LLMInvalid as exc:
            return stopped(exc.args[0], phase="finalize")
        except LLMEmpty:
            return stopped("llm_empty", phase="finalize")

        trace.append(
            {
                "step": 3,
                "phase": "finalize",
                "fallback_used": recovery_summary["fallback_used"],
                "ok": True,
            }
        )
        history.append({"step": 3, "action": "compose_operations_brief"})

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "recovered_with_fallback" if fallback_steps else "direct_success",
            "answer": answer,
            "aggregate": aggregate,
            "recovery": recovery_summary,
            "checkpoint": checkpoint.dump_run(run_id=run_id),
            "trace": trace,
            "history": history,
        }
    finally:
        gateway.close()


def main() -> None:
    result = run_fallback_recovery_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (simply explained)

  • Retry and fallback are driven by policy budgets, not by the LLM.
  • _PRIMARY_ATTEMPTS in tools.py is a demo-only mock; in a real service this state must be request-scoped.
  • After each successful step there is a checkpoint, so no progress is lost.
  • The primary_attempts/fallback_attempts metrics show the actual recovery attempts.
  • checkpoint_saved_steps and checkpoint_resumed_steps separate saving from an actual resume.
  • In the runnable code the ThreadPoolExecutor is closed via gateway.close() (in finally) to avoid resource leaks.
  • trace/history make the recovery behavior transparent.
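
To illustrate how trace/history can feed monitoring, a small sketch that rolls the per-step metrics into counters (the helper `recovery_metrics` is hypothetical; the trace shape matches what main.py emits):

```python
from typing import Any


def recovery_metrics(trace: list[dict[str, Any]]) -> dict[str, int]:
    """Aggregate per-step recovery counters from a run trace.

    Only entries that carry "attempts_used" are tool steps; the
    finalize entry is ignored.
    """
    steps = [t for t in trace if "attempts_used" in t]
    return {
        "steps": len(steps),
        "total_attempts": sum(t["attempts_used"] for t in steps),
        "retried_steps": sum(1 for t in steps if t.get("retried")),
        "fallback_steps": sum(1 for t in steps if t.get("fallbacks_used", 0) > 0),
    }
```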

Example output

JSON
{
  "run_id": "af48d033-1f5a-4c3f-b31e-050d7777fd9a",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "recovered_with_fallback",
  "answer": "A P1 incident is impacting US payments with a 3.4% failed payment rate and 5 chargeback alerts. Approximately 27% of checkouts are affected, primarily in the US enterprise segment, with an average of 182 orders per minute. Recovery has involved safe fallback to payments health tools; no checkpoint resumptions yet. Estimated recovery time is around 45 minutes but may change. Continue monitoring and prioritize customer safety in updates.",
  "aggregate": {
    "report_date": "2026-03-06",
    "region": "US",
    "payments": {
      "failed_payment_rate": 0.034,
      "chargeback_alerts": 5,
      "incident_severity": "P1",
      "eta_minutes": 45,
      "source": "payments_replica_api:US"
    },
    "demand": {
      "affected_checkout_share": 0.27,
      "avg_orders_per_minute": 182,
      "priority_segment": "US_enterprise",
      "source": "demand_primary_api:2026-03-06"
    }
  },
  "recovery": {
    "fallback_used": true,
    "fallback_steps": [
      "payments_health"
    ],
    "checkpoint_saved_steps": [
      "demand_signal",
      "payments_health"
    ],
    "checkpoint_resumed_steps": []
  },
  "checkpoint": [
    {
      "step_id": "payments_health",
      "source": "fallback",
      "tool": "payments_replica_api",
      "result_keys": [
        "chargeback_alerts",
        "eta_minutes",
        "failed_payment_rate",
        "incident_severity",
        "source"
      ],
      "saved_at": 1772475669.9678888
    },
    {
      "step_id": "demand_signal",
      "source": "primary",
      "tool": "demand_primary_api",
      "result_keys": [
        "affected_checkout_share",
        "avg_orders_per_minute",
        "priority_segment",
        "source"
      ],
      "saved_at": 1772475670.123298
    }
  ],
  "trace": [
    {
      "step": 1,
      "phase": "payments_health",
      "source": "fallback",
      "tool": "payments_replica_api",
      "attempts_used": 3,
      "primary_attempts": 2,
      "fallback_attempts": 1,
      "retried": true,
      "fallbacks_used": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "demand_signal",
      "source": "primary",
      "tool": "demand_primary_api",
      "attempts_used": 1,
      "primary_attempts": 1,
      "fallback_attempts": 0,
      "retried": false,
      "fallbacks_used": 0,
      "ok": true
    },
    {
      "step": 3,
      "phase": "finalize",
      "fallback_used": true,
      "ok": true
    }
  ],
  "history": [{...}]
}

Typical stop_reason values

  • success — the run completed correctly
  • max_seconds — the run's total time budget was exhausted
  • step_not_allowed_policy:* — step is outside the policy allowlist
  • tool_denied_policy:* — tool blocked by the policy allowlist
  • tool_denied:* — tool blocked by the runtime execution allowlist
  • recovery_exhausted:* — retry/fallback exhausted for a critical step
  • llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty — the final brief is empty
  • llm_invalid_json — the LLM returned invalid JSON
  • llm_invalid_schema — the JSON does not match the contract
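
For production monitoring, these values are typically routed to alert severities. A minimal sketch, assuming a classify_stop_reason helper that is not part of the article's code; prefixed reasons such as tool_denied:payments_replica_api are matched on their prefix.

```python
# Illustrative routing of stop_reason values to monitoring severities.
def classify_stop_reason(stop_reason: str) -> str:
    if stop_reason == "success":
        return "ok"
    if stop_reason.startswith(("recovery_exhausted", "max_seconds")):
        return "page"          # a budget ran out on a critical step
    if stop_reason.startswith(("step_not_allowed_policy",
                               "tool_denied_policy",
                               "tool_denied")):
        return "policy_alert"  # governance / configuration issue
    if stop_reason.startswith("llm_"):
        return "llm_alert"     # model-side failure (timeout, schema, ...)
    return "unknown"
```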

What is NOT shown here

  • persisted checkpoint storage (Redis/DB)
  • adaptive retry strategy (jitter, circuit breaker)
  • tenant-specific execution allowlists
  • a human escalation queue for critical recovery failures
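
As a pointer for the second omission: adaptive retry usually means exponential backoff with jitter. This is a generic sketch of that pattern, not part of the example's gateway.

```python
# Exponential backoff with full jitter: each retry waits a random time
# between 0 and min(cap, base * 2**attempt), which spreads out retries
# from many clients and avoids synchronized retry storms.
import random


def backoff_delays(base: float = 0.2, cap: float = 5.0, attempts: int = 4):
    """Yield a randomized delay (seconds) before each retry."""
    for attempt in range(attempts):
        yield random.uniform(0, min(cap, base * (2 ** attempt)))
```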

What you can try next

  1. Block payments_replica_api in ALLOWED_TOOLS_EXECUTION and check the degrade/stop behavior.
  2. Reduce step_timeout_seconds and watch how the recovery trace changes.
  3. Add a post-incident summary step that uses the checkpoint without collecting the data again.
⏱️ 14 min read · Updated March 6, 2026 · Difficulty: ★★☆

Author

Nick — an engineer building infrastructure for AI agents in production.

Focus: agent patterns, failure modes, runtime control, and system reliability.

🔗 GitHub: https://github.com/mykolademyanov


Editorial note

This documentation is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The content is based on real failures, post-mortems, and operational incidents from AI agent systems running in production.