Essence of the Pattern (brief)
Research Agent means the agent does not answer "from memory"; instead it follows a controlled workflow:
`search_sources` → `dedupe_urls` → policy check (a runtime gate before `read_source`, not a separate plan step) → `read_extract_notes` → `verify_notes` → `synthesize_answer`
In other words, the final answer is built only from verified notes, and citations are allowed only with valid citation ids that point to a note with provenance (`url`, `quote`, `published_at`).
What This Example Demonstrates
- The agent proposes a research plan, but the runtime validates the plan contract and the step order
- URLs are normalized and deduplicated before sources are read
- The policy allowlist and the execution allowlist for domains are kept separate
- `read_extract_notes` works within budgets (`max_urls`, `max_read_pages`, `max_notes`)
- Notes carry provenance (`url`, `title`, `published_at`, `quote`)
- `verify_notes` in this demo is a minimal structural quality gate (not a cross-source fact-check)
- Synthesis is allowed only with valid citation ids
- `trace`/`history` provide auditability from the search all the way to the final grounded answer
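The normalize-then-dedupe behavior from the list above can be sketched in a few lines (a minimal standalone version of the logic that `tools.py` and `gateway.py` implement; the sample URLs come from this example's mocked search results):

```python
from urllib.parse import urlparse

def normalize_url(url: str) -> str:
    # Lowercase scheme and host, drop query/fragment, strip a trailing slash.
    parsed = urlparse(url.strip())
    scheme = (parsed.scheme or "https").lower()
    path = parsed.path or "/"
    if path != "/" and path.endswith("/"):
        path = path[:-1]
    return f"{scheme}://{parsed.netloc.lower()}{path}"

urls = [
    "https://vendor.example.com/policies/enterprise-sla",
    "https://vendor.example.com/policies/enterprise-sla?ref=search",
    "https://vendor.example.com/policies/enterprise-sla#latest",
]
# dict.fromkeys preserves insertion order while removing duplicates.
deduped = list(dict.fromkeys(normalize_url(u) for u in urls))
print(deduped)  # a single canonical URL remains
```

Because query strings and fragments are dropped before comparison, both "duplicate URL forms" collapse into the same canonical entry before any page is read.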
Architecture
- `agent.py` builds the plan (search_sources → dedupe_urls → read_extract_notes → verify_notes → synthesize_answer).
- `gateway.py` validates the plan, policy decisions for sources, and the notes/answer contracts.
- `tools.py` implements the deterministic steps `search_sources` / `read_source` / `extract_notes_from_page` / `verify_notes`.
- `main.py` orchestrates the workflow, enforces budgets, and returns `trace`/`history`.
Project Structure
```
agent-patterns/
└── research-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── agent.py
        ├── context.py
        ├── README.md
        └── requirements.txt
```
How to Run
```bash
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd agent-patterns/research-agent/python
python main.py
```
No external dependencies are required for this example.
Task
Production case:
"Prepare a short status report on a US payments incident and confirm the enterprise SLA with explicit citations."
Code
context.py — request envelope + policy hints
```python
from __future__ import annotations

from typing import Any


def build_request(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
            "question": (
                "What is the current US payments incident status and what enterprise SLA "
                "commitments apply for uptime and P1 response time?"
            ),
        },
        "policy_hints": {
            "allowed_domains_policy": [
                "official-status.example.com",
                "vendor.example.com",
                "regulator.example.org",
            ],
            "allowed_domains_execution": [
                "official-status.example.com",
                "vendor.example.com",
            ],
            "max_urls": 6,
            "max_read_pages": 3,
            "max_notes": 6,
            "max_answer_chars": 850,
        },
    }
```
tools.py — search/read/extract/verify tools
```python
from __future__ import annotations

from typing import Any
from urllib.parse import urlparse

RAW_SEARCH_RESULTS = [
    {
        "url": "https://official-status.example.com/incidents/payments-2026-03-07",
        "title": "Payments Incident Update",
        "snippet": "P1 incident, failed payment rate and ETA updates.",
        "score": 0.98,
    },
    {
        "url": "https://vendor.example.com/policies/enterprise-sla",
        "title": "Enterprise SLA",
        "snippet": "Uptime SLA and response targets by severity.",
        "score": 0.94,
    },
    {
        "url": "https://regulator.example.org/guidance/customer-communications",
        "title": "Customer Communication Guidance",
        "snippet": "Expectations for incident disclosures.",
        "score": 0.81,
    },
    {
        "url": "https://official-status.example.com/incidents/payments-2026-03-07#latest",
        "title": "Payments Incident Update (duplicate URL form)",
        "snippet": "Duplicate page with fragment.",
        "score": 0.73,
    },
    {
        "url": "https://vendor.example.com/policies/enterprise-sla?ref=search",
        "title": "Enterprise SLA (duplicate URL form)",
        "snippet": "Duplicate page with query string.",
        "score": 0.71,
    },
    {
        "url": "https://community-rumors.example.net/thread/payment-outage",
        "title": "Community Thread",
        "snippet": "Unverified forum claims.",
        "score": 0.42,
    },
]

PAGES: dict[str, dict[str, Any]] = {
    "https://official-status.example.com/incidents/payments-2026-03-07": {
        "title": "Payments Incident Update",
        "published_at": "2026-03-07",
        "body": (
            "US payment gateway is in P1 degraded mode. Failed payment rate is 3.4%. "
            "Chargeback alerts observed: 5. Estimated time to recovery: 45 minutes, subject to change."
        ),
    },
    "https://vendor.example.com/policies/enterprise-sla": {
        "title": "Enterprise SLA",
        "published_at": "2026-01-15",
        "body": (
            "Enterprise monthly uptime SLA is 99.95%. "
            "For P1 incidents, first response target is 15 minutes, available 24/7."
        ),
    },
    "https://regulator.example.org/guidance/customer-communications": {
        "title": "Customer Communication Guidance",
        "published_at": "2025-11-04",
        "body": (
            "Service providers should publish regular incident updates with known impact and recovery status."
        ),
    },
}


def normalize_url(url: str) -> str:
    parsed = urlparse(str(url).strip())
    scheme = (parsed.scheme or "https").lower()
    host = parsed.netloc.lower()
    path = parsed.path or "/"
    if path != "/" and path.endswith("/"):
        path = path[:-1]
    return f"{scheme}://{host}{path}"


def search_sources(*, query: str, k: int) -> dict[str, Any]:
    del query
    return {
        "status": "ok",
        "data": {
            "results": [dict(item) for item in RAW_SEARCH_RESULTS[: max(1, int(k))]],
        },
    }


def read_source(*, url: str) -> dict[str, Any]:
    normalized = normalize_url(url)
    page = PAGES.get(normalized)
    if page is None:
        return {
            "status": "error",
            "error": "not_found",
        }
    return {
        "status": "ok",
        "data": {
            "url": normalized,
            "title": str(page["title"]),
            "published_at": str(page["published_at"]),
            "body": str(page["body"]),
        },
    }


def extract_notes_from_page(*, url: str, page: dict[str, Any]) -> list[dict[str, Any]]:
    normalized = normalize_url(url)
    if normalized == "https://official-status.example.com/incidents/payments-2026-03-07":
        return [
            {
                "claim": "US payments incident is P1 with failed payment rate 3.4%, 5 chargeback alerts, and ETA about 45 minutes.",
                "quote": "US payment gateway is in P1 degraded mode. Failed payment rate is 3.4%. Chargeback alerts observed: 5. Estimated time to recovery: 45 minutes, subject to change.",
                "url": normalized,
                "title": page["title"],
                "published_at": page["published_at"],
            }
        ]
    if normalized == "https://vendor.example.com/policies/enterprise-sla":
        return [
            {
                "claim": "Enterprise SLA includes 99.95% monthly uptime and a 15-minute first response target for P1 incidents (24/7).",
                "quote": "Enterprise monthly uptime SLA is 99.95%. For P1 incidents, first response target is 15 minutes, available 24/7.",
                "url": normalized,
                "title": page["title"],
                "published_at": page["published_at"],
            }
        ]
    return []


def verify_notes(*, notes: list[dict[str, Any]]) -> dict[str, Any]:
    checked = 0
    issues: list[str] = []
    for note in notes:
        checked += 1
        quote = str(note.get("quote", "")).strip()
        claim = str(note.get("claim", "")).strip()
        if len(quote) < 20:
            issues.append("quote_too_short")
        if not claim:
            issues.append("claim_missing")
    return {
        "status": "ok",
        "data": {
            "ok": len(issues) == 0,
            "checked_notes": checked,
            "issues": issues,
        },
    }
```
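As a quick illustration of the structural gate in `verify_notes`, a note with a too-short quote or an empty claim is flagged. This is the same pair of checks reduced to a single-note helper (a sketch, not part of the example's files):

```python
def verify_note(note: dict) -> list[str]:
    # Mirrors the structural checks from verify_notes for one note:
    # quotes must be at least 20 characters and the claim must be non-empty.
    issues: list[str] = []
    if len(str(note.get("quote", "")).strip()) < 20:
        issues.append("quote_too_short")
    if not str(note.get("claim", "")).strip():
        issues.append("claim_missing")
    return issues

print(verify_note({"claim": "SLA is 99.95%", "quote": "too short"}))  # ['quote_too_short']
print(verify_note({"claim": "", "quote": "Enterprise monthly uptime SLA is 99.95%."}))  # ['claim_missing']
```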
agent.py — plan + synthesis from notes
```python
from __future__ import annotations

from typing import Any


def propose_research_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    del goal
    query = request["request"]["question"]
    return {
        "steps": [
            {
                "id": "r1",
                "action": "search_sources",
                "args": {
                    "query": query,
                },
            },
            {"id": "r2", "action": "dedupe_urls", "args": {}},
            {"id": "r3", "action": "read_extract_notes", "args": {}},
            {"id": "r4", "action": "verify_notes", "args": {}},
            {"id": "r5", "action": "synthesize_answer", "args": {}},
        ]
    }


def synthesize_from_notes(*, goal: str, notes: list[dict[str, Any]]) -> dict[str, Any]:
    del goal
    if not notes:
        return {
            "answer": "",
            "citations": [],
        }
    selected = notes[:3]
    citations = [str(item["id"]) for item in selected]
    claims = [str(item["claim"]).strip() for item in selected]
    answer = (
        "Research brief: "
        + " ".join(claims)
        + " Timeline values are estimates and may change."
    )
    return {
        "answer": answer,
        "citations": citations,
    }
```
gateway.py — policy/validation boundaries
```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any
from urllib.parse import urlparse


class StopRun(Exception):
    def __init__(self, reason: str, *, details: dict[str, Any] | None = None):
        super().__init__(reason)
        self.reason = reason
        self.details = details or {}


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 25
    max_steps: int = 8
    max_urls: int = 6
    max_read_pages: int = 3
    max_notes: int = 6
    max_answer_chars: int = 850


@dataclass(frozen=True)
class Decision:
    kind: str
    reason: str


EXPECTED_ACTION_SEQUENCE = [
    "search_sources",
    "dedupe_urls",
    "read_extract_notes",
    "verify_notes",
    "synthesize_answer",
]


def normalize_url(url: str) -> str:
    parsed = urlparse(str(url).strip())
    scheme = (parsed.scheme or "https").lower()
    host = parsed.netloc.lower()
    path = parsed.path or "/"
    if path != "/" and path.endswith("/"):
        path = path[:-1]
    return f"{scheme}://{host}{path}"


def get_domain(url: str) -> str:
    return urlparse(str(url).strip()).netloc.lower()


def validate_plan(raw_steps: Any, *, max_steps: int) -> list[dict[str, Any]]:
    if not isinstance(raw_steps, list) or not raw_steps:
        raise StopRun("invalid_plan:steps")
    if len(raw_steps) > max_steps:
        raise StopRun("invalid_plan:too_many_steps")
    out: list[dict[str, Any]] = []
    actions: list[str] = []
    for raw in raw_steps:
        if not isinstance(raw, dict):
            raise StopRun("invalid_step:not_object")
        step_id = raw.get("id")
        action = raw.get("action")
        args = raw.get("args")
        if not isinstance(step_id, str) or not step_id.strip():
            raise StopRun("invalid_step:id")
        if not isinstance(action, str) or not action.strip():
            raise StopRun("invalid_step:action")
        if not isinstance(args, dict):
            raise StopRun("invalid_step:args")
        normalized = {
            "id": step_id.strip(),
            "action": action.strip(),
            "args": dict(args),
        }
        out.append(normalized)
        actions.append(normalized["action"])
    if actions != EXPECTED_ACTION_SEQUENCE:
        raise StopRun(
            "invalid_plan:step_sequence",
            details={"expected": EXPECTED_ACTION_SEQUENCE, "received": actions},
        )
    return out


def dedupe_urls(*, raw_urls: list[str], max_urls: int) -> list[str]:
    seen: set[str] = set()
    out: list[str] = []
    for raw in raw_urls:
        normalized = normalize_url(raw)
        if normalized in seen:
            continue
        seen.add(normalized)
        out.append(normalized)
        if len(out) >= max_urls:
            break
    return out


class ResearchGateway:
    def __init__(
        self,
        *,
        allowed_domains_policy: set[str],
        allowed_domains_execution: set[str],
        budget: Budget,
    ):
        self.allowed_domains_policy = {d.lower() for d in allowed_domains_policy}
        self.allowed_domains_execution = {d.lower() for d in allowed_domains_execution}
        self.budget = budget

    def evaluate_source(self, *, url: str) -> Decision:
        domain = get_domain(url)
        if domain not in self.allowed_domains_policy:
            return Decision(kind="deny", reason="source_denied_policy")
        if domain not in self.allowed_domains_execution:
            return Decision(kind="deny", reason="source_denied_execution")
        return Decision(kind="allow", reason="policy_pass")

    def validate_notes(self, *, notes: list[dict[str, Any]]) -> None:
        if not isinstance(notes, list) or not notes:
            raise StopRun("invalid_notes:empty")
        if len(notes) > self.budget.max_notes:
            raise StopRun("invalid_notes:too_many")
        for note in notes:
            if not isinstance(note, dict):
                raise StopRun("invalid_notes:item")
            if not isinstance(note.get("id"), str) or not note["id"].strip():
                raise StopRun("invalid_notes:id")
            if not isinstance(note.get("url"), str) or not note["url"].strip():
                raise StopRun("invalid_notes:url")
            if not isinstance(note.get("claim"), str) or not note["claim"].strip():
                raise StopRun("invalid_notes:claim")
            quote = note.get("quote")
            if not isinstance(quote, str) or len(quote.strip()) < 20:
                raise StopRun("invalid_notes:quote")

    def validate_synthesis(self, *, answer: str, citations: list[str], notes: list[dict[str, Any]]) -> None:
        if not isinstance(answer, str) or not answer.strip():
            raise StopRun("invalid_answer:empty")
        if len(answer) > self.budget.max_answer_chars:
            raise StopRun("invalid_answer:too_long")
        if not isinstance(citations, list) or not citations:
            raise StopRun("invalid_answer:citations")
        note_ids = {str(item["id"]) for item in notes}
        for citation in citations:
            if str(citation) not in note_ids:
                raise StopRun("invalid_answer:citation_unknown")
```
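The two-tier allowlist decision made by `evaluate_source` can be shown in isolation. This standalone sketch uses the same domain sets as this example's `policy_hints`:

```python
from urllib.parse import urlparse

POLICY = {"official-status.example.com", "vendor.example.com", "regulator.example.org"}
EXECUTION = {"official-status.example.com", "vendor.example.com"}

def evaluate_source(url: str) -> str:
    domain = urlparse(url).netloc.lower()
    if domain not in POLICY:
        return "source_denied_policy"      # not trusted at all
    if domain not in EXECUTION:
        return "source_denied_execution"   # trusted in principle, but not fetched at runtime
    return "allow"

print(evaluate_source("https://regulator.example.org/guidance/customer-communications"))  # source_denied_execution
print(evaluate_source("https://community-rumors.example.net/thread/payment-outage"))      # source_denied_policy
print(evaluate_source("https://vendor.example.com/policies/enterprise-sla"))              # allow
```

The ordering matters: the policy check runs first, so a domain outside both sets is reported as a policy denial, while a policy-approved but not execution-approved domain surfaces the more specific `source_denied_execution` reason.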
main.py — orchestrate bounded research workflow
```python
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from agent import propose_research_plan, synthesize_from_notes
from context import build_request
from gateway import Budget, ResearchGateway, StopRun, dedupe_urls, validate_plan
from tools import extract_notes_from_page, read_source, search_sources, verify_notes

GOAL = (
    "Research current US payments incident status and enterprise SLA commitments, "
    "then return a concise grounded summary with citations."
)

REQUEST = build_request(
    report_date="2026-03-07",
    region="US",
)

DEFAULT_BUDGET = Budget(
    max_seconds=25,
    max_steps=8,
    max_urls=6,
    max_read_pages=3,
    max_notes=6,
    max_answer_chars=850,
)


def _unwrap_tool_data(raw: Any, *, tool_name: str) -> dict[str, Any]:
    if not isinstance(raw, dict) or raw.get("status") != "ok" or not isinstance(raw.get("data"), dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    return dict(raw["data"])


def _safe_int(value: Any, *, default: int) -> int:
    try:
        return int(value)
    except (TypeError, ValueError):
        return int(default)


def run_research_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    hints_raw = request.get("policy_hints")
    hints: dict[str, Any] = hints_raw if isinstance(hints_raw, dict) else {}

    allowed_domains_policy_raw = hints.get("allowed_domains_policy")
    if isinstance(allowed_domains_policy_raw, list):
        allowed_domains_policy = {
            str(item).strip().lower()
            for item in allowed_domains_policy_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_domains_policy = {
            "official-status.example.com",
            "vendor.example.com",
            "regulator.example.org",
        }

    allowed_domains_execution_raw = hints.get("allowed_domains_execution")
    if isinstance(allowed_domains_execution_raw, list):
        allowed_domains_execution = {
            str(item).strip().lower()
            for item in allowed_domains_execution_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_domains_execution = {
            "official-status.example.com",
            "vendor.example.com",
        }

    max_urls = _safe_int(hints.get("max_urls", DEFAULT_BUDGET.max_urls), default=DEFAULT_BUDGET.max_urls)
    max_read_pages = _safe_int(hints.get("max_read_pages", DEFAULT_BUDGET.max_read_pages), default=DEFAULT_BUDGET.max_read_pages)
    max_notes = _safe_int(hints.get("max_notes", DEFAULT_BUDGET.max_notes), default=DEFAULT_BUDGET.max_notes)
    max_answer_chars = _safe_int(hints.get("max_answer_chars", DEFAULT_BUDGET.max_answer_chars), default=DEFAULT_BUDGET.max_answer_chars)

    budget = Budget(
        max_seconds=DEFAULT_BUDGET.max_seconds,
        max_steps=DEFAULT_BUDGET.max_steps,
        max_urls=max(1, min(20, max_urls)),
        max_read_pages=max(1, min(10, max_read_pages)),
        max_notes=max(1, min(20, max_notes)),
        max_answer_chars=max(120, min(2000, max_answer_chars)),
    )
    gateway = ResearchGateway(
        allowed_domains_policy=allowed_domains_policy,
        allowed_domains_execution=allowed_domains_execution,
        budget=budget,
    )

    def elapsed_ms() -> int:
        return max(1, int((time.monotonic() - started) * 1000))

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    phase = "plan"
    try:
        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase=phase)
        raw_plan = propose_research_plan(goal=goal, request=request)
        steps = validate_plan(raw_plan.get("steps"), max_steps=budget.max_steps)
        trace.append(
            {
                "step": 1,
                "phase": "plan",
                "steps": len(steps),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 1,
                "action": "propose_research_plan",
                "step_ids": [step["id"] for step in steps],
            }
        )

        phase = "search"
        query = str(steps[0]["args"].get("query", "")).strip()
        if not query:
            return stopped("invalid_search:query", phase=phase)
        search_data = _unwrap_tool_data(
            search_sources(query=query, k=budget.max_urls * 2),
            tool_name="search_sources",
        )
        search_results = list(search_data.get("results", []))
        candidate_urls = [str(item.get("url", "")).strip() for item in search_results if isinstance(item, dict)]
        trace.append(
            {
                "step": 2,
                "phase": "search",
                "query": query,
                "candidates": len(candidate_urls),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 2,
                "action": "search_sources",
                "query": query,
                "candidates": len(candidate_urls),
            }
        )

        phase = "dedupe"
        deduped_urls = dedupe_urls(raw_urls=candidate_urls, max_urls=budget.max_urls)
        if not deduped_urls:
            return stopped("no_sources_after_dedupe", phase=phase)
        trace.append(
            {
                "step": 3,
                "phase": "dedupe",
                "urls_after_dedupe": len(deduped_urls),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "dedupe_urls",
                "urls_after_dedupe": len(deduped_urls),
            }
        )

        phase = "read_extract"
        notes: list[dict[str, Any]] = []
        read_urls: list[str] = []
        denied_sources: list[dict[str, str]] = []
        for url in deduped_urls:
            if (time.monotonic() - started) > budget.max_seconds:
                return stopped("max_seconds", phase=phase)
            decision = gateway.evaluate_source(url=url)
            if decision.kind != "allow":
                denied_sources.append({"url": url, "reason": decision.reason})
                continue
            if len(read_urls) >= budget.max_read_pages:
                break
            page = _unwrap_tool_data(
                read_source(url=url),
                tool_name="read_source",
            )
            extracted = extract_notes_from_page(url=url, page=page)
            for item in extracted:
                note = dict(item)
                note["id"] = f"n{len(notes) + 1}"
                notes.append(note)
                if len(notes) >= budget.max_notes:
                    break
            read_urls.append(url)
            if len(notes) >= budget.max_notes:
                break
        if not notes:
            return stopped(
                "no_reliable_sources",
                phase=phase,
                denied_sources=denied_sources,
            )
        gateway.validate_notes(notes=notes)
        trace.append(
            {
                "step": 4,
                "phase": "read_extract",
                "pages_read": len(read_urls),
                "notes": len(notes),
                "denied_sources": len(denied_sources),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 4,
                "action": "read_extract_notes",
                "pages_read": len(read_urls),
                "denied_sources": denied_sources,
            }
        )

        phase = "verify"
        verification = _unwrap_tool_data(
            verify_notes(notes=notes),
            tool_name="verify_notes",
        )
        if not bool(verification.get("ok")):
            issues = verification.get("issues") or []
            first = str(issues[0]) if issues else "unknown"
            return stopped(f"verification_failed:{first}", phase=phase, verification=verification)
        trace.append(
            {
                "step": 5,
                "phase": "verify",
                "checked_notes": int(verification.get("checked_notes", 0)),
                "issues": len(verification.get("issues", [])),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 5,
                "action": "verify_notes",
                "checked_notes": int(verification.get("checked_notes", 0)),
            }
        )

        phase = "synthesize"
        synthesis = synthesize_from_notes(goal=goal, notes=notes)
        answer = str(synthesis.get("answer", "")).strip()
        citations = [str(item).strip() for item in synthesis.get("citations", []) if str(item).strip()]
        gateway.validate_synthesis(answer=answer, citations=citations, notes=notes)

        aggregate = {
            "query": query,
            "urls_found": len(candidate_urls),
            "urls_after_dedupe": len(deduped_urls),
            "pages_read": len(read_urls),
            "notes_count": len(notes),
            "citations_count": len(citations),
            "denied_sources": denied_sources,
            "verified_notes": int(verification.get("checked_notes", 0)),
        }
        trace.append(
            {
                "step": 6,
                "phase": "synthesize",
                "answer_chars": len(answer),
                "citations": len(citations),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 6,
                "action": "synthesize_answer",
                "citations": citations,
            }
        )
        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "grounded_research_answer",
            "answer": answer,
            "citations": citations,
            "citation_details": [
                {
                    "id": str(note["id"]),
                    "url": str(note["url"]),
                    "title": str(note["title"]),
                    "published_at": str(note["published_at"]),
                }
                for note in notes
                if str(note["id"]) in set(citations)
            ],
            "aggregate": aggregate,
            "trace": trace,
            "history": history,
        }
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase=phase,
            **({"details": exc.details} if isinstance(exc.details, dict) and exc.details else {}),
        )


def main() -> None:
    result = run_research_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()
```
The Most Important Part Here (in simple words)
- `search_sources` does not grant the right to write the answer right away; it is followed by `dedupe_urls`, the policy check (runtime gate), `read_extract_notes`, and `verify_notes`
- The execution allowlist can block a domain even when the policy allowlist permits it
- `no_reliable_sources` fires if no valid note remains after policy/read
- `verify_notes` in this example does not confirm the truth of claims across independent sources; it is a minimal structural quality gate for notes
- The writer works only with `notes`, and citation ids must point to an existing note; invented citation ids are blocked with `invalid_answer:citation_unknown`
- `pages_read` and `notes_count` are distinct signals: a page can be read and still produce no note after `extract_notes_from_page`
- `trace`/`history` show how many URLs were found, how many were rejected by policy, and how many notes were actually verified
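The citation gate described above boils down to a membership check. This is a sketch of what `validate_synthesis` enforces, using hypothetical note ids:

```python
def check_citations(citations: list[str], notes: list[dict]) -> str:
    # Every citation id must match the id of an existing note.
    note_ids = {str(note["id"]) for note in notes}
    for citation in citations:
        if citation not in note_ids:
            return "invalid_answer:citation_unknown"
    return "ok"

notes = [{"id": "n1"}, {"id": "n2"}]
print(check_citations(["n1", "n2"], notes))  # ok
print(check_citations(["n1", "n9"], notes))  # invalid_answer:citation_unknown
```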
Example Output
```json
{
  "run_id": "08edca9e-6972-493c-98d5-11b53d0d1fc2",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "grounded_research_answer",
  "answer": "Research brief: US payments incident is P1 with failed payment rate 3.4%, 5 chargeback alerts, and ETA about 45 minutes. Enterprise SLA includes 99.95% monthly uptime and a 15-minute first response target for P1 incidents (24/7). Timeline values are estimates and may change.",
  "citations": [
    "n1",
    "n2"
  ],
  "citation_details": [
    {
      "id": "n1",
      "url": "https://official-status.example.com/incidents/payments-2026-03-07",
      "title": "Payments Incident Update",
      "published_at": "2026-03-07"
    },
    {
      "id": "n2",
      "url": "https://vendor.example.com/policies/enterprise-sla",
      "title": "Enterprise SLA",
      "published_at": "2026-01-15"
    }
  ],
  "aggregate": {
    "query": "What is the current US payments incident status and what enterprise SLA commitments apply for uptime and P1 response time?",
    "urls_found": 6,
    "urls_after_dedupe": 4,
    "pages_read": 2,
    "notes_count": 2,
    "citations_count": 2,
    "denied_sources": [
      {
        "url": "https://regulator.example.org/guidance/customer-communications",
        "reason": "source_denied_execution"
      },
      {
        "url": "https://community-rumors.example.net/thread/payment-outage",
        "reason": "source_denied_policy"
      }
    ],
    "verified_notes": 2
  },
  "trace": [
    {
      "step": 1,
      "phase": "plan",
      "steps": 5,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "search",
      "query": "What is the current US payments incident status and what enterprise SLA commitments apply for uptime and P1 response time?",
      "candidates": 6,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 3,
      "phase": "dedupe",
      "urls_after_dedupe": 4,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 4,
      "phase": "read_extract",
      "pages_read": 2,
      "notes": 2,
      "denied_sources": 2,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 5,
      "phase": "verify",
      "checked_notes": 2,
      "issues": 0,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 6,
      "phase": "synthesize",
      "answer_chars": 275,
      "citations": 2,
      "elapsed_ms": 1,
      "ok": true
    }
  ],
  "history": [{...}]
}
```
Typical stop_reason Values
- `success` — run completed correctly
- `max_seconds` — total time budget exhausted
- `invalid_plan:*` — invalid research plan
- `invalid_step:*` — invalid step contract
- `invalid_search:query` — empty search query
- `no_sources_after_dedupe` — no URLs left after deduplication
- `no_reliable_sources` — no valid note obtained after policy/read
- `tool_invalid_output:*` — a tool returned an invalid contract
- `verification_failed:*` — the verify step found a critical problem in the notes
- `invalid_notes:*` — notes failed gateway validation
- `invalid_answer:*` — synthesis failed its contract (empty answer, excessive length, invalid citations)
- `source_denied_policy` — typical source-rejection reason in `denied_sources` (not a separate `stop_reason`)
- `source_denied_execution` — typical source-rejection reason in `denied_sources` (not a separate `stop_reason`)
What Is NOT Shown Here
- a real web-fetch/HTTP client (deterministic mocked tools are used here)
- TTL caching, retry policy, and backoff for unstable sources
- trust scoring and weighted source ranking
- claim-level cross-checking across several independent sources
- human-in-the-loop review for high-risk topics
What to Try Next
- Remove `vendor.example.com` from the execution allowlist and watch how `denied_sources` and the synthesis result change.
- Lower `max_read_pages` to `1` and check the impact on `citations_count`.
- Replace `citations` in the synthesis with a nonexistent id and watch for `invalid_answer:citation_unknown`.
- Add a "broken" note with a short quote and watch for `invalid_notes:quote` or `verification_failed:*`.