Research Agent in Python: A Complete Example

A runnable, production-style example of a Research agent in Python, featuring a bounded research workflow, a policy boundary, structural verification of notes, and a transparent trace/history.
On this page
  1. Essence of the Pattern (brief)
  2. What This Example Demonstrates
  3. Architecture
  4. Project Structure
  5. How to Run
  6. Task
  7. Code
  8. context.py — request envelope + policy hints
  9. tools.py — search/read/extract/verify tools
  10. agent.py — plan + synthesis from notes
  11. gateway.py — policy/validation boundaries
  12. main.py — orchestrate the bounded research workflow
  13. Example Output
  14. Typical stop_reason Values
  15. What Is NOT Shown Here
  16. What to Try Next

Essence of the Pattern (brief)

"Research Agent" means the agent does not answer "from memory"; it follows a controlled workflow:

  • search_sources
  • dedupe_urls
  • policy-check (a runtime gate before read_source, not a separate plan step)
  • read_extract_notes
  • verify_notes
  • synthesize_answer

So the final answer is built exclusively from verified notes, and citations are allowed only via valid citation ids pointing to a note with provenance (url, quote, published_at).
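That grounding rule is small enough to sketch on its own. The check below mirrors the citation validation done later in gateway.py (the function name here is illustrative): every cited id must resolve to an existing verified note, so invented citations cannot reach the final answer.

```python
# Minimal sketch of citation grounding: every cited id must resolve to a
# verified note carrying provenance (url, quote, published_at).
# The function name is illustrative; the full example does this in gateway.py.

def check_citations(citations: list[str], notes: list[dict]) -> str:
    note_ids = {note["id"] for note in notes}
    for citation in citations:
        if citation not in note_ids:
            return "invalid_answer:citation_unknown"
    return "ok"

notes = [
    {"id": "n1", "url": "https://official-status.example.com/incidents/x",
     "quote": "US payment gateway is in P1 degraded mode.",
     "published_at": "2026-03-07"},
]

print(check_citations(["n1"], notes))    # a grounded citation passes
print(check_citations(["n999"], notes))  # an invented id is rejected
```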


What This Example Demonstrates

  • The agent proposes a research plan, but the runtime validates the plan contract and the step order
  • URLs are normalized and deduplicated before sources are read
  • The policy allowlist and the execution allowlist for domains are kept separate
  • read_extract_notes operates under budgets (max_urls, max_read_pages, max_notes)
  • Notes carry provenance (url, title, published_at, quote)
  • verify_notes in this demo is a minimal structural quality gate (no cross-source fact-checking)
  • Synthesis is allowed only with valid citation ids
  • trace/history provide auditability from search all the way to the final grounded answer
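The URL normalization behavior is worth seeing in isolation. This is the same logic as the normalize_url helper in tools.py below: rebuilding only scheme://host/path drops fragments (#latest) and query strings (?ref=search), so duplicate URL forms collapse during dedupe.

```python
from urllib.parse import urlparse

def normalize_url(url: str) -> str:
    # Rebuild scheme://host/path only: fragments and query strings are
    # dropped, so duplicate URL forms collapse to one canonical URL.
    parsed = urlparse(url.strip())
    scheme = (parsed.scheme or "https").lower()
    host = parsed.netloc.lower()
    path = parsed.path or "/"
    if path != "/" and path.endswith("/"):
        path = path[:-1]
    return f"{scheme}://{host}{path}"

urls = [
    "https://vendor.example.com/policies/enterprise-sla",
    "https://vendor.example.com/policies/enterprise-sla?ref=search",
    "https://vendor.example.com/policies/enterprise-sla#latest",
]
# dict.fromkeys preserves first-seen order while removing duplicates.
deduped = list(dict.fromkeys(normalize_url(u) for u in urls))
print(deduped)  # a single canonical URL remains
```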

Architecture

  1. agent.py builds the plan (search_sources → dedupe_urls → read_extract_notes → verify_notes → synthesize_answer).
  2. gateway.py validates the plan, policy decisions for sources, and the note/answer contracts.
  3. tools.py implements deterministic search_sources/read_source/extract_notes_from_page/verify_notes steps.
  4. main.py orchestrates the workflow, enforces budgets, and returns trace/history.

Project Structure

TEXT
agent-patterns/
└── research-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── agent.py
        ├── context.py
        ├── README.md
        └── requirements.txt

How to Run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/research-agent/python
python main.py

This example requires no external dependencies.


Task

Production scenario:

"Prepare a short status update on a US payments incident and confirm the enterprise SLA, with explicit citations."


Code

context.py — request envelope + policy hints

PYTHON
from __future__ import annotations

from typing import Any


def build_request(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
            "question": (
                "What is the current US payments incident status and what enterprise SLA "
                "commitments apply for uptime and P1 response time?"
            ),
        },
        "policy_hints": {
            "allowed_domains_policy": [
                "official-status.example.com",
                "vendor.example.com",
                "regulator.example.org",
            ],
            "allowed_domains_execution": [
                "official-status.example.com",
                "vendor.example.com",
            ],
            "max_urls": 6,
            "max_read_pages": 3,
            "max_notes": 6,
            "max_answer_chars": 850,
        },
    }

tools.py — search/read/extract/verify tools

PYTHON
from __future__ import annotations

from typing import Any
from urllib.parse import urlparse


RAW_SEARCH_RESULTS = [
    {
        "url": "https://official-status.example.com/incidents/payments-2026-03-07",
        "title": "Payments Incident Update",
        "snippet": "P1 incident, failed payment rate and ETA updates.",
        "score": 0.98,
    },
    {
        "url": "https://vendor.example.com/policies/enterprise-sla",
        "title": "Enterprise SLA",
        "snippet": "Uptime SLA and response targets by severity.",
        "score": 0.94,
    },
    {
        "url": "https://regulator.example.org/guidance/customer-communications",
        "title": "Customer Communication Guidance",
        "snippet": "Expectations for incident disclosures.",
        "score": 0.81,
    },
    {
        "url": "https://official-status.example.com/incidents/payments-2026-03-07#latest",
        "title": "Payments Incident Update (duplicate URL form)",
        "snippet": "Duplicate page with fragment.",
        "score": 0.73,
    },
    {
        "url": "https://vendor.example.com/policies/enterprise-sla?ref=search",
        "title": "Enterprise SLA (duplicate URL form)",
        "snippet": "Duplicate page with query string.",
        "score": 0.71,
    },
    {
        "url": "https://community-rumors.example.net/thread/payment-outage",
        "title": "Community Thread",
        "snippet": "Unverified forum claims.",
        "score": 0.42,
    },
]

PAGES: dict[str, dict[str, Any]] = {
    "https://official-status.example.com/incidents/payments-2026-03-07": {
        "title": "Payments Incident Update",
        "published_at": "2026-03-07",
        "body": (
            "US payment gateway is in P1 degraded mode. Failed payment rate is 3.4%. "
            "Chargeback alerts observed: 5. Estimated time to recovery: 45 minutes, subject to change."
        ),
    },
    "https://vendor.example.com/policies/enterprise-sla": {
        "title": "Enterprise SLA",
        "published_at": "2026-01-15",
        "body": (
            "Enterprise monthly uptime SLA is 99.95%. "
            "For P1 incidents, first response target is 15 minutes, available 24/7."
        ),
    },
    "https://regulator.example.org/guidance/customer-communications": {
        "title": "Customer Communication Guidance",
        "published_at": "2025-11-04",
        "body": (
            "Service providers should publish regular incident updates with known impact and recovery status."
        ),
    },
}


def normalize_url(url: str) -> str:
    parsed = urlparse(str(url).strip())
    scheme = (parsed.scheme or "https").lower()
    host = parsed.netloc.lower()
    path = parsed.path or "/"
    if path != "/" and path.endswith("/"):
        path = path[:-1]
    return f"{scheme}://{host}{path}"


def search_sources(*, query: str, k: int) -> dict[str, Any]:
    del query
    return {
        "status": "ok",
        "data": {
            "results": [dict(item) for item in RAW_SEARCH_RESULTS[: max(1, int(k))]],
        },
    }


def read_source(*, url: str) -> dict[str, Any]:
    normalized = normalize_url(url)
    page = PAGES.get(normalized)
    if page is None:
        return {
            "status": "error",
            "error": "not_found",
        }
    return {
        "status": "ok",
        "data": {
            "url": normalized,
            "title": str(page["title"]),
            "published_at": str(page["published_at"]),
            "body": str(page["body"]),
        },
    }


def extract_notes_from_page(*, url: str, page: dict[str, Any]) -> list[dict[str, Any]]:
    normalized = normalize_url(url)

    if normalized == "https://official-status.example.com/incidents/payments-2026-03-07":
        return [
            {
                "claim": "US payments incident is P1 with failed payment rate 3.4%, 5 chargeback alerts, and ETA about 45 minutes.",
                "quote": "US payment gateway is in P1 degraded mode. Failed payment rate is 3.4%. Chargeback alerts observed: 5. Estimated time to recovery: 45 minutes, subject to change.",
                "url": normalized,
                "title": page["title"],
                "published_at": page["published_at"],
            }
        ]

    if normalized == "https://vendor.example.com/policies/enterprise-sla":
        return [
            {
                "claim": "Enterprise SLA includes 99.95% monthly uptime and a 15-minute first response target for P1 incidents (24/7).",
                "quote": "Enterprise monthly uptime SLA is 99.95%. For P1 incidents, first response target is 15 minutes, available 24/7.",
                "url": normalized,
                "title": page["title"],
                "published_at": page["published_at"],
            }
        ]

    return []


def verify_notes(*, notes: list[dict[str, Any]]) -> dict[str, Any]:
    checked = 0
    issues: list[str] = []

    for note in notes:
        checked += 1
        quote = str(note.get("quote", "")).strip()
        claim = str(note.get("claim", "")).strip()
        if len(quote) < 20:
            issues.append("quote_too_short")
        if not claim:
            issues.append("claim_missing")

    return {
        "status": "ok",
        "data": {
            "ok": len(issues) == 0,
            "checked_notes": checked,
            "issues": issues,
        },
    }

agent.py — plan + synthesis from notes

PYTHON
from __future__ import annotations

from typing import Any


def propose_research_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    del goal
    query = request["request"]["question"]
    return {
        "steps": [
            {
                "id": "r1",
                "action": "search_sources",
                "args": {
                    "query": query,
                },
            },
            {"id": "r2", "action": "dedupe_urls", "args": {}},
            {"id": "r3", "action": "read_extract_notes", "args": {}},
            {"id": "r4", "action": "verify_notes", "args": {}},
            {"id": "r5", "action": "synthesize_answer", "args": {}},
        ]
    }


def synthesize_from_notes(*, goal: str, notes: list[dict[str, Any]]) -> dict[str, Any]:
    del goal
    if not notes:
        return {
            "answer": "",
            "citations": [],
        }

    selected = notes[:3]
    citations = [str(item["id"]) for item in selected]

    claims = [str(item["claim"]).strip() for item in selected]
    answer = (
        "Research brief: "
        + " ".join(claims)
        + " Timeline values are estimates and may change."
    )

    return {
        "answer": answer,
        "citations": citations,
    }

gateway.py — policy/validation boundaries

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any
from urllib.parse import urlparse


class StopRun(Exception):
    def __init__(self, reason: str, *, details: dict[str, Any] | None = None):
        super().__init__(reason)
        self.reason = reason
        self.details = details or {}


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 25
    max_steps: int = 8
    max_urls: int = 6
    max_read_pages: int = 3
    max_notes: int = 6
    max_answer_chars: int = 850


@dataclass(frozen=True)
class Decision:
    kind: str
    reason: str


EXPECTED_ACTION_SEQUENCE = [
    "search_sources",
    "dedupe_urls",
    "read_extract_notes",
    "verify_notes",
    "synthesize_answer",
]


def normalize_url(url: str) -> str:
    parsed = urlparse(str(url).strip())
    scheme = (parsed.scheme or "https").lower()
    host = parsed.netloc.lower()
    path = parsed.path or "/"
    if path != "/" and path.endswith("/"):
        path = path[:-1]
    return f"{scheme}://{host}{path}"


def get_domain(url: str) -> str:
    return urlparse(str(url).strip()).netloc.lower()


def validate_plan(raw_steps: Any, *, max_steps: int) -> list[dict[str, Any]]:
    if not isinstance(raw_steps, list) or not raw_steps:
        raise StopRun("invalid_plan:steps")
    if len(raw_steps) > max_steps:
        raise StopRun("invalid_plan:too_many_steps")

    out: list[dict[str, Any]] = []
    actions: list[str] = []

    for raw in raw_steps:
        if not isinstance(raw, dict):
            raise StopRun("invalid_step:not_object")
        step_id = raw.get("id")
        action = raw.get("action")
        args = raw.get("args")

        if not isinstance(step_id, str) or not step_id.strip():
            raise StopRun("invalid_step:id")
        if not isinstance(action, str) or not action.strip():
            raise StopRun("invalid_step:action")
        if not isinstance(args, dict):
            raise StopRun("invalid_step:args")

        normalized = {
            "id": step_id.strip(),
            "action": action.strip(),
            "args": dict(args),
        }
        out.append(normalized)
        actions.append(normalized["action"])

    if actions != EXPECTED_ACTION_SEQUENCE:
        raise StopRun(
            "invalid_plan:step_sequence",
            details={"expected": EXPECTED_ACTION_SEQUENCE, "received": actions},
        )

    return out


def dedupe_urls(*, raw_urls: list[str], max_urls: int) -> list[str]:
    seen: set[str] = set()
    out: list[str] = []
    for raw in raw_urls:
        normalized = normalize_url(raw)
        if normalized in seen:
            continue
        seen.add(normalized)
        out.append(normalized)
        if len(out) >= max_urls:
            break
    return out


class ResearchGateway:
    def __init__(
        self,
        *,
        allowed_domains_policy: set[str],
        allowed_domains_execution: set[str],
        budget: Budget,
    ):
        self.allowed_domains_policy = {d.lower() for d in allowed_domains_policy}
        self.allowed_domains_execution = {d.lower() for d in allowed_domains_execution}
        self.budget = budget

    def evaluate_source(self, *, url: str) -> Decision:
        domain = get_domain(url)
        if domain not in self.allowed_domains_policy:
            return Decision(kind="deny", reason="source_denied_policy")
        if domain not in self.allowed_domains_execution:
            return Decision(kind="deny", reason="source_denied_execution")
        return Decision(kind="allow", reason="policy_pass")

    def validate_notes(self, *, notes: list[dict[str, Any]]) -> None:
        if not isinstance(notes, list) or not notes:
            raise StopRun("invalid_notes:empty")
        if len(notes) > self.budget.max_notes:
            raise StopRun("invalid_notes:too_many")

        for note in notes:
            if not isinstance(note, dict):
                raise StopRun("invalid_notes:item")
            if not isinstance(note.get("id"), str) or not note["id"].strip():
                raise StopRun("invalid_notes:id")
            if not isinstance(note.get("url"), str) or not note["url"].strip():
                raise StopRun("invalid_notes:url")
            if not isinstance(note.get("claim"), str) or not note["claim"].strip():
                raise StopRun("invalid_notes:claim")
            quote = note.get("quote")
            if not isinstance(quote, str) or len(quote.strip()) < 20:
                raise StopRun("invalid_notes:quote")

    def validate_synthesis(self, *, answer: str, citations: list[str], notes: list[dict[str, Any]]) -> None:
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_answer:empty")
        if len(answer) > self.budget.max_answer_chars:
            raise StopRun("invalid_answer:too_long")

        if not isinstance(citations, list) or not citations:
            raise StopRun("invalid_answer:citations")

        note_ids = {str(item["id"]) for item in notes}
        for citation in citations:
            if str(citation) not in note_ids:
                raise StopRun("invalid_answer:citation_unknown")

main.py — orchestrate the bounded research workflow

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from agent import propose_research_plan, synthesize_from_notes
from context import build_request
from gateway import Budget, ResearchGateway, StopRun, dedupe_urls, validate_plan
from tools import extract_notes_from_page, read_source, search_sources, verify_notes

GOAL = (
    "Research current US payments incident status and enterprise SLA commitments, "
    "then return a concise grounded summary with citations."
)
REQUEST = build_request(
    report_date="2026-03-07",
    region="US",
)

DEFAULT_BUDGET = Budget(
    max_seconds=25,
    max_steps=8,
    max_urls=6,
    max_read_pages=3,
    max_notes=6,
    max_answer_chars=850,
)


def _unwrap_tool_data(raw: Any, *, tool_name: str) -> dict[str, Any]:
    if not isinstance(raw, dict) or raw.get("status") != "ok" or not isinstance(raw.get("data"), dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    return dict(raw["data"])


def _safe_int(value: Any, *, default: int) -> int:
    try:
        return int(value)
    except (TypeError, ValueError):
        return int(default)


def run_research_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    hints_raw = request.get("policy_hints")
    hints: dict[str, Any] = hints_raw if isinstance(hints_raw, dict) else {}

    allowed_domains_policy_raw = hints.get("allowed_domains_policy")
    if isinstance(allowed_domains_policy_raw, list):
        allowed_domains_policy = {
            str(item).strip().lower()
            for item in allowed_domains_policy_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_domains_policy = {
            "official-status.example.com",
            "vendor.example.com",
            "regulator.example.org",
        }

    allowed_domains_execution_raw = hints.get("allowed_domains_execution")
    if isinstance(allowed_domains_execution_raw, list):
        allowed_domains_execution = {
            str(item).strip().lower()
            for item in allowed_domains_execution_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_domains_execution = {
            "official-status.example.com",
            "vendor.example.com",
        }

    max_urls = _safe_int(hints.get("max_urls", DEFAULT_BUDGET.max_urls), default=DEFAULT_BUDGET.max_urls)
    max_read_pages = _safe_int(hints.get("max_read_pages", DEFAULT_BUDGET.max_read_pages), default=DEFAULT_BUDGET.max_read_pages)
    max_notes = _safe_int(hints.get("max_notes", DEFAULT_BUDGET.max_notes), default=DEFAULT_BUDGET.max_notes)
    max_answer_chars = _safe_int(hints.get("max_answer_chars", DEFAULT_BUDGET.max_answer_chars), default=DEFAULT_BUDGET.max_answer_chars)

    budget = Budget(
        max_seconds=DEFAULT_BUDGET.max_seconds,
        max_steps=DEFAULT_BUDGET.max_steps,
        max_urls=max(1, min(20, max_urls)),
        max_read_pages=max(1, min(10, max_read_pages)),
        max_notes=max(1, min(20, max_notes)),
        max_answer_chars=max(120, min(2000, max_answer_chars)),
    )

    gateway = ResearchGateway(
        allowed_domains_policy=allowed_domains_policy,
        allowed_domains_execution=allowed_domains_execution,
        budget=budget,
    )

    def elapsed_ms() -> int:
        return max(1, int((time.monotonic() - started) * 1000))

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    phase = "plan"
    try:
        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase=phase)

        raw_plan = propose_research_plan(goal=goal, request=request)
        steps = validate_plan(raw_plan.get("steps"), max_steps=budget.max_steps)

        trace.append(
            {
                "step": 1,
                "phase": "plan",
                "steps": len(steps),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 1,
                "action": "propose_research_plan",
                "step_ids": [step["id"] for step in steps],
            }
        )

        phase = "search"
        query = str(steps[0]["args"].get("query", "")).strip()
        if not query:
            return stopped("invalid_search:query", phase=phase)

        search_data = _unwrap_tool_data(
            search_sources(query=query, k=budget.max_urls * 2),
            tool_name="search_sources",
        )
        search_results = list(search_data.get("results", []))
        candidate_urls = [str(item.get("url", "")).strip() for item in search_results if isinstance(item, dict)]

        trace.append(
            {
                "step": 2,
                "phase": "search",
                "query": query,
                "candidates": len(candidate_urls),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 2,
                "action": "search_sources",
                "query": query,
                "candidates": len(candidate_urls),
            }
        )

        phase = "dedupe"
        deduped_urls = dedupe_urls(raw_urls=candidate_urls, max_urls=budget.max_urls)
        if not deduped_urls:
            return stopped("no_sources_after_dedupe", phase=phase)

        trace.append(
            {
                "step": 3,
                "phase": "dedupe",
                "urls_after_dedupe": len(deduped_urls),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "dedupe_urls",
                "urls_after_dedupe": len(deduped_urls),
            }
        )

        phase = "read_extract"
        notes: list[dict[str, Any]] = []
        read_urls: list[str] = []
        denied_sources: list[dict[str, str]] = []

        for url in deduped_urls:
            if (time.monotonic() - started) > budget.max_seconds:
                return stopped("max_seconds", phase=phase)

            decision = gateway.evaluate_source(url=url)
            if decision.kind != "allow":
                denied_sources.append({"url": url, "reason": decision.reason})
                continue

            if len(read_urls) >= budget.max_read_pages:
                break

            page = _unwrap_tool_data(
                read_source(url=url),
                tool_name="read_source",
            )

            extracted = extract_notes_from_page(url=url, page=page)
            for item in extracted:
                note = dict(item)
                note["id"] = f"n{len(notes) + 1}"
                notes.append(note)
                if len(notes) >= budget.max_notes:
                    break

            read_urls.append(url)
            if len(notes) >= budget.max_notes:
                break

        if not notes:
            return stopped(
                "no_reliable_sources",
                phase=phase,
                denied_sources=denied_sources,
            )

        gateway.validate_notes(notes=notes)

        trace.append(
            {
                "step": 4,
                "phase": "read_extract",
                "pages_read": len(read_urls),
                "notes": len(notes),
                "denied_sources": len(denied_sources),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 4,
                "action": "read_extract_notes",
                "pages_read": len(read_urls),
                "denied_sources": denied_sources,
            }
        )

        phase = "verify"
        verification = _unwrap_tool_data(
            verify_notes(notes=notes),
            tool_name="verify_notes",
        )
        if not bool(verification.get("ok")):
            issues = verification.get("issues") or []
            first = str(issues[0]) if issues else "unknown"
            return stopped(f"verification_failed:{first}", phase=phase, verification=verification)

        trace.append(
            {
                "step": 5,
                "phase": "verify",
                "checked_notes": int(verification.get("checked_notes", 0)),
                "issues": len(verification.get("issues", [])),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 5,
                "action": "verify_notes",
                "checked_notes": int(verification.get("checked_notes", 0)),
            }
        )

        phase = "synthesize"
        synthesis = synthesize_from_notes(goal=goal, notes=notes)
        answer = str(synthesis.get("answer", "")).strip()
        citations = [str(item).strip() for item in synthesis.get("citations", []) if str(item).strip()]

        gateway.validate_synthesis(answer=answer, citations=citations, notes=notes)

        aggregate = {
            "query": query,
            "urls_found": len(candidate_urls),
            "urls_after_dedupe": len(deduped_urls),
            "pages_read": len(read_urls),
            "notes_count": len(notes),
            "citations_count": len(citations),
            "denied_sources": denied_sources,
            "verified_notes": int(verification.get("checked_notes", 0)),
        }

        trace.append(
            {
                "step": 6,
                "phase": "synthesize",
                "answer_chars": len(answer),
                "citations": len(citations),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 6,
                "action": "synthesize_answer",
                "citations": citations,
            }
        )

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "grounded_research_answer",
            "answer": answer,
            "citations": citations,
            "citation_details": [
                {
                    "id": str(note["id"]),
                    "url": str(note["url"]),
                    "title": str(note["title"]),
                    "published_at": str(note["published_at"]),
                }
                for note in notes
                if str(note["id"]) in set(citations)
            ],
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }

    except StopRun as exc:
        return stopped(
            exc.reason,
            phase=phase,
            **({"details": exc.details} if isinstance(exc.details, dict) and exc.details else {}),
        )


def main() -> None:
    result = run_research_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What Matters Most Here (in plain words)

  • search_sources does not grant the right to write the answer immediately; dedupe_urls, the policy check (runtime gate), read_extract_notes, and verify_notes come first
  • The execution allowlist can block a domain even if the policy allowlist permits it
  • no_reliable_sources fires if, after policy/read, no valid note remains
  • verify_notes in this example does not prove a claim's truth across independent sources; it is a minimal structural quality gate for notes
  • The writer works only with notes, and citation ids must point to an existing note; invented citation ids are blocked by invalid_answer:citation_unknown
  • pages_read and notes_count are different signals: a page can be read without producing any note after extract_notes_from_page
  • trace/history show how many URLs were found, how many were rejected by policy, and how many notes were actually verified
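The two-allowlist behavior from the bullets above can be condensed to a few lines, mirroring gateway.evaluate_source: the policy check runs first, then the execution check can still deny a policy-allowed domain (the module-level constants here are illustrative).

```python
from urllib.parse import urlparse

# Illustrative allowlists matching the demo's policy_hints.
POLICY_ALLOWED = {"official-status.example.com", "vendor.example.com",
                  "regulator.example.org"}
EXECUTION_ALLOWED = {"official-status.example.com", "vendor.example.com"}

def evaluate_source(url: str) -> str:
    # Policy is checked first; execution can still deny a policy-allowed domain.
    domain = urlparse(url).netloc.lower()
    if domain not in POLICY_ALLOWED:
        return "source_denied_policy"
    if domain not in EXECUTION_ALLOWED:
        return "source_denied_execution"
    return "allow"

print(evaluate_source("https://regulator.example.org/guidance/x"))  # execution deny
print(evaluate_source("https://community-rumors.example.net/t/1"))  # policy deny
```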

Example Output

JSON
{
  "run_id": "08edca9e-6972-493c-98d5-11b53d0d1fc2",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "grounded_research_answer",
  "answer": "Research brief: US payments incident is P1 with failed payment rate 3.4%, 5 chargeback alerts, and ETA about 45 minutes. Enterprise SLA includes 99.95% monthly uptime and a 15-minute first response target for P1 incidents (24/7). Timeline values are estimates and may change.",
  "citations": [
    "n1",
    "n2"
  ],
  "citation_details": [
    {
      "id": "n1",
      "url": "https://official-status.example.com/incidents/payments-2026-03-07",
      "title": "Payments Incident Update",
      "published_at": "2026-03-07"
    },
    {
      "id": "n2",
      "url": "https://vendor.example.com/policies/enterprise-sla",
      "title": "Enterprise SLA",
      "published_at": "2026-01-15"
    }
  ],
  "aggregate": {
    "query": "What is the current US payments incident status and what enterprise SLA commitments apply for uptime and P1 response time?",
    "urls_found": 6,
    "urls_after_dedupe": 4,
    "pages_read": 2,
    "notes_count": 2,
    "citations_count": 2,
    "denied_sources": [
      {
        "url": "https://regulator.example.org/guidance/customer-communications",
        "reason": "source_denied_execution"
      },
      {
        "url": "https://community-rumors.example.net/thread/payment-outage",
        "reason": "source_denied_policy"
      }
    ],
    "verified_notes": 2
  },
  "trace": [
    {
      "step": 1,
      "phase": "plan",
      "steps": 5,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "search",
      "query": "What is the current US payments incident status and what enterprise SLA commitments apply for uptime and P1 response time?",
      "candidates": 6,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 3,
      "phase": "dedupe",
      "urls_after_dedupe": 4,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 4,
      "phase": "read_extract",
      "pages_read": 2,
      "notes": 2,
      "denied_sources": 2,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 5,
      "phase": "verify",
      "checked_notes": 2,
      "issues": 0,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 6,
      "phase": "synthesize",
      "answer_chars": 275,
      "citations": 2,
      "elapsed_ms": 1,
      "ok": true
    }
  ],
  "history": [{...}]
}

Typical stop_reason Values

  • success — the run completed correctly
  • max_seconds — the total time budget was exhausted
  • invalid_plan:* — invalid research plan
  • invalid_step:* — invalid step contract
  • invalid_search:query — empty search query
  • no_sources_after_dedupe — no URLs remain after deduplication
  • no_reliable_sources — no valid notes obtained after policy/read
  • tool_invalid_output:* — a tool returned an invalid contract
  • verification_failed:* — the verify step detected a critical problem in the notes
  • invalid_notes:* — the notes failed gateway validation
  • invalid_answer:* — the synthesis failed its contract (empty answer, too long, invalid citations)
  • source_denied_policy — a typical source-rejection reason inside denied_sources (not a separate stop_reason)
  • source_denied_execution — a typical source-rejection reason inside denied_sources (not a separate stop_reason)
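Because several stop_reason values are namespaced with a prefix (invalid_plan:*, invalid_notes:*, ...), a caller can dispatch on that prefix. Below is one possible caller-side policy; the retry/alert mapping is purely illustrative, not part of the example above.

```python
def classify_stop_reason(stop_reason: str) -> str:
    # Illustrative caller-side policy: the prefix before ":" groups
    # related failures, so handlers can dispatch on the family.
    if stop_reason == "success":
        return "done"
    if stop_reason == "max_seconds":
        return "retry_with_larger_budget"
    family = stop_reason.split(":", 1)[0]
    if family in {"invalid_plan", "invalid_step", "invalid_search"}:
        return "replan"
    if family in {"invalid_notes", "invalid_answer", "verification_failed"}:
        return "alert_and_review"
    return "alert"

print(classify_stop_reason("invalid_answer:citation_unknown"))  # alert_and_review
```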

What Is NOT Shown Here

  • a real web fetch / HTTP client (deterministic mocked tools are used here)
  • TTL caching, retry policy, and backoff for unstable sources
  • trust scoring and weighted source ranking
  • claim-level cross-checking across multiple independent sources
  • human-in-the-loop review for high-risk topics

What to Try Next

  1. Remove vendor.example.com from the execution allowlist and see how denied_sources and the synthesis result change.
  2. Reduce max_read_pages to 1 and check the impact on citations_count.
  3. Replace the citations in the synthesis with a nonexistent id and observe invalid_answer:citation_unknown.
  4. Add a "broken" note with a too-short quote and observe invalid_notes:quote or verification_failed:*.
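For item 4, the structural check that trips is small enough to reproduce standalone. It applies the same rules as verify_notes in tools.py: a quote under 20 characters or an empty claim is flagged, and nothing here checks cross-source truth (the standalone function name is illustrative).

```python
def verify_note_structure(note: dict) -> list[str]:
    # Minimal structural quality gate, mirroring verify_notes in tools.py:
    # it checks shape and provenance quality, not cross-source truth.
    issues: list[str] = []
    if len(str(note.get("quote", "")).strip()) < 20:
        issues.append("quote_too_short")
    if not str(note.get("claim", "")).strip():
        issues.append("claim_missing")
    return issues

broken = {"claim": "SLA is 99.95%", "quote": "too short"}
print(verify_note_structure(broken))  # ['quote_too_short']
```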
⏱️ 15 min read · Updated March 6, 2026 · Difficulty: ★★☆

Author

Nick — an engineer building infrastructure for production AI agents.

Focus: agent patterns, failure modes, runtime control, and systems reliability.

🔗 GitHub: https://github.com/mykolademyanov


Editorial Note

This documentation is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The content draws on real-world failures, post-mortems, and operational incidents from deployed AI agent systems.