Research Agent на Python: повний приклад

Готовий до запуску приклад Research-агента на Python з bounded research workflow, policy boundary, структурною перевіркою нотаток і прозорими trace/history.
На цій сторінці
  1. Суть патерна (коротко)
  2. Що демонструє цей приклад
  3. Архітектура
  4. Структура проєкту
  5. Як запустити
  6. Задача
  7. Код
  8. context.py — request envelope + policy hints
  9. tools.py — search/read/extract/verify інструменти
  10. agent.py — план + synthesis з notes
  11. gateway.py — policy/validation boundaries
  12. main.py — orchestrate bounded research workflow
  13. Приклад виводу
  14. Типові stop_reason
  15. Що тут НЕ показано
  16. Що спробувати далі

Суть патерна (коротко)

Research Agent означає: агент не відповідає "з голови", а проходить керований workflow:

  • search_sources
  • dedupe_urls
  • policy-check (runtime gate перед read_source, не окремий плановий step)
  • read_extract_notes
  • verify_notes
  • synthesize_answer

Тобто фінальна відповідь формується лише з перевірених нотаток, а цитування дозволені тільки через валідні citation id, що посилаються на note з provenance (url, quote, published_at).


Що демонструє цей приклад

  • Агент пропонує план дослідження, але runtime валідовує контракт і порядок кроків
  • URL нормалізуються і дедуплікуються до читання джерел
  • Policy allowlist і execution allowlist для доменів розділені
  • read_extract_notes працює в бюджетах (max_urls, max_read_pages, max_notes)
  • Нотатки мають provenance (url, title, published_at, quote)
  • verify_notes у цьому демо — мінімальний структурний quality-gate (не cross-source фактчек)
  • Synthesis дозволений тільки з валідними citation-id
  • trace/history дають аудит від search до фінального grounded answer

Архітектура

  1. agent.py формує план (search_sources → dedupe_urls → read_extract_notes → verify_notes → synthesize_answer).
  2. gateway.py валідує план, policy-рішення для джерел і контракти нотаток/відповіді.
  3. tools.py реалізує deterministic search_sources/read_source/extract_notes_from_page/verify_notes кроки.
  4. main.py оркеструє workflow, контролює бюджети й повертає trace/history.

Структура проєкту

TEXT
agent-patterns/
└── research-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── agent.py
        ├── context.py
        ├── README.md
        └── requirements.txt

Як запустити

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/research-agent/python
python main.py

У цьому прикладі зовнішні залежності не потрібні.


Задача

Продакшен-кейс:

"Підготуй короткий статус по інциденту платежів у US і підтвердь enterprise SLA з явними цитуваннями."


Код

context.py — request envelope + policy hints

PYTHON
from __future__ import annotations

from typing import Any


def build_request(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
            "question": (
                "What is the current US payments incident status and what enterprise SLA "
                "commitments apply for uptime and P1 response time?"
            ),
        },
        "policy_hints": {
            "allowed_domains_policy": [
                "official-status.example.com",
                "vendor.example.com",
                "regulator.example.org",
            ],
            "allowed_domains_execution": [
                "official-status.example.com",
                "vendor.example.com",
            ],
            "max_urls": 6,
            "max_read_pages": 3,
            "max_notes": 6,
            "max_answer_chars": 850,
        },
    }

tools.py — search/read/extract/verify інструменти

PYTHON
from __future__ import annotations

from typing import Any
from urllib.parse import urlparse


