The pattern in brief
Fallback-Recovery Agent is a pattern in which a failed step does not mean an immediate stop.
The agent moves through a controlled loop:
- detects the error
- classifies it
- retries within the policy budget
- switches to a fallback path
- resumes from a checkpoint or stops with an explicit stop_reason
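The loop above can be sketched in a few lines. This is a minimal illustration, not the gateway used later in this example: run_with_recovery, flaky, and replica are hypothetical names, and treating only TimeoutError as retriable is an assumption of the sketch.

```python
from __future__ import annotations

from typing import Any, Callable

Tool = Callable[[], dict[str, Any]]


def run_with_recovery(
    primary: Tool,
    fallbacks: list[Tool],
    *,
    max_retries: int = 1,
) -> dict[str, Any]:
    """Try the primary with bounded retries, then each fallback once."""
    last_reason = "unknown"
    for _attempt in range(max_retries + 1):
        try:
            return {"source": "primary", "result": primary()}
        except TimeoutError as exc:
            # Retriable: keep looping until the retry budget runs out.
            last_reason = f"timeout:{exc}"
        except Exception as exc:
            # Non-retriable: skip remaining retries and go to fallbacks.
            last_reason = f"non_retriable:{exc}"
            break
    for fallback in fallbacks:
        try:
            return {"source": "fallback", "result": fallback()}
        except Exception as exc:
            last_reason = f"fallback_failed:{exc}"
    # Nothing worked: stop with an explicit reason instead of raising blindly.
    return {"source": "none", "stop_reason": f"recovery_exhausted:{last_reason}"}


def flaky() -> dict[str, Any]:
    raise TimeoutError("primary_timeout")


def replica() -> dict[str, Any]:
    return {"status": "ok"}


outcome = run_with_recovery(flaky, [replica])
exhausted = run_with_recovery(flaky, [])
```

Here outcome comes from the fallback, while exhausted carries a stop_reason because no fallback was available. The full example below adds budgets, allowlists, and checkpoints on top of this shape.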
What this example demonstrates
- retry for a retriable timeout (bounded by the policy budget)
- fallback chain: primary -> replica -> cache
- runtime execution allowlist for tool-level control
- checkpoint after each successful step
- checkpoint TTL (optional) for safe resume
- resume from a checkpoint without re-running the step
- explicit trace/history/stop_reason for production monitoring
Architecture
- main.py runs the payments_health step through RecoveryGateway.
- The primary tool times out on purpose, and the gateway retries.
- Once retries are exhausted, the gateway switches to a fallback (payments_replica_api).
- The successful result is saved in CheckpointStore.
- The next step, demand_signal, runs and is checkpointed as well.
- The LLM composes the final short operations brief from aggregated facts only.
The LLM decides only the wording of the final brief. The recovery policy (retry/fallback/budgets/allowlist) is enforced entirely by the execution layer. Checkpoint resume can be TTL-controlled so that stale snapshots are not reused.
Project structure
agent-patterns/
└── fallback-recovery-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── checkpoint_store.py
        ├── context.py
        ├── llm.py
        ├── README.md
        └── requirements.txt
How to run
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd agent-patterns/fallback-recovery-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Option via export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option via .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows it is simpler to define the variables with set or, if you prefer, to use python-dotenv.
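If you would rather not depend on the shell or on an extra package, a small standard-library loader can do the same job. The sketch below is an assumption-laden illustration: load_env_file is a hypothetical helper, it handles only plain KEY=VALUE lines, and it is not part of the example repository.

```python
from __future__ import annotations

import os
from pathlib import Path


def load_env_file(path: str = ".env") -> dict[str, str]:
    """Parse KEY=VALUE lines and export them without overriding existing variables."""
    loaded: dict[str, str] = {}
    env_path = Path(path)
    if not env_path.exists():
        return loaded
    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        # Skip blanks, comments, and anything that is not an assignment.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        value = value.strip().strip('"').strip("'")
        if key:
            loaded[key] = value
            # setdefault keeps values that were already exported in the shell.
            os.environ.setdefault(key, value)
    return loaded
```

Because llm.py reads OPENAI_MODEL and OPENAI_TIMEOUT_SECONDS at import time, such a loader would have to run before llm is imported.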
Task
Production case:
"Prepare a short update on the payment incident for US enterprise customers. If the primary API is unstable, do not fail; recover through a fallback."
Code
tools.py: primary/fallback tools
from __future__ import annotations

import time
from typing import Any

# Demo-only process-global state for deterministic primary failures.
# In production, keep attempt counters request-scoped to avoid cross-run collisions.
_PRIMARY_ATTEMPTS: dict[str, int] = {}


def _request_key(report_date: str, region: str, request_id: str, suffix: str) -> str:
    return f"{request_id}:{report_date}:{region.upper()}:{suffix}"


def payments_primary_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    key = _request_key(report_date, region, request_id, "payments_primary")
    _PRIMARY_ATTEMPTS[key] = _PRIMARY_ATTEMPTS.get(key, 0) + 1
    attempt = _PRIMARY_ATTEMPTS[key]

    # Simulate primary instability: first two attempts timeout.
    if attempt <= 2:
        time.sleep(1.4)
        raise TimeoutError("primary_payments_timeout")

    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "incident_severity": "P1",
            "eta_minutes": 45,
            "source": "payments_primary_api",
        },
    }


def payments_replica_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del report_date, request_id
    time.sleep(0.2)
    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.034,
            "chargeback_alerts": 5,
            "incident_severity": "P1",
            "eta_minutes": 45,
            "source": f"payments_replica_api:{region.upper()}",
        },
    }


def payments_cached_snapshot(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.05)
    return {
        "status": "ok",
        "data": {
            "failed_payment_rate": 0.031,
            "chargeback_alerts": 4,
            "incident_severity": "P1",
            "eta_minutes": 50,
            "stale_minutes": 18,
            "source": f"payments_cache:{report_date}:{region.upper()}",
        },
    }


def demand_primary_api(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.15)
    return {
        "status": "ok",
        "data": {
            "affected_checkout_share": 0.27,
            "avg_orders_per_minute": 182,
            "priority_segment": f"{region.upper()}_enterprise",
            "source": f"demand_primary_api:{report_date}",
        },
    }


def demand_cached_snapshot(report_date: str, region: str, request_id: str) -> dict[str, Any]:
    del request_id
    time.sleep(0.03)
    return {
        "status": "ok",
        "data": {
            "affected_checkout_share": 0.25,
            "avg_orders_per_minute": 176,
            "priority_segment": f"{region.upper()}_enterprise",
            "stale_minutes": 22,
            "source": f"demand_cache:{report_date}:{region.upper()}",
        },
    }
In this example, _PRIMARY_ATTEMPTS is a demo mock that produces a controllably flaky primary.
In production, such state should be isolated per request (or kept in request-scoped storage) so that parallel runs do not collide.
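One way to keep that state request-scoped is to pass a small per-run object into the tool instead of using a module-level dict. The sketch below is an assumption, not code from the repository: RequestScope and flaky_primary are hypothetical names, and the scope is not thread-safe if a single instance is shared across threads.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class RequestScope:
    """Hypothetical per-run container; create one instance per request."""

    request_id: str
    attempts: dict[str, int] = field(default_factory=dict)

    def next_attempt(self, tool_name: str) -> int:
        # Counters live on the scope, so two runs never share them.
        self.attempts[tool_name] = self.attempts.get(tool_name, 0) + 1
        return self.attempts[tool_name]


def flaky_primary(scope: RequestScope, *, fail_first: int = 2) -> dict[str, Any]:
    """Fail the first `fail_first` calls made within this scope only."""
    attempt = scope.next_attempt("payments_primary")
    if attempt <= fail_first:
        raise TimeoutError("primary_payments_timeout")
    return {"status": "ok", "data": {"attempt": attempt}}


def call_until_ok(scope: RequestScope, max_calls: int = 5) -> int:
    """Return how many calls were needed before the tool succeeded."""
    for call_no in range(1, max_calls + 1):
        try:
            flaky_primary(scope)
            return call_no
        except TimeoutError:
            continue
    return -1


run_a = RequestScope(request_id="run-a")
run_b = RequestScope(request_id="run-b")
calls_a = call_until_ok(run_a)
calls_b = call_until_ok(run_b)
```

Both runs see the same failure pattern independently, because neither reads the other's counters.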
context.py: building the request envelope
from __future__ import annotations

from typing import Any


def build_operations_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
        },
        "policy_hints": {
            "channel": "status_page",
            "audience": "enterprise_customers",
        },
    }
checkpoint_store.py: checkpoint after safe progress
from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Any


@dataclass
class CheckpointRow:
    run_id: str
    step_id: str
    source: str
    tool: str
    result: dict[str, Any]
    saved_at: float
    ttl_seconds: float | None = None


class CheckpointStore:
    def __init__(self) -> None:
        self._rows: dict[tuple[str, str], CheckpointRow] = {}

    def get_step(self, *, run_id: str, step_id: str, now: float | None = None) -> CheckpointRow | None:
        row = self._rows.get((run_id, step_id))
        if row is None:
            return None

        if row.ttl_seconds is None:
            return row

        now_ts = time.time() if now is None else float(now)
        if (now_ts - row.saved_at) > float(row.ttl_seconds):
            # Expired checkpoint: treat as missing.
            self._rows.pop((run_id, step_id), None)
            return None
        return row

    def save_step(
        self,
        *,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
    ) -> None:
        self._rows[(run_id, step_id)] = CheckpointRow(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
            saved_at=time.time(),
            ttl_seconds=None,
        )

    def save_step_with_ttl(
        self,
        *,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
        ttl_seconds: float,
    ) -> None:
        self._rows[(run_id, step_id)] = CheckpointRow(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
            saved_at=time.time(),
            ttl_seconds=float(ttl_seconds),
        )

    def dump_run(self, *, run_id: str) -> list[dict[str, Any]]:
        out: list[dict[str, Any]] = []
        for row in self._rows.values():
            if row.run_id != run_id:
                continue
            out.append(
                {
                    "step_id": row.step_id,
                    "source": row.source,
                    "tool": row.tool,
                    "result_keys": sorted(row.result.keys()),
                    "saved_at": row.saved_at,
                }
            )
        out.sort(key=lambda item: item["saved_at"])
        return out
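The expiry rule in get_step is easiest to see with a fixed clock, which is what the optional now argument allows. The sketch below isolates that rule in a hypothetical helper, is_expired, with made-up timestamps; it mirrors the comparison in the store but is not part of it.

```python
from __future__ import annotations


def is_expired(*, saved_at: float, ttl_seconds: float | None, now: float) -> bool:
    """No TTL means the checkpoint never expires; otherwise its age must exceed the TTL."""
    if ttl_seconds is None:
        return False
    return (now - saved_at) > ttl_seconds


saved_at = 1_000.0

# Exactly at the TTL boundary the checkpoint is still valid (strict ">").
at_boundary = is_expired(saved_at=saved_at, ttl_seconds=300.0, now=saved_at + 300.0)

# Half a second past the TTL it is treated as missing.
past_ttl = is_expired(saved_at=saved_at, ttl_seconds=300.0, now=saved_at + 300.5)

# A checkpoint saved without a TTL is reused regardless of age.
no_ttl = is_expired(saved_at=saved_at, ttl_seconds=None, now=saved_at + 10_000.0)
```

Note that the store compares wall-clock time.time() values, so this rule assumes the system clock does not jump between save and resume.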
gateway.py: recovery policy boundary
from __future__ import annotations

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from dataclasses import dataclass
from typing import Any, Callable

from checkpoint_store import CheckpointStore


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 120
    step_timeout_seconds: float = 1.0
    max_retries: int = 1
    max_fallbacks: int = 2
    checkpoint_ttl_seconds: float = 900.0


def classify_exception(exc: Exception) -> str:
    if isinstance(exc, TimeoutError):
        msg = str(exc)[:80]
        return f"timeout:{msg}" if msg else "timeout"
    if isinstance(exc, ValueError):
        return "invalid_output"
    if isinstance(exc, RuntimeError):
        if "unavailable" in str(exc).lower():
            return "tool_unavailable"
        return "runtime_error"
    return "non_retriable"


def validate_tool_observation(raw: Any) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise ValueError("tool_observation_not_object")
    if raw.get("status") != "ok":
        raise ValueError("tool_observation_status_not_ok")
    data = raw.get("data")
    if not isinstance(data, dict):
        raise ValueError("tool_observation_data_not_object")
    return data
class RecoveryGateway:
    def __init__(
        self,
        *,
        allowed_steps_policy: set[str],
        allowed_tools_policy: set[str],
        allowed_tools_execution: set[str],
        budget: Budget,
    ):
        self.allowed_steps_policy = set(allowed_steps_policy)
        self.allowed_tools_policy = set(allowed_tools_policy)
        self.allowed_tools_execution = set(allowed_tools_execution)
        self.budget = budget
        self._pool = ThreadPoolExecutor(max_workers=4)

    def close(self) -> None:
        self._pool.shutdown(wait=False, cancel_futures=True)

    def _ensure_run_budget(self, *, started_monotonic: float) -> None:
        elapsed = time.monotonic() - started_monotonic
        if elapsed > self.budget.max_seconds:
            raise StopRun("max_seconds")

    def _dispatch(
        self,
        *,
        tool_name: str,
        tool_fn: Callable[..., dict[str, Any]],
        args: dict[str, Any],
    ) -> dict[str, Any]:
        future = self._pool.submit(tool_fn, **args)
        try:
            raw = future.result(timeout=self.budget.step_timeout_seconds)
        except FuturesTimeoutError as exc:
            raise TimeoutError(f"tool_timeout:{tool_name}") from exc
        return validate_tool_observation(raw)

    def _save_checkpoint(
        self,
        *,
        checkpoint: CheckpointStore,
        run_id: str,
        step_id: str,
        source: str,
        tool: str,
        result: dict[str, Any],
        checkpoint_ttl_seconds: float | None = None,
    ) -> None:
        ttl_value = self.budget.checkpoint_ttl_seconds if checkpoint_ttl_seconds is None else checkpoint_ttl_seconds
        ttl = float(ttl_value)
        if ttl > 0:
            checkpoint.save_step_with_ttl(
                run_id=run_id,
                step_id=step_id,
                source=source,
                tool=tool,
                result=result,
                ttl_seconds=ttl,
            )
            return

        checkpoint.save_step(
            run_id=run_id,
            step_id=step_id,
            source=source,
            tool=tool,
            result=result,
        )

    def run_step_with_recovery(
        self,
        *,
        run_id: str,
        step_id: str,
        primary_tool_name: str,
        primary_tool_fn: Callable[..., dict[str, Any]],
        fallback_chain: list[tuple[str, Callable[..., dict[str, Any]]]],
        args: dict[str, Any],
        checkpoint: CheckpointStore,
        started_monotonic: float,
        critical: bool = True,
        checkpoint_ttl_seconds: float | None = None,
    ) -> dict[str, Any]:
        self._ensure_run_budget(started_monotonic=started_monotonic)

        if step_id not in self.allowed_steps_policy:
            raise StopRun(f"step_not_allowed_policy:{step_id}")

        cached = checkpoint.get_step(run_id=run_id, step_id=step_id)
        if cached is not None:
            return {
                "status": "done",
                "step_id": step_id,
                "source": "checkpoint",
                "tool": cached.tool,
                "result": cached.result,
                "attempts_used": 0,
                "primary_attempts": 0,
                "fallback_attempts": 0,
                "retried": False,
                "fallbacks_used": 0,
                "events": [{"kind": "checkpoint_resume", "step_id": step_id}],
            }

        events: list[dict[str, Any]] = []
        primary_attempts = 0
        fallback_attempts = 0
        last_reason = "unknown"

        if primary_tool_name not in self.allowed_tools_policy:
            events.append({"kind": "primary_denied_policy", "tool": primary_tool_name})
            last_reason = "tool_denied_policy"
        elif primary_tool_name not in self.allowed_tools_execution:
            events.append({"kind": "primary_denied", "tool": primary_tool_name})
            last_reason = "tool_denied"
        else:
            for retry_idx in range(self.budget.max_retries + 1):
                self._ensure_run_budget(started_monotonic=started_monotonic)
                primary_attempts += 1
                try:
                    data = self._dispatch(
                        tool_name=primary_tool_name,
                        tool_fn=primary_tool_fn,
                        args=args,
                    )
                    self._save_checkpoint(
                        checkpoint=checkpoint,
                        run_id=run_id,
                        step_id=step_id,
                        source="primary",
                        tool=primary_tool_name,
                        result=data,
                        checkpoint_ttl_seconds=checkpoint_ttl_seconds,
                    )
                    return {
                        "status": "done",
                        "step_id": step_id,
                        "source": "primary",
                        "tool": primary_tool_name,
                        "result": data,
                        "attempts_used": primary_attempts,
                        "primary_attempts": primary_attempts,
                        "fallback_attempts": 0,
                        "retried": retry_idx > 0,
                        "fallbacks_used": 0,
                        "events": events,
                    }
                except Exception as exc:  # noqa: BLE001
                    is_timeout = isinstance(exc, TimeoutError)
                    reason = classify_exception(exc)
                    last_reason = reason
                    events.append(
                        {
                            "kind": "primary_failure",
                            "tool": primary_tool_name,
                            "attempt": retry_idx + 1,
                            "reason": reason,
                            "message": str(exc)[:120],
                        }
                    )
                    # Only timeouts are retried, and only while the retry budget lasts.
                    if is_timeout and retry_idx < self.budget.max_retries:
                        backoff = 0.25 * float(retry_idx + 1)
                        events.append({"kind": "retry_backoff", "seconds": backoff})
                        time.sleep(backoff)
                        continue
                    break

        fallbacks_used = 0
        for fallback_name, fallback_fn in fallback_chain:
            if fallbacks_used >= self.budget.max_fallbacks:
                break
            # Denied fallbacks also count against max_fallbacks.
            fallbacks_used += 1
            self._ensure_run_budget(started_monotonic=started_monotonic)

            if fallback_name not in self.allowed_tools_policy:
                events.append({"kind": "fallback_denied_policy", "tool": fallback_name})
                continue
            if fallback_name not in self.allowed_tools_execution:
                events.append({"kind": "fallback_denied", "tool": fallback_name})
                continue

            try:
                fallback_attempts += 1
                data = self._dispatch(
                    tool_name=fallback_name,
                    tool_fn=fallback_fn,
                    args=args,
                )
                events.append({"kind": "fallback_success", "tool": fallback_name})
                self._save_checkpoint(
                    checkpoint=checkpoint,
                    run_id=run_id,
                    step_id=step_id,
                    source="fallback",
                    tool=fallback_name,
                    result=data,
                    checkpoint_ttl_seconds=checkpoint_ttl_seconds,
                )
                return {
                    "status": "done",
                    "step_id": step_id,
                    "source": "fallback",
                    "tool": fallback_name,
                    "result": data,
                    "attempts_used": primary_attempts + fallback_attempts,
                    "primary_attempts": primary_attempts,
                    "fallback_attempts": fallback_attempts,
                    "retried": primary_attempts > 1,
                    "fallbacks_used": fallbacks_used,
                    "events": events,
                }
            except Exception as exc:  # noqa: BLE001
                reason = classify_exception(exc)
                last_reason = reason
                events.append(
                    {
                        "kind": "fallback_failure",
                        "tool": fallback_name,
                        "reason": reason,
                        "message": str(exc)[:120],
                    }
                )

        if critical:
            if last_reason in {"tool_denied_policy", "tool_denied"}:
                raise StopRun(f"{last_reason}:{step_id}")
            raise StopRun(f"recovery_exhausted:{step_id}")

        return {
            "status": "failed",
            "step_id": step_id,
            "source": "none",
            "tool": "none",
            "error": last_reason,
            "attempts_used": primary_attempts + fallback_attempts,
            "primary_attempts": primary_attempts,
            "fallback_attempts": fallback_attempts,
            "retried": primary_attempts > 1,
            "fallbacks_used": fallbacks_used,
            "events": events,
        }
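The per-step timeout in _dispatch relies on Future.result(timeout=...). The sketch below shows that mechanism in isolation; slow_tool and call_with_timeout are hypothetical helpers, and the delays are arbitrary values chosen for the illustration.

```python
from __future__ import annotations

import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError


def slow_tool(delay: float) -> str:
    """Stand-in for a tool call that takes `delay` seconds."""
    time.sleep(delay)
    return "done"


def call_with_timeout(pool: ThreadPoolExecutor, delay: float, timeout: float) -> str:
    future = pool.submit(slow_tool, delay)
    try:
        return future.result(timeout=timeout)
    except FuturesTimeoutError:
        # The caller stops waiting, but the worker thread keeps running slow_tool.
        return "timeout"


with ThreadPoolExecutor(max_workers=2) as pool:
    fast = call_with_timeout(pool, delay=0.01, timeout=0.5)
    slow = call_with_timeout(pool, delay=0.3, timeout=0.05)
```

A timed-out call only stops the caller from waiting: a running Python thread cannot be cancelled, so the worker stays busy until the tool returns. With a small pool, repeated timeouts could therefore occupy every worker for a while. Also, since Python 3.11 concurrent.futures.TimeoutError is an alias of the built-in TimeoutError, so the handler in _dispatch would catch a TimeoutError raised by the tool itself as well.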
llm.py: the final short brief
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


FINAL_SYSTEM_PROMPT = """
You are an operations editor.
Return exactly one JSON object:
{
  "answer": "short operations brief"
}

Rules:
- Use only provided aggregate facts.
- Mention if fallback recovery was used.
- Keep it concise and actionable.
- If ETA is present, phrase it as an estimate that may change. Never guarantee timelines.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def _chat_json(*, system_prompt: str, payload: dict[str, Any]) -> dict[str, Any]:
    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")
    return data


def compose_operations_brief(
    *,
    goal: str,
    aggregate: dict[str, Any],
    recovery_summary: dict[str, Any],
) -> str:
    payload = {
        "goal": goal,
        "aggregate": aggregate,
        "recovery_summary": recovery_summary,
    }
    data = _chat_json(system_prompt=FINAL_SYSTEM_PROMPT, payload=payload)

    answer = data.get("answer")
    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")

    answer = answer.strip()
    if not answer:
        raise LLMEmpty("llm_empty")
    return answer
main.py: Detect -> Retry -> Fallback -> Resume -> Finalize
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from checkpoint_store import CheckpointStore
from context import build_operations_context
from gateway import Budget, RecoveryGateway, StopRun
from llm import LLMEmpty, LLMInvalid, LLMTimeout, compose_operations_brief
from tools import (
    demand_cached_snapshot,
    demand_primary_api,
    payments_cached_snapshot,
    payments_primary_api,
    payments_replica_api,
)

GOAL = (
    "Prepare a concise operations brief for the US payments incident. "
    "Recover from tool failures safely and keep the update customer-safe."
)
REQUEST = build_operations_context(report_date="2026-03-06", region="US")

BUDGET = Budget(
    max_seconds=120,
    step_timeout_seconds=1.0,
    max_retries=1,
    max_fallbacks=2,
    checkpoint_ttl_seconds=900.0,
)

ALLOWED_STEPS_POLICY = {"payments_health", "demand_signal"}
ALLOWED_TOOLS_POLICY = {
    "payments_primary_api",
    "payments_replica_api",
    "payments_cached_snapshot",
    "demand_primary_api",
    "demand_cached_snapshot",
}
ALLOWED_TOOLS_EXECUTION = ALLOWED_TOOLS_POLICY


def run_fallback_recovery_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    checkpoint = CheckpointStore()

    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = RecoveryGateway(
        allowed_steps_policy=ALLOWED_STEPS_POLICY,
        allowed_tools_policy=ALLOWED_TOOLS_POLICY,
        allowed_tools_execution=ALLOWED_TOOLS_EXECUTION,
        budget=BUDGET,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    try:
        req = request["request"]
        common_args = {
            "report_date": req["report_date"],
            "region": req["region"],
            "request_id": run_id,
        }

        try:
            payments_step = gateway.run_step_with_recovery(
                run_id=run_id,
                step_id="payments_health",
                primary_tool_name="payments_primary_api",
                primary_tool_fn=payments_primary_api,
                fallback_chain=[
                    ("payments_replica_api", payments_replica_api),
                    ("payments_cached_snapshot", payments_cached_snapshot),
                ],
                args=common_args,
                checkpoint=checkpoint,
                started_monotonic=started,
                critical=True,
                checkpoint_ttl_seconds=300.0,
            )
        except StopRun as exc:
            return stopped(exc.reason, phase="payments_health")

        trace.append(
            {
                "step": 1,
                "phase": "payments_health",
                "source": payments_step["source"],
                "tool": payments_step["tool"],
                "attempts_used": payments_step["attempts_used"],
                "primary_attempts": payments_step["primary_attempts"],
                "fallback_attempts": payments_step["fallback_attempts"],
                "retried": payments_step["retried"],
                "fallbacks_used": payments_step["fallbacks_used"],
                "ok": payments_step["status"] == "done",
            }
        )
        history.append(
            {
                "step": 1,
                "action": "run_step_with_recovery",
                "step_id": "payments_health",
                "events": payments_step["events"],
                "result": payments_step.get("result"),
            }
        )

        try:
            demand_step = gateway.run_step_with_recovery(
                run_id=run_id,
                step_id="demand_signal",
                primary_tool_name="demand_primary_api",
                primary_tool_fn=demand_primary_api,
                fallback_chain=[("demand_cached_snapshot", demand_cached_snapshot)],
                args=common_args,
                checkpoint=checkpoint,
                started_monotonic=started,
                critical=True,
                checkpoint_ttl_seconds=900.0,
            )
        except StopRun as exc:
            return stopped(exc.reason, phase="demand_signal")

        trace.append(
            {
                "step": 2,
                "phase": "demand_signal",
                "source": demand_step["source"],
                "tool": demand_step["tool"],
                "attempts_used": demand_step["attempts_used"],
                "primary_attempts": demand_step["primary_attempts"],
                "fallback_attempts": demand_step["fallback_attempts"],
                "retried": demand_step["retried"],
                "fallbacks_used": demand_step["fallbacks_used"],
                "ok": demand_step["status"] == "done",
            }
        )
        history.append(
            {
                "step": 2,
                "action": "run_step_with_recovery",
                "step_id": "demand_signal",
                "events": demand_step["events"],
                "result": demand_step.get("result"),
            }
        )

        aggregate = {
            "report_date": req["report_date"],
            "region": req["region"],
            "payments": payments_step["result"],
            "demand": demand_step["result"],
        }
        fallback_steps = [
            step["step_id"]
            for step in (payments_step, demand_step)
            if step["source"] == "fallback"
        ]
        checkpoint_saved_steps = sorted(
            set(item["step_id"] for item in checkpoint.dump_run(run_id=run_id))
        )
        checkpoint_resumed_steps = []
        for h in history:
            for ev in (h.get("events") or []):
                if ev.get("kind") == "checkpoint_resume":
                    checkpoint_resumed_steps.append(ev.get("step_id"))

        recovery_summary = {
            "fallback_used": bool(fallback_steps),
            "fallback_steps": fallback_steps,
            "checkpoint_saved_steps": checkpoint_saved_steps,
            "checkpoint_resumed_steps": sorted(set(checkpoint_resumed_steps)),
        }

        if (time.monotonic() - started) > BUDGET.max_seconds:
            return stopped("max_seconds", phase="finalize")

        try:
            answer = compose_operations_brief(
                goal=goal,
                aggregate=aggregate,
                recovery_summary=recovery_summary,
            )
        except LLMTimeout:
            return stopped("llm_timeout", phase="finalize")
        except LLMInvalid as exc:
            return stopped(exc.args[0], phase="finalize")
        except LLMEmpty:
            return stopped("llm_empty", phase="finalize")

        trace.append(
            {
                "step": 3,
                "phase": "finalize",
                "fallback_used": recovery_summary["fallback_used"],
                "ok": True,
            }
        )
        history.append({"step": 3, "action": "compose_operations_brief"})

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "recovered_with_fallback" if fallback_steps else "direct_success",
            "answer": answer,
            "aggregate": aggregate,
            "recovery": recovery_summary,
            "checkpoint": checkpoint.dump_run(run_id=run_id),
            "trace": trace,
            "history": history,
        }
    finally:
        gateway.close()


def main() -> None:
    result = run_fallback_recovery_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()
What matters most here (in plain terms)
- Retry and fallback are governed by policy budgets, not by the LLM.
- _PRIMARY_ATTEMPTS in tools.py is a demo-only mock; in a real service, the state should be request-scoped.
- A checkpoint follows every successful step so that progress is not lost.
- The primary_attempts/fallback_attempts metrics show the actual recovery attempts.
- checkpoint_saved_steps and checkpoint_resumed_steps separate saving from an actual resume.
- In the runnable code, ThreadPoolExecutor is closed via gateway.close() (in finally) to avoid resource leaks.
- trace/history make the recovery behavior transparent.
Example output
{
"run_id": "af48d033-1f5a-4c3f-b31e-050d7777fd9a",
"status": "ok",
"stop_reason": "success",
"outcome": "recovered_with_fallback",
"answer": "A P1 incident is impacting US payments with a 3.4% failed payment rate and 5 chargeback alerts. Approximately 27% of checkouts are affected, primarily in the US enterprise segment, with an average of 182 orders per minute. Recovery has involved safe fallback to payments health tools; no checkpoint resumptions yet. Estimated recovery time is around 45 minutes but may change. Continue monitoring and prioritize customer safety in updates.",
"aggregate": {
"report_date": "2026-03-06",
"region": "US",
"payments": {
"failed_payment_rate": 0.034,
"chargeback_alerts": 5,
"incident_severity": "P1",
"eta_minutes": 45,
"source": "payments_replica_api:US"
},
"demand": {
"affected_checkout_share": 0.27,
"avg_orders_per_minute": 182,
"priority_segment": "US_enterprise",
"source": "demand_primary_api:2026-03-06"
}
},
"recovery": {
"fallback_used": true,
"fallback_steps": [
"payments_health"
],
"checkpoint_saved_steps": [
"demand_signal",
"payments_health"
],
"checkpoint_resumed_steps": []
},
"checkpoint": [
{
"step_id": "payments_health",
"source": "fallback",
"tool": "payments_replica_api",
"result_keys": [
"chargeback_alerts",
"eta_minutes",
"failed_payment_rate",
"incident_severity",
"source"
],
"saved_at": 1772475669.9678888
},
{
"step_id": "demand_signal",
"source": "primary",
"tool": "demand_primary_api",
"result_keys": [
"affected_checkout_share",
"avg_orders_per_minute",
"priority_segment",
"source"
],
"saved_at": 1772475670.123298
}
],
"trace": [
{
"step": 1,
"phase": "payments_health",
"source": "fallback",
"tool": "payments_replica_api",
"attempts_used": 3,
"primary_attempts": 2,
"fallback_attempts": 1,
"retried": true,
"fallbacks_used": 1,
"ok": true
},
{
"step": 2,
"phase": "demand_signal",
"source": "primary",
"tool": "demand_primary_api",
"attempts_used": 1,
"primary_attempts": 1,
"fallback_attempts": 0,
"retried": false,
"fallbacks_used": 0,
"ok": true
},
{
"step": 3,
"phase": "finalize",
"fallback_used": true,
"ok": true
}
],
"history": [{...}]
}
Typical stop_reason values
- success - the run completed correctly
- max_seconds - the overall time budget of the run is exhausted
- step_not_allowed_policy:* - the step is outside the policy allowlist
- tool_denied_policy:* - the tool is denied by the policy allowlist
- tool_denied:* - the tool is denied by the runtime execution allowlist
- recovery_exhausted:* - retry/fallback options are exhausted for a critical step
- llm_timeout - the LLM did not respond within OPENAI_TIMEOUT_SECONDS
- llm_empty - the final brief is empty
- llm_invalid_json - the LLM returned invalid JSON
- llm_invalid_schema - the JSON does not match the contract
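For monitoring, it can help to group stop reasons by family, since several of them carry a ":detail" suffix. The sketch below is one possible approach under that assumption; split_stop_reason and count_families are hypothetical helpers, and the observed list is made-up sample data.

```python
from __future__ import annotations

from collections import Counter


def split_stop_reason(stop_reason: str) -> tuple[str, str | None]:
    """Split 'family:detail' into its parts; detail is None when absent."""
    family, sep, detail = stop_reason.partition(":")
    return family, (detail if sep else None)


def count_families(stop_reasons: list[str]) -> Counter[str]:
    """Count runs per stop_reason family, ignoring the step-specific suffix."""
    return Counter(split_stop_reason(reason)[0] for reason in stop_reasons)


# Made-up sample of stop reasons collected from several runs.
observed = [
    "success",
    "recovery_exhausted:payments_health",
    "recovery_exhausted:demand_signal",
    "llm_timeout",
]
families = count_families(observed)
```

Grouping this way keeps dashboards stable when new step names appear, while the detail part stays available for drill-down.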
What is NOT shown here
- persisted checkpoint storage (Redis/DB)
- adaptive retry strategy (jitter, circuit breaker)
- tenant-specific execution allowlists
- human escalation queue for critical recovery failures
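As an illustration of the jitter item above: the gateway in this example waits a fixed 0.25 * (retry_idx + 1) seconds between retries. A common alternative is capped exponential backoff with full jitter, sketched below. backoff_with_jitter and its default values are assumptions made for this sketch, not part of the repository.

```python
from __future__ import annotations

import random


def backoff_with_jitter(
    retry_idx: int,
    *,
    base_seconds: float = 0.25,
    cap_seconds: float = 5.0,
    rng: random.Random | None = None,
) -> float:
    """Full jitter: a uniform delay in [0, min(cap, base * 2**retry_idx)]."""
    rng = rng or random.Random()
    ceiling = min(cap_seconds, base_seconds * (2 ** retry_idx))
    return rng.uniform(0.0, ceiling)


# A seeded generator makes the delays reproducible, which is handy in tests.
seeded = random.Random(42)
delays = [backoff_with_jitter(i, rng=seeded) for i in range(6)]
```

Randomizing the delay spreads out retries from many concurrent runs, so they are less likely to hit a recovering primary at the same moment.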
What to try next
- Block payments_replica_api in ALLOWED_TOOLS_EXECUTION and check the degrade/stop behavior.
- Reduce step_timeout_seconds and see how the recovery trace changes.
- Add a post-incident summary step that uses the checkpoint without re-collecting data.
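For the first experiment, one way to block the replica is to derive the execution allowlist from the policy allowlist with a set difference. The sketch below only previews the allowlist decision for each fallback; preview_fallbacks is a hypothetical helper and does not call the gateway.

```python
from __future__ import annotations

ALLOWED_TOOLS_POLICY = {
    "payments_primary_api",
    "payments_replica_api",
    "payments_cached_snapshot",
    "demand_primary_api",
    "demand_cached_snapshot",
}
# Set difference builds a new set, so the policy allowlist stays untouched.
ALLOWED_TOOLS_EXECUTION = ALLOWED_TOOLS_POLICY - {"payments_replica_api"}


def preview_fallbacks(chain: list[str], allowed_execution: set[str]) -> list[dict[str, str]]:
    """Label each fallback the way the gateway's execution check would decide."""
    return [
        {
            "tool": name,
            "decision": "allowed" if name in allowed_execution else "fallback_denied",
        }
        for name in chain
    ]


preview = preview_fallbacks(
    ["payments_replica_api", "payments_cached_snapshot"],
    ALLOWED_TOOLS_EXECUTION,
)
```

Reading the gateway code, the payments_health step should then recover from payments_cached_snapshot, with fallbacks_used equal to 2 and fallback_attempts equal to 1, because a denied fallback still counts against max_fallbacks. Running main.py with this change is the way to confirm it.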