RAW_SEARCH_RESULTS = [
    {
        "url": "https://official-status.example.com/incidents/payments-2026-03-07",
        "title": "Payments Incident Update",
        "snippet": "P1 incident, failed payment rate and ETA updates.",
        "score": 0.98,
    },
    {
        "url": "https://vendor.example.com/policies/enterprise-sla",
        "title": "Enterprise SLA",
        "snippet": "Uptime SLA and response targets by severity.",
        "score": 0.94,
    },
    {
        "url": "https://regulator.example.org/guidance/customer-communications",
        "title": "Customer Communication Guidance",
        "snippet": "Expectations for incident disclosures.",
        "score": 0.81,
    },
    {
        "url": "https://official-status.example.com/incidents/payments-2026-03-07#latest",
        "title": "Payments Incident Update (duplicate URL form)",
        "snippet": "Duplicate page with fragment.",
        "score": 0.73,
    },
    {
        "url": "https://vendor.example.com/policies/enterprise-sla?ref=search",
        "title": "Enterprise SLA (duplicate URL form)",
        "snippet": "Duplicate page with query string.",
        "score": 0.71,
    },
    {
        "url": "https://community-rumors.example.net/thread/payment-outage",
        "title": "Community Thread",
        "snippet": "Unverified forum claims.",
        "score": 0.42,
    },
]

PAGES: dict[str, dict[str, Any]] = {
    "https://official-status.example.com/incidents/payments-2026-03-07": {
        "title": "Payments Incident Update",
        "published_at": "2026-03-07",
        "body": (
            "US payment gateway is in P1 degraded mode. Failed payment rate is 3.4%. "
            "Chargeback alerts observed: 5. Estimated time to recovery: 45 minutes, subject to change."
        ),
    },
    "https://vendor.example.com/policies/enterprise-sla": {
        "title": "Enterprise SLA",
        "published_at": "2026-01-15",
        "body": (
            "Enterprise monthly uptime SLA is 99.95%. "
            "For P1 incidents, first response target is 15 minutes, available 24/7."
        ),
    },
    "https://regulator.example.org/guidance/customer-communications": {
        "title": "Customer Communication Guidance",
        "published_at": "2025-11-04",
        "body": (
            "Service providers should publish regular incident updates with known impact and recovery status."
        ),
    },
}


def normalize_url(url: str) -> str:
    parsed = urlparse(str(url).strip())
    scheme = (parsed.scheme or "https").lower()
    host = parsed.netloc.lower()
    path = parsed.path or "/"
    if path != "/" and path.endswith("/"):
        path = path[:-1]
    return f"{scheme}://{host}{path}"


def search_sources(*, query: str, k: int) -> dict[str, Any]:
    del query
    return {
        "status": "ok",
        "data": {
            "results": [dict(item) for item in RAW_SEARCH_RESULTS[: max(1, int(k))]],
        },
    }


def read_source(*, url: str) -> dict[str, Any]:
    normalized = normalize_url(url)
    page = PAGES.get(normalized)
    if page is None:
        return {
            "status": "error",
            "error": "not_found",
        }
    return {
        "status": "ok",
        "data": {
            "url": normalized,
            "title": str(page["title"]),
            "published_at": str(page["published_at"]),
            "body": str(page["body"]),
        },
    }


def extract_notes_from_page(*, url: str, page: dict[str, Any]) -> list[dict[str, Any]]:
    normalized = normalize_url(url)

    if normalized == "https://official-status.example.com/incidents/payments-2026-03-07":
        return [
            {
                "claim": "US payments incident is P1 with failed payment rate 3.4%, 5 chargeback alerts, and ETA about 45 minutes.",
                "quote": "US payment gateway is in P1 degraded mode. Failed payment rate is 3.4%. Chargeback alerts observed: 5. Estimated time to recovery: 45 minutes, subject to change.",
                "url": normalized,
                "title": page["title"],
                "published_at": page["published_at"],
            }
        ]

    if normalized == "https://vendor.example.com/policies/enterprise-sla":
        return [
            {
                "claim": "Enterprise SLA includes 99.95% monthly uptime and a 15-minute first response target for P1 incidents (24/7).",
                "quote": "Enterprise monthly uptime SLA is 99.95%. For P1 incidents, first response target is 15 minutes, available 24/7.",
                "url": normalized,
                "title": page["title"],
                "published_at": page["published_at"],
            }
        ]

    return []


def verify_notes(*, notes: list[dict[str, Any]]) -> dict[str, Any]:
    checked = 0
    issues: list[str] = []

    for note in notes:
        checked += 1
        quote = str(note.get("quote", "")).strip()
        claim = str(note.get("claim", "")).strip()
        if len(quote) < 20:
            issues.append("quote_too_short")
        if not claim:
            issues.append("claim_missing")

    return {
        "status": "ok",
        "data": {
            "ok": len(issues) == 0,
            "checked_notes": checked,
            "issues": issues,
        },
    }

agent.py — план + synthesis з notes

PYTHON
from __future__ import annotations

from typing import Any


def propose_research_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    del goal
    query = request["request"]["question"]
    return {
        "steps": [
            {
                "id": "r1",
                "action": "search_sources",
                "args": {
                    "query": query,
                },
            },
            {"id": "r2", "action": "dedupe_urls", "args": {}},
            {"id": "r3", "action": "read_extract_notes", "args": {}},
            {"id": "r4", "action": "verify_notes", "args": {}},
            {"id": "r5", "action": "synthesize_answer", "args": {}},
        ]
    }


def synthesize_from_notes(*, goal: str, notes: list[dict[str, Any]]) -> dict[str, Any]:
    del goal
    if not notes:
        return {
            "answer": "",
            "citations": [],
        }

    selected = notes[:3]
    citations = [str(item["id"]) for item in selected]

    claims = [str(item["claim"]).strip() for item in selected]
    answer = (
        "Research brief: "
        + " ".join(claims)
        + " Timeline values are estimates and may change."
    )

    return {
        "answer": answer,
        "citations": citations,
    }

gateway.py — policy/validation boundaries

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any
from urllib.parse import urlparse


class StopRun(Exception):
    def __init__(self, reason: str, *, details: dict[str, Any] | None = None):
        super().__init__(reason)
        self.reason = reason
        self.details = details or {}


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 25
    max_steps: int = 8
    max_urls: int = 6
    max_read_pages: int = 3
    max_notes: int = 6
    max_answer_chars: int = 850


@dataclass(frozen=True)
class Decision:
    kind: str
    reason: str


EXPECTED_ACTION_SEQUENCE = [
    "search_sources",
    "dedupe_urls",
    "read_extract_notes",
    "verify_notes",
    "synthesize_answer",
]


def normalize_url(url: str) -> str:
    parsed = urlparse(str(url).strip())
    scheme = (parsed.scheme or "https").lower()
    host = parsed.netloc.lower()
    path = parsed.path or "/"
    if path != "/" and path.endswith("/"):
        path = path[:-1]
    return f"{scheme}://{host}{path}"


def get_domain(url: str) -> str:
    return urlparse(str(url).strip()).netloc.lower()


def validate_plan(raw_steps: Any, *, max_steps: int) -> list[dict[str, Any]]:
    if not isinstance(raw_steps, list) or not raw_steps:
        raise StopRun("invalid_plan:steps")
    if len(raw_steps) > max_steps:
        raise StopRun("invalid_plan:too_many_steps")

    out: list[dict[str, Any]] = []
    actions: list[str] = []

    for raw in raw_steps:
        if not isinstance(raw, dict):
            raise StopRun("invalid_step:not_object")
        step_id = raw.get("id")
        action = raw.get("action")
        args = raw.get("args")

        if not isinstance(step_id, str) or not step_id.strip():
            raise StopRun("invalid_step:id")
        if not isinstance(action, str) or not action.strip():
            raise StopRun("invalid_step:action")
        if not isinstance(args, dict):
            raise StopRun("invalid_step:args")

        normalized = {
            "id": step_id.strip(),
            "action": action.strip(),
            "args": dict(args),
        }
        out.append(normalized)
        actions.append(normalized["action"])

    if actions != EXPECTED_ACTION_SEQUENCE:
        raise StopRun(
            "invalid_plan:step_sequence",
            details={"expected": EXPECTED_ACTION_SEQUENCE, "received": actions},
        )

    return out


def dedupe_urls(*, raw_urls: list[str], max_urls: int) -> list[str]:
    seen: set[str] = set()
    out: list[str] = []
    for raw in raw_urls:
        normalized = normalize_url(raw)
        if normalized in seen:
            continue
        seen.add(normalized)
        out.append(normalized)
        if len(out) >= max_urls:
            break
    return out


class ResearchGateway:
    def __init__(
        self,
        *,
        allowed_domains_policy: set[str],
        allowed_domains_execution: set[str],
        budget: Budget,
    ):
        self.allowed_domains_policy = {d.lower() for d in allowed_domains_policy}
        self.allowed_domains_execution = {d.lower() for d in allowed_domains_execution}
        self.budget = budget

    def evaluate_source(self, *, url: str) -> Decision:
        domain = get_domain(url)
        if domain not in self.allowed_domains_policy:
            return Decision(kind="deny", reason="source_denied_policy")
        if domain not in self.allowed_domains_execution:
            return Decision(kind="deny", reason="source_denied_execution")
        return Decision(kind="allow", reason="policy_pass")

    def validate_notes(self, *, notes: list[dict[str, Any]]) -> None:
        if not isinstance(notes, list) or not notes:
            raise StopRun("invalid_notes:empty")
        if len(notes) > self.budget.max_notes:
            raise StopRun("invalid_notes:too_many")

        for note in notes:
            if not isinstance(note, dict):
                raise StopRun("invalid_notes:item")
            if not isinstance(note.get("id"), str) or not note["id"].strip():
                raise StopRun("invalid_notes:id")
            if not isinstance(note.get("url"), str) or not note["url"].strip():
                raise StopRun("invalid_notes:url")
            if not isinstance(note.get("claim"), str) or not note["claim"].strip():
                raise StopRun("invalid_notes:claim")
            quote = note.get("quote")
            if not isinstance(quote, str) or len(quote.strip()) < 20:
                raise StopRun("invalid_notes:quote")

    def validate_synthesis(self, *, answer: str, citations: list[str], notes: list[dict[str, Any]]) -> None:
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_answer:empty")
        if len(answer) > self.budget.max_answer_chars:
            raise StopRun("invalid_answer:too_long")

        if not isinstance(citations, list) or not citations:
            raise StopRun("invalid_answer:citations")

        note_ids = {str(item["id"]) for item in notes}
        for citation in citations:
            if str(citation) not in note_ids:
                raise StopRun("invalid_answer:citation_unknown")

main.py — orchestrate bounded research workflow

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from agent import propose_research_plan, synthesize_from_notes
from context import build_request
from gateway import Budget, ResearchGateway, StopRun, dedupe_urls, validate_plan
from tools import extract_notes_from_page, read_source, search_sources, verify_notes

GOAL = (
    "Research current US payments incident status and enterprise SLA commitments, "
    "then return a concise grounded summary with citations."
)
REQUEST = build_request(
    report_date="2026-03-07",
    region="US",
)

DEFAULT_BUDGET = Budget(
    max_seconds=25,
    max_steps=8,
    max_urls=6,
    max_read_pages=3,
    max_notes=6,
    max_answer_chars=850,
)


def _unwrap_tool_data(raw: Any, *, tool_name: str) -> dict[str, Any]:
    if not isinstance(raw, dict) or raw.get("status") != "ok" or not isinstance(raw.get("data"), dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    return dict(raw["data"])


def _safe_int(value: Any, *, default: int) -> int:
    try:
        return int(value)
    except (TypeError, ValueError):
        return int(default)


def run_research_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    hints_raw = request.get("policy_hints")
    hints: dict[str, Any] = hints_raw if isinstance(hints_raw, dict) else {}

    allowed_domains_policy_raw = hints.get("allowed_domains_policy")
    if isinstance(allowed_domains_policy_raw, list):
        allowed_domains_policy = {
            str(item).strip().lower()
            for item in allowed_domains_policy_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_domains_policy = {
            "official-status.example.com",
            "vendor.example.com",
            "regulator.example.org",
        }

    allowed_domains_execution_raw = hints.get("allowed_domains_execution")
    if isinstance(allowed_domains_execution_raw, list):
        allowed_domains_execution = {
            str(item).strip().lower()
            for item in allowed_domains_execution_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_domains_execution = {
            "official-status.example.com",
            "vendor.example.com",
        }

    max_urls = _safe_int(hints.get("max_urls", DEFAULT_BUDGET.max_urls), default=DEFAULT_BUDGET.max_urls)
    max_read_pages = _safe_int(hints.get("max_read_pages", DEFAULT_BUDGET.max_read_pages), default=DEFAULT_BUDGET.max_read_pages)
    max_notes = _safe_int(hints.get("max_notes", DEFAULT_BUDGET.max_notes), default=DEFAULT_BUDGET.max_notes)
    max_answer_chars = _safe_int(hints.get("max_answer_chars", DEFAULT_BUDGET.max_answer_chars), default=DEFAULT_BUDGET.max_answer_chars)

    budget = Budget(
        max_seconds=DEFAULT_BUDGET.max_seconds,
        max_steps=DEFAULT_BUDGET.max_steps,
        max_urls=max(1, min(20, max_urls)),
        max_read_pages=max(1, min(10, max_read_pages)),
        max_notes=max(1, min(20, max_notes)),
        max_answer_chars=max(120, min(2000, max_answer_chars)),
    )

    gateway = ResearchGateway(
        allowed_domains_policy=allowed_domains_policy,
        allowed_domains_execution=allowed_domains_execution,
        budget=budget,
    )

    def elapsed_ms() -> int:
        return max(1, int((time.monotonic() - started) * 1000))

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    phase = "plan"
    try:
        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase=phase)

        raw_plan = propose_research_plan(goal=goal, request=request)
        steps = validate_plan(raw_plan.get("steps"), max_steps=budget.max_steps)

        trace.append(
            {
                "step": 1,
                "phase": "plan",
                "steps": len(steps),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 1,
                "action": "propose_research_plan",
                "step_ids": [step["id"] for step in steps],
            }
        )

        phase = "search"
        query = str(steps[0]["args"].get("query", "")).strip()
        if not query:
            return stopped("invalid_search:query", phase=phase)

        search_data = _unwrap_tool_data(
            search_sources(query=query, k=budget.max_urls * 2),
            tool_name="search_sources",
        )
        search_results = list(search_data.get("results", []))
        candidate_urls = [str(item.get("url", "")).strip() for item in search_results if isinstance(item, dict)]

        trace.append(
            {
                "step": 2,
                "phase": "search",
                "query": query,
                "candidates": len(candidate_urls),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 2,
                "action": "search_sources",
                "query": query,
                "candidates": len(candidate_urls),
            }
        )

        phase = "dedupe"
        deduped_urls = dedupe_urls(raw_urls=candidate_urls, max_urls=budget.max_urls)
        if not deduped_urls:
            return stopped("no_sources_after_dedupe", phase=phase)

        trace.append(
            {
                "step": 3,
                "phase": "dedupe",
                "urls_after_dedupe": len(deduped_urls),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "dedupe_urls",
                "urls_after_dedupe": len(deduped_urls),
            }
        )

        phase = "read_extract"
        notes: list[dict[str, Any]] = []
        read_urls: list[str] = []
        denied_sources: list[dict[str, str]] = []

        for url in deduped_urls:
            if (time.monotonic() - started) > budget.max_seconds:
                return stopped("max_seconds", phase=phase)

            decision = gateway.evaluate_source(url=url)
            if decision.kind != "allow":
                denied_sources.append({"url": url, "reason": decision.reason})
                continue

            if len(read_urls) >= budget.max_read_pages:
                break

            page = _unwrap_tool_data(
                read_source(url=url),
                tool_name="read_source",
            )

            extracted = extract_notes_from_page(url=url, page=page)
            for item in extracted:
                note = dict(item)
                note["id"] = f"n{len(notes) + 1}"
                notes.append(note)
                if len(notes) >= budget.max_notes:
                    break

            read_urls.append(url)
            if len(notes) >= budget.max_notes:
                break

        if not notes:
            return stopped(
                "no_reliable_sources",
                phase=phase,
                denied_sources=denied_sources,
            )

        gateway.validate_notes(notes=notes)

        trace.append(
            {
                "step": 4,
                "phase": "read_extract",
                "pages_read": len(read_urls),
                "notes": len(notes),
                "denied_sources": len(denied_sources),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 4,
                "action": "read_extract_notes",
                "pages_read": len(read_urls),
                "denied_sources": denied_sources,
            }
        )

        phase = "verify"
        verification = _unwrap_tool_data(
            verify_notes(notes=notes),
            tool_name="verify_notes",
        )
        if not bool(verification.get("ok")):
            issues = verification.get("issues") or []
            first = str(issues[0]) if issues else "unknown"
            return stopped(f"verification_failed:{first}", phase=phase, verification=verification)

        trace.append(
            {
                "step": 5,
                "phase": "verify",
                "checked_notes": int(verification.get("checked_notes", 0)),
                "issues": len(verification.get("issues", [])),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 5,
                "action": "verify_notes",
                "checked_notes": int(verification.get("checked_notes", 0)),
            }
        )

        phase = "synthesize"
        synthesis = synthesize_from_notes(goal=goal, notes=notes)
        answer = str(synthesis.get("answer", "")).strip()
        citations = [str(item).strip() for item in synthesis.get("citations", []) if str(item).strip()]

        gateway.validate_synthesis(answer=answer, citations=citations, notes=notes)

        aggregate = {
            "query": query,
            "urls_found": len(candidate_urls),
            "urls_after_dedupe": len(deduped_urls),
            "pages_read": len(read_urls),
            "notes_count": len(notes),
            "citations_count": len(citations),
            "denied_sources": denied_sources,
            "verified_notes": int(verification.get("checked_notes", 0)),
        }

        trace.append(
            {
                "step": 6,
                "phase": "synthesize",
                "answer_chars": len(answer),
                "citations": len(citations),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 6,
                "action": "synthesize_answer",
                "citations": citations,
            }
        )

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "grounded_research_answer",
            "answer": answer,
            "citations": citations,
            "citation_details": [
                {
                    "id": str(note["id"]),
                    "url": str(note["url"]),
                    "title": str(note["title"]),
                    "published_at": str(note["published_at"]),
                }
                for note in notes
                if str(note["id"]) in set(citations)
            ],
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }

    except StopRun as exc:
        return stopped(
            exc.reason,
            phase=phase,
            **({"details": exc.details} if isinstance(exc.details, dict) and exc.details else {}),
        )


def main() -> None:
    result = run_research_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Що тут найважливіше (простими словами)

  • search_sources не дає права одразу писати відповідь; спочатку йдуть dedupe_urls, policy-check (runtime gate), read_extract_notes, verify_notes
  • Execution allowlist може блокувати домен навіть якщо policy allowlist його дозволяє
  • no_reliable_sources спрацьовує, якщо після policy/read не лишилось валідних notes
  • verify_notes у цьому прикладі не підтверджує істинність claim між незалежними джерелами; це мінімальний structural quality-gate для notes
  • Writer працює тільки з notes, а citation id мають посилатися на наявні note; вигадані citation-id блокує invalid_answer:citation_unknown
  • pages_read і notes_count — різні сигнали: сторінка може бути прочитана, але не дати жодної note після extract_notes_from_page
  • trace/history показують, скільки URL знайдено, скільки відхилено policy і скільки notes реально перевірено

Приклад виводу

JSON
{
  "run_id": "08edca9e-6972-493c-98d5-11b53d0d1fc2",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "grounded_research_answer",
  "answer": "Research brief: US payments incident is P1 with failed payment rate 3.4%, 5 chargeback alerts, and ETA about 45 minutes. Enterprise SLA includes 99.95% monthly uptime and a 15-minute first response target for P1 incidents (24/7). Timeline values are estimates and may change.",
  "citations": [
    "n1",
    "n2"
  ],
  "citation_details": [
    {
      "id": "n1",
      "url": "https://official-status.example.com/incidents/payments-2026-03-07",
      "title": "Payments Incident Update",
      "published_at": "2026-03-07"
    },
    {
      "id": "n2",
      "url": "https://vendor.example.com/policies/enterprise-sla",
      "title": "Enterprise SLA",
      "published_at": "2026-01-15"
    }
  ],
  "aggregate": {
    "query": "What is the current US payments incident status and what enterprise SLA commitments apply for uptime and P1 response time?",
    "urls_found": 6,
    "urls_after_dedupe": 4,
    "pages_read": 2,
    "notes_count": 2,
    "citations_count": 2,
    "denied_sources": [
      {
        "url": "https://regulator.example.org/guidance/customer-communications",
        "reason": "source_denied_execution"
      },
      {
        "url": "https://community-rumors.example.net/thread/payment-outage",
        "reason": "source_denied_policy"
      }
    ],
    "verified_notes": 2
  },
  "trace": [
    {
      "step": 1,
      "phase": "plan",
      "steps": 5,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "search",
      "query": "What is the current US payments incident status and what enterprise SLA commitments apply for uptime and P1 response time?",
      "candidates": 6,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 3,
      "phase": "dedupe",
      "urls_after_dedupe": 4,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 4,
      "phase": "read_extract",
      "pages_read": 2,
      "notes": 2,
      "denied_sources": 2,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 5,
      "phase": "verify",
      "checked_notes": 2,
      "issues": 0,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 6,
      "phase": "synthesize",
      "answer_chars": 275,
      "citations": 2,
      "elapsed_ms": 1,
      "ok": true
    }
  ],
  "history": [{...}]
}

Типові stop_reason

  • success — run завершено коректно
  • max_seconds — вичерпано загальний time budget
  • invalid_plan:* — невалідний план дослідження
  • invalid_step:* — невалідний step-контракт
  • invalid_search:query — порожній query для пошуку
  • no_sources_after_dedupe — після дедуплікації не лишилось URL
  • no_reliable_sources — не отримано жодної валідної note після policy/read
  • tool_invalid_output:* — tool повернув невалідний контракт
  • verification_failed:* — verify-крок виявив критичну проблему в notes
  • invalid_notes:* — notes не пройшли gateway-валидацію
  • invalid_answer:* — synthesis не пройшов контракт (порожня відповідь, завелика довжина, невалідні citations)
  • source_denied_policy — типова причина відсіву джерела у denied_sources (не окремий stop_reason)
  • source_denied_execution — типова причина відсіву джерела у denied_sources (не окремий stop_reason)

Що тут НЕ показано

  • реальний web-fetch/HTTP клієнт (тут використано deterministic mocked tools)
  • cache TTL, retry policy, і backoff для нестабільних джерел
  • trust scoring і weighted source ranking
  • claim-level cross-check між кількома незалежними джерелами
  • human-in-the-loop ревʼю для high-risk тем

Що спробувати далі

  1. Прибрати vendor.example.com з execution allowlist і подивитись, як зміниться denied_sources та результат synthesis.
  2. Зменшити max_read_pages до 1 і перевірити, як це вплине на citations_count.
  3. Підмінити citations у synthesis на неіснуючий id і подивитись invalid_answer:citation_unknown.
  4. Додати "ламану" note з короткою quote і подивитись invalid_notes:quote або verification_failed:*.
⏱️ 15 хв читанняОновлено 6 березня 2026 р.Складність: ★★☆
Інтегровано: продакшен-контрольOnceOnly
Додай guardrails до агентів з tool-calling
Зашип цей патерн з governance:
  • Бюджетами (кроки / ліміти витрат)
  • Дозволами на інструменти (allowlist / blocklist)
  • Kill switch та аварійна зупинка
  • Ідемпотентність і dedupe
  • Audit logs та трасування
Інтегрована згадка: OnceOnly — контрольний шар для продакшен агент-систем.

Автор

Микола — інженер, який будує інфраструктуру для продакшн AI-агентів.

Фокус: патерни агентів, режими відмов, контроль рантайму та надійність систем.

🔗 GitHub: https://github.com/mykolademyanov


Редакційна примітка

Ця документація підготовлена з допомогою AI, із людською редакторською відповідальністю за точність, ясність і продакшн-релевантність.

Контент базується на реальних відмовах, постмортемах та операційних інцидентах у розгорнутих AI-агентних системах.