RAG Agent — Python (complete implementation with an LLM)

A runnable, production-style example of a RAG agent in Python, featuring a retrieval intent, a policy boundary, a grounded answer, a citation allowlist check, and a hallucination-free fallback.
On this page
  1. Essence of the pattern (brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. How to run
  6. Task
  7. Solution
  8. Code
  9. kb.py — local knowledge base
  10. retriever.py — deterministic search and context packing
  11. gateway.py — policy boundary for retrieval
  12. llm.py — retrieval planning + grounded answer
  13. main.py — Plan -> Retrieve -> Ground -> Answer
  14. requirements.txt
  15. Example output
  16. Typical stop_reason values
  17. What is NOT shown here
  18. What to try next

Essence of the pattern (brief)

RAG Agent is a pattern in which the agent first finds relevant fragments of knowledge and only then builds its answer from them.

The LLM decides what to search for (the retrieval intent), while the policy/execution layer controls how to search safely (source allowlist, context limits, fallback).


What this example demonstrates

  • a retrieval planning step (kind="retrieve") before generating the answer
  • a policy boundary that validates the retrieval intent (query, top_k, sources)
  • an execution boundary that enforces the source allowlist at runtime
  • a deterministic retriever + context packing (min_score, max_chunks, max_chars)
  • a fallback that invents nothing when the grounded context is empty
  • answer generation from context only + citation validation
  • explicit stop_reason, trace, and history for production monitoring

Architecture

  1. The LLM returns a retrieval intent as JSON.
  2. The policy boundary validates the intent's shape and the allowed sources.
  3. RetrievalGateway runs the search only against sources allowed at runtime.
  4. Context packing filters out weak fragments (min_chunk_score) and keeps size limits.
  5. If there is no relevant context, the agent returns clarify/fallback (no hallucinations).
  6. If there is context, the LLM generates a grounded answer and the system validates the citations.

Key contract: the LLM proposes the intent and the answer text, but the policy/execution layer defines what may be used and what counts as a valid result.

The policy allowlist defines what the model may request; the execution allowlist defines what the runtime actually permits right now.

stop_reason is the technical status of the run, while outcome is the business result (grounded_answer or clarify).


Project structure

TEXT
examples/
└── agent-patterns/
    └── rag-agent/
        └── python/
            ├── main.py           # Plan -> Retrieve -> Ground -> Answer
            ├── llm.py            # retrieval planner + grounded answer composer
            ├── gateway.py        # policy boundary: intent validation + source allowlist
            ├── retriever.py      # deterministic ranking + context pack
            ├── kb.py             # local knowledge base (documents + metadata)
            └── requirements.txt

How to run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/rag-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option with export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option with .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows it is easier to set variables with set or, if you prefer, to use python-dotenv to load .env automatically.
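If you want to stay stdlib-only instead of installing python-dotenv, a minimal loader can be sketched as below (an assumption-laden sketch: it handles plain KEY=VALUE lines only, with no quoting or interpolation; `load_env_file` and `DEMO_OPENAI_MODEL` are illustrative names, not part of the example):

```python
import os
import tempfile


def load_env_file(path: str = ".env") -> None:
    """Minimal stdlib-only .env loader; already-set env vars win."""
    try:
        with open(path) as fh:
            for line in fh:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, _, value = line.partition("=")
                os.environ.setdefault(key.strip(), value.strip())
    except FileNotFoundError:
        pass  # a missing .env is not an error


# Demo with a throwaway file instead of a real .env.
with tempfile.NamedTemporaryFile("w", suffix=".env", delete=False) as fh:
    fh.write("DEMO_OPENAI_MODEL=gpt-4.1-mini\n")
load_env_file(fh.name)
print(os.environ["DEMO_OPENAI_MODEL"])  # gpt-4.1-mini
```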


Task

Imagine a real support scenario:

"What SLA applies to enterprise plan and what is P1 first response target?"

The agent must not answer "from memory". It must:

  • find the relevant policy documents
  • use only allowed sources
  • build a grounded answer with citations
  • return a fallback when facts are missing, instead of inventing them

Solution

In this example:

  • the LLM plans the retrieval (query, top_k, optional sources)
  • the gateway validates the intent and enforces the execution allowlist
  • the retriever scores relevance and packs the context within budgets
  • the generate step happens only when there is enough context
  • the final answer passes a citation allowlist check (every citation must point at a chunk that was actually selected)

Code

kb.py — local knowledge base

PYTHON
from __future__ import annotations

from typing import Any

KB_DOCUMENTS: list[dict[str, Any]] = [
    {
        "id": "doc_sla_enterprise_v3",
        "source": "support_policy",
        "title": "Support Policy",
        "section": "Enterprise SLA",
        "updated_at": "2026-01-15",
        "text": (
            "Enterprise plan includes 99.95% monthly uptime SLA. "
            "For P1 incidents, first response target is 15 minutes, 24/7. "
            "For P2 incidents, first response target is 1 hour."
        ),
    },
    {
        "id": "doc_sla_standard_v2",
        "source": "support_policy",
        "title": "Support Policy",
        "section": "Standard SLA",
        "updated_at": "2025-11-10",
        "text": (
            "Standard plan includes 99.5% monthly uptime SLA. "
            "For P1 incidents, first response target is 1 hour during business hours."
        ),
    },
    {
        "id": "doc_security_incident_v2",
        "source": "security_policy",
        "title": "Security Incident Playbook",
        "section": "Escalation",
        "updated_at": "2026-01-20",
        "text": (
            "For enterprise customers, security-related P1 incidents require immediate escalation "
            "to the on-call incident commander and customer success lead."
        ),
    },
    {
        "id": "doc_refund_policy_v4",
        "source": "billing_policy",
        "title": "Billing and Refund Policy",
        "section": "Refund Eligibility",
        "updated_at": "2025-12-01",
        "text": (
            "Annual enterprise subscriptions may receive a prorated refund within 14 days "
            "under approved exception flow."
        ),
    },
    {
        "id": "doc_onboarding_checklist_v1",
        "source": "operations_notes",
        "title": "Enterprise Onboarding Checklist",
        "section": "Launch Prep",
        "updated_at": "2025-09-02",
        "text": (
            "Checklist for onboarding includes SSO setup, domain verification, and success plan kickoff."
        ),
    },
]

The key points here (in plain terms)

  • Knowledge is represented as structured documents with metadata (id, source, updated_at).
  • Both relevant and irrelevant documents are included so the retriever's real behavior is visible.

retriever.py — deterministic search and context packing

PYTHON
from __future__ import annotations

import re
from typing import Any

STOPWORDS = {
    "the",
    "and",
    "for",
    "with",
    "that",
    "this",
    "from",
    "into",
    "what",
    "which",
    "when",
    "where",
    "have",
    "has",
    "plan",
    "does",
}



def _tokenize(text: str) -> list[str]:
    tokens = re.findall(r"[a-zA-Z0-9_]+", text.lower())
    return [token for token in tokens if len(token) > 2 and token not in STOPWORDS]



def _score_document(query_tokens: list[str], doc_text: str) -> float:
    if not query_tokens:
        return 0.0

    haystack = doc_text.lower()
    overlap = sum(1 for token in query_tokens if token in haystack)
    base = overlap / len(query_tokens)

    # Boost explicit SLA intent to prefer policy-grade docs.
    phrase_boost = 0.0
    if "sla" in haystack:
        phrase_boost += 0.15
    if "p1" in haystack and "response" in haystack:
        phrase_boost += 0.1

    return round(min(base + phrase_boost, 1.0), 4)



def retrieve_candidates(
    *,
    query: str,
    documents: list[dict[str, Any]],
    top_k: int,
    allowed_sources: set[str],
) -> list[dict[str, Any]]:
    query_tokens = _tokenize(query)
    scored: list[dict[str, Any]] = []

    for doc in documents:
        if doc.get("source") not in allowed_sources:
            continue

        text = str(doc.get("text", ""))
        score = _score_document(query_tokens, text)
        if score <= 0:
            continue

        scored.append(
            {
                "doc_id": doc["id"],
                "source": doc["source"],
                "title": doc["title"],
                "section": doc["section"],
                "updated_at": doc["updated_at"],
                "score": score,
                "text": text,
            }
        )

    scored.sort(key=lambda item: item["score"], reverse=True)
    return scored[:top_k]



def build_context_pack(
    *,
    candidates: list[dict[str, Any]],
    min_score: float,
    max_chunks: int,
    max_chars: int,
) -> dict[str, Any]:
    selected: list[dict[str, Any]] = []
    total_chars = 0
    rejected_low_score = 0

    for item in candidates:
        if item["score"] < min_score:
            rejected_low_score += 1
            continue

        text = item["text"].strip()
        next_size = len(text)
        if len(selected) >= max_chunks:
            break
        if total_chars + next_size > max_chars:
            continue

        selected.append(item)
        total_chars += next_size

    return {
        "chunks": selected,
        "total_chars": total_chars,
        "rejected_low_score": rejected_low_score,
    }

The key points here (in plain terms)

  • The search is deterministic and predictable (easy to test).
  • Context packing trims noise and keeps technical limits so generation stays stable.
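To see the determinism concretely, the scoring logic can be exercised in isolation. The sketch below mirrors `_tokenize` and `_score_document` (the STOPWORDS set here is a trimmed subset of the module's, enough for this query):

```python
import re

STOPWORDS = {"the", "and", "for", "what", "plan", "that"}  # trimmed subset of retriever.py's set


def tokenize(text: str) -> list[str]:
    tokens = re.findall(r"[a-zA-Z0-9_]+", text.lower())
    return [t for t in tokens if len(t) > 2 and t not in STOPWORDS]


def score(query_tokens: list[str], doc_text: str) -> float:
    haystack = doc_text.lower()
    base = sum(1 for t in query_tokens if t in haystack) / len(query_tokens)
    # Same boosts as _score_document: prefer policy-grade SLA docs.
    boost = 0.15 * ("sla" in haystack) + 0.1 * ("p1" in haystack and "response" in haystack)
    return round(min(base + boost, 1.0), 4)


q = tokenize("What SLA applies to enterprise plan and what is P1 first response target?")
sla_doc = "Enterprise plan includes 99.95% monthly uptime SLA. For P1 incidents, first response target is 15 minutes."
other_doc = "Checklist for onboarding includes SSO setup, domain verification, and success plan kickoff."

print(score(q, sla_doc))    # 1.0 (5/6 token overlap + SLA and P1 boosts, capped at 1.0)
print(score(q, other_doc))  # 0.0 (no overlap -> filtered out by retrieve_candidates)
```

The same inputs always produce the same ranking, which is what makes the retrieve step unit-testable without mocking.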

gateway.py — policy boundary for retrieval

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from retriever import build_context_pack, retrieve_candidates


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_query_chars: int = 240
    max_top_k: int = 6
    max_context_chunks: int = 3
    max_context_chars: int = 2200
    min_chunk_score: float = 0.2
    max_seconds: int = 20



def validate_retrieval_intent(
    raw: Any,
    *,
    allowed_sources_policy: set[str],
    max_top_k: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_intent:not_object")

    if raw.get("kind") != "retrieve":
        raise StopRun("invalid_intent:kind")

    query = raw.get("query")
    if not isinstance(query, str) or not query.strip():
        raise StopRun("invalid_intent:query")

    top_k = raw.get("top_k", 4)
    if not isinstance(top_k, int) or not (1 <= top_k <= max_top_k):
        raise StopRun("invalid_intent:top_k")

    sources_raw = raw.get("sources")
    normalized_sources: list[str] = []
    if sources_raw is not None:
        if not isinstance(sources_raw, list) or not sources_raw:
            raise StopRun("invalid_intent:sources")
        for source in sources_raw:
            if not isinstance(source, str) or not source.strip():
                raise StopRun("invalid_intent:source_item")
            source_name = source.strip()
            if source_name not in allowed_sources_policy:
                raise StopRun(f"invalid_intent:source_not_allowed:{source_name}")
            normalized_sources.append(source_name)

    # Ignore unknown keys and keep only contract fields.
    payload = {
        "kind": "retrieve",
        "query": query.strip(),
        "top_k": top_k,
    }
    if normalized_sources:
        payload["sources"] = normalized_sources
    return payload


class RetrievalGateway:
    def __init__(
        self,
        *,
        documents: list[dict[str, Any]],
        budget: Budget,
        allow_execution_sources: set[str],
    ):
        self.documents = documents
        self.budget = budget
        self.allow_execution_sources = set(allow_execution_sources)

    def run(self, intent: dict[str, Any]) -> dict[str, Any]:
        query = intent["query"]
        if len(query) > self.budget.max_query_chars:
            raise StopRun("invalid_intent:query_too_long")

        requested_sources = set(intent.get("sources") or self.allow_execution_sources)
        denied = sorted(requested_sources - self.allow_execution_sources)
        if denied:
            raise StopRun(f"source_denied:{denied[0]}")

        candidates = retrieve_candidates(
            query=query,
            documents=self.documents,
            top_k=intent["top_k"],
            allowed_sources=requested_sources,
        )

        context_pack = build_context_pack(
            candidates=candidates,
            min_score=self.budget.min_chunk_score,
            max_chunks=self.budget.max_context_chunks,
            max_chars=self.budget.max_context_chars,
        )

        return {
            "query": query,
            "requested_sources": sorted(requested_sources),
            "candidates": candidates,
            "context_chunks": context_pack["chunks"],
            "context_total_chars": context_pack["total_chars"],
            "rejected_low_score": context_pack["rejected_low_score"],
        }

The key points here (in plain terms)

  • The gateway validates the intent contract and blocks disallowed sources.
  • Unknown keys are ignored as long as the required fields are valid.
  • The gateway only enforces the execution allowlist passed in from main.py.
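A condensed, standalone restatement of this contract shows both behaviors — unknown keys dropped, disallowed sources rejected (ALLOWED and hr_wiki are made up for illustration):

```python
# Condensed standalone sketch of the intent contract from validate_retrieval_intent.
ALLOWED = {"support_policy", "billing_policy"}  # illustrative policy allowlist


def validate(raw: dict, max_top_k: int = 6) -> dict:
    if not isinstance(raw, dict) or raw.get("kind") != "retrieve":
        raise ValueError("invalid_intent:kind")
    query = raw.get("query")
    if not isinstance(query, str) or not query.strip():
        raise ValueError("invalid_intent:query")
    top_k = raw.get("top_k", 4)
    if not isinstance(top_k, int) or not (1 <= top_k <= max_top_k):
        raise ValueError("invalid_intent:top_k")
    for source in raw.get("sources") or []:
        if source not in ALLOWED:
            raise ValueError(f"invalid_intent:source_not_allowed:{source}")
    # Unknown keys are dropped: only contract fields survive.
    out = {"kind": "retrieve", "query": query.strip(), "top_k": top_k}
    if raw.get("sources"):
        out["sources"] = list(raw["sources"])
    return out


print(validate({"kind": "retrieve", "query": "enterprise SLA", "top_k": 3, "debug": True}))
try:
    validate({"kind": "retrieve", "query": "refunds", "sources": ["hr_wiki"]})
except ValueError as exc:
    print(exc)  # invalid_intent:source_not_allowed:hr_wiki
```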

llm.py — retrieval planning + grounded answer

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


RETRIEVAL_SYSTEM_PROMPT = """
You are a retrieval planner for a RAG system.
Return exactly one JSON object in this shape:
{
  "kind": "retrieve",
  "query": "short retrieval query",
  "top_k": 4
}

Optional key:
- "sources": ["support_policy", "security_policy"]

Rules:
- Use only sources from available_sources.
- Keep query compact and factual.
- top_k must be between 1 and 6.
- Prefer omitting "sources" unless the question explicitly requires a specific policy domain.
- Do not output markdown or extra keys.
""".strip()

ANSWER_SYSTEM_PROMPT = """
You are a support assistant.
Return exactly one JSON object with this shape:
{
  "answer": "grounded answer in English",
  "citations": ["doc_id_1", "doc_id_2"]
}

Rules:
- Use only facts from provided context_chunks.
- Keep the answer concise and actionable.
- Include at least one citation.
- All citations must be doc_ids from context_chunks.
- Do not output markdown or extra keys.
""".strip()



def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)



def plan_retrieval_intent(*, question: str, available_sources: list[str]) -> dict[str, Any]:
    payload = {
        "question": question,
        "available_sources": available_sources,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": RETRIEVAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}



def compose_grounded_answer(
    *,
    question: str,
    context_chunks: list[dict[str, Any]],
) -> dict[str, Any]:
    payload = {
        "question": question,
        "context_chunks": [
            {
                "doc_id": item.get("doc_id"),
                "title": item.get("title"),
                "section": item.get("section"),
                "updated_at": item.get("updated_at"),
                "text": item.get("text"),
            }
            for item in context_chunks
        ],
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": ANSWER_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")

    answer = data.get("answer")
    citations = data.get("citations")

    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")
    if not answer.strip():
        raise LLMEmpty("llm_empty")

    if not isinstance(citations, list):
        raise LLMInvalid("llm_invalid_schema")

    normalized_citations: list[str] = []
    for item in citations:
        if not isinstance(item, str):
            raise LLMInvalid("llm_invalid_schema")
        value = item.strip()
        if value:
            normalized_citations.append(value)

    return {
        "answer": answer.strip(),
        "citations": normalized_citations,
    }

The key points here (in plain terms)

  • The LLM plans retrieval in one step and composes the grounded answer in another.
  • Both steps work with a JSON contract, not free-form text.
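Because both steps are contract-shaped, the planner can be swapped for a deterministic stub in unit tests, so the plan -> validate -> retrieve path runs without an API key (`fake_plan_retrieval_intent` is a hypothetical test helper, not part of the example):

```python
# Hypothetical offline stub: same keyword signature and JSON shape as
# plan_retrieval_intent, for testing the pipeline without network access.
def fake_plan_retrieval_intent(*, question: str, available_sources: list[str]) -> dict:
    intent = {
        "kind": "retrieve",
        "query": question.strip()[:240],  # stay under max_query_chars
        "top_k": 4,
    }
    if "support_policy" in available_sources:
        intent["sources"] = ["support_policy"]
    return intent


intent = fake_plan_retrieval_intent(
    question="What SLA applies to enterprise plan?",
    available_sources=["billing_policy", "support_policy"],
)
print(intent)
```

The stub's output passes validate_retrieval_intent unchanged, so tests exercise the real policy boundary rather than a mock of it.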

main.py — Plan -> Retrieve -> Ground -> Answer

PYTHON
from __future__ import annotations

import json
import time
from typing import Any

from gateway import Budget, RetrievalGateway, StopRun, validate_retrieval_intent
from kb import KB_DOCUMENTS
from llm import LLMEmpty, LLMInvalid, LLMTimeout, compose_grounded_answer, plan_retrieval_intent

QUESTION = "What SLA applies to enterprise plan and what is P1 first response target?"

BUDGET = Budget(
    max_query_chars=240,
    max_top_k=6,
    max_context_chunks=3,
    max_context_chars=2200,
    min_chunk_score=0.2,
    max_seconds=20,
)

ALLOWED_SOURCES_POLICY = {
    "support_policy",
    "security_policy",
    "billing_policy",
}

SECURITY_SOURCE_RUNTIME_ENABLED = True
ALLOWED_SOURCES_EXECUTION = (
    {"support_policy", "security_policy", "billing_policy"}
    if SECURITY_SOURCE_RUNTIME_ENABLED
    else {"support_policy", "billing_policy"}
)
# Set SECURITY_SOURCE_RUNTIME_ENABLED=False to observe source_denied:security_policy.



def _shorten(text: str, *, limit: int = 280) -> str:
    text = (text or "").strip()
    if len(text) <= limit:
        return text
    return text[: limit - 3].rstrip() + "..."



def _validate_citations_from_context(
    context_chunks: list[dict[str, Any]],
    citations: list[str],
) -> tuple[list[str], list[dict[str, Any]], list[str], list[str]]:
    by_id: dict[str, dict[str, Any]] = {
        str(chunk["doc_id"]): chunk
        for chunk in context_chunks
        if chunk.get("doc_id")
    }

    normalized: list[str] = []
    seen: set[str] = set()
    for citation in citations:
        value = str(citation).strip()
        if not value or value in seen:
            continue
        seen.add(value)
        normalized.append(value)

    invalid = sorted([doc_id for doc_id in normalized if doc_id not in by_id])

    valid_doc_ids: list[str] = []
    citation_details: list[dict[str, Any]] = []
    for doc_id in normalized:
        chunk = by_id.get(doc_id)
        if not chunk:
            continue
        valid_doc_ids.append(doc_id)
        citation_details.append(
            {
                "doc_id": chunk["doc_id"],
                "title": chunk["title"],
                "section": chunk["section"],
                "updated_at": chunk["updated_at"],
                "source": chunk["source"],
                "score": chunk["score"],
            }
        )

    return valid_doc_ids, citation_details, invalid, sorted(by_id.keys())



def run_rag(question: str) -> dict[str, Any]:
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = RetrievalGateway(
        documents=KB_DOCUMENTS,
        budget=BUDGET,
        allow_execution_sources=ALLOWED_SOURCES_EXECUTION,
    )

    try:
        raw_intent = plan_retrieval_intent(
            question=question,
            available_sources=sorted(ALLOWED_SOURCES_POLICY),
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        intent = validate_retrieval_intent(
            raw_intent,
            allowed_sources_policy=ALLOWED_SOURCES_POLICY,
            max_top_k=BUDGET.max_top_k,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "plan",
            "raw_intent": raw_intent,
            "trace": trace,
            "history": history,
        }

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "retrieve",
            "trace": trace,
            "history": history,
        }

    try:
        retrieval = gateway.run(intent)
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "retrieve",
            "intent": intent,
            "trace": trace,
            "history": history,
        }

    trace.append(
        {
            "step": 1,
            "phase": "retrieve",
            "query": retrieval["query"],
            "requested_sources": retrieval["requested_sources"],
            "candidates": len(retrieval["candidates"]),
            "context_chunks": len(retrieval["context_chunks"]),
            "rejected_low_score": retrieval["rejected_low_score"],
            "ok": True,
        }
    )

    history.append(
        {
            "step": 1,
            "intent": intent,
            "retrieval": {
                "candidates": [
                    {
                        "doc_id": item["doc_id"],
                        "source": item["source"],
                        "score": item["score"],
                    }
                    for item in retrieval["candidates"]
                ],
                "context_chunks": [item["doc_id"] for item in retrieval["context_chunks"]],
            },
        }
    )

    if not retrieval["context_chunks"]:
        fallback_answer = (
            "I could not find enough grounded evidence in approved sources. "
            "Please clarify the plan (enterprise/standard) or provide a policy document link."
        )
        trace.append(
            {
                "step": 2,
                "phase": "fallback",
                "reason": "no_grounded_context",
                "ok": True,
            }
        )
        history.append(
            {
                "step": 2,
                "action": "fallback",
                "answer": fallback_answer,
            }
        )
        return {
            "status": "ok",
            "stop_reason": "success",
            "outcome": "clarify",
            "answer": fallback_answer,
            "citations": [],
            "citation_details": [],
            "trace": trace,
            "history": history,
        }

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }

    try:
        final = compose_grounded_answer(
            question=question,
            context_chunks=retrieval["context_chunks"],
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }
    except LLMInvalid as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.args[0],
            "phase": "generate",
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }

    citations, citation_details, invalid_citations, context_doc_ids = _validate_citations_from_context(
        retrieval["context_chunks"],
        final["citations"],
    )
    if invalid_citations:
        return {
            "status": "stopped",
            "stop_reason": "invalid_answer:citations_out_of_context",
            "phase": "generate",
            "invalid_citations": invalid_citations,
            "context_doc_ids": context_doc_ids,
            "trace": trace,
            "history": history,
        }
    if len(citations) < 1:
        return {
            "status": "stopped",
            "stop_reason": "invalid_answer:missing_citations",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }

    trace.append(
        {
            "step": 2,
            "phase": "generate",
            "citation_count": len(citations),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 2,
            "action": "compose_grounded_answer",
            "answer": _shorten(final["answer"]),
            "citations": citations,
        }
    )

    return {
        "status": "ok",
        "stop_reason": "success",
        "outcome": "grounded_answer",
        "answer": final["answer"],
        "citations": citations,
        "citation_details": citation_details,
        "trace": trace,
        "history": history,
    }



def main() -> None:
    result = run_rag(QUESTION)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

The key points here (in plain terms)

  • ALLOWED_SOURCES_POLICY and ALLOWED_SOURCES_EXECUTION are defined in main.py.
  • The gateway enforces the execution allowlist and knows nothing about business-level policy context.
  • _validate_citations_from_context(...) returns 4 values: valid doc_ids, citation_details, invalid_citations, and context_doc_ids (for debugging policy stops).
  • If there is no evidence base, the agent returns outcome="clarify", not an invented answer.
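The citation allowlist rule can be isolated into a small standalone sketch mirroring the core of _validate_citations_from_context (doc_made_up_v9 is an invented id for the demo):

```python
from typing import Any


# Standalone mirror of the core rule: every citation must point at a chunk
# that was actually selected into the context pack.
def check_citations(
    context_chunks: list[dict[str, Any]], citations: list[str]
) -> tuple[list[str], list[str]]:
    known = {str(c["doc_id"]) for c in context_chunks}
    # Dedupe while preserving order, drop empty strings.
    normalized = list(dict.fromkeys(c.strip() for c in citations if c.strip()))
    valid = [c for c in normalized if c in known]
    invalid = sorted(c for c in normalized if c not in known)
    return valid, invalid


chunks = [{"doc_id": "doc_sla_enterprise_v3"}, {"doc_id": "doc_sla_standard_v2"}]
valid, invalid = check_citations(chunks, ["doc_sla_enterprise_v3", "doc_made_up_v9"])
print(valid)    # ['doc_sla_enterprise_v3']
print(invalid)  # ['doc_made_up_v9'] -> run stops with invalid_answer:citations_out_of_context
```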

requirements.txt

TEXT
openai==2.21.0

Example output

Below is an example of a successful grounded run in which the agent answers only from retrieved documents.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "outcome": "grounded_answer",
  "answer": "The Enterprise plan includes a 99.95% monthly uptime SLA. For P1 incidents, the first response target is 15 minutes, available 24/7.",
  "citations": ["doc_sla_enterprise_v3"],
  "citation_details": [
    {
      "doc_id": "doc_sla_enterprise_v3",
      "title": "Support Policy",
      "section": "Enterprise SLA",
      "updated_at": "2026-01-15",
      "source": "support_policy",
      "score": 1.0
    }
  ],
  "trace": [
    {
      "step": 1,
      "phase": "retrieve",
      "query": "SLA for enterprise plan and P1 first response target",
      "requested_sources": ["support_policy"],
      "candidates": 2,
      "context_chunks": 2,
      "rejected_low_score": 0,
      "ok": true
    },
    {
      "step": 2,
      "phase": "generate",
      "citation_count": 1,
      "ok": true
    }
  ],
  "history": [
    {
      "step": 1,
      "intent": {"kind": "retrieve", "query": "SLA for enterprise plan and P1 first response target", "top_k": 4, "sources": ["support_policy"]},
      "retrieval": {
        "candidates": [
          {"doc_id": "doc_sla_enterprise_v3", "source": "support_policy", "score": 1.0},
          {"doc_id": "doc_sla_standard_v2", "source": "support_policy", "score": 1.0}
        ],
        "context_chunks": ["doc_sla_enterprise_v3", "doc_sla_standard_v2"]
      }
    },
    {
      "step": 2,
      "action": "compose_grounded_answer",
      "answer": "The Enterprise plan includes a 99.95% monthly uptime SLA. For P1 incidents, the first response target is 15 minutes, available 24/7.",
      "citations": ["doc_sla_enterprise_v3"]
    }
  ]
}

This is an abridged example: some nested fields are compacted onto a single line without changing their meaning.


Typical stop_reason values

  • success — the run finished correctly; see outcome (grounded_answer or clarify)
  • invalid_intent:* — the LLM's retrieval intent failed policy validation
  • source_denied:<name> — the source is not permitted by the execution allowlist
  • llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_empty — the generate step returned an empty answer
  • llm_invalid_json — the generate step returned invalid JSON
  • llm_invalid_schema — the JSON does not match the expected schema (answer/citations)
  • invalid_answer:missing_citations — the answer is not backed by any valid citation
  • invalid_answer:citations_out_of_context — the answer cites documents that are not among the retrieved context chunks
  • max_seconds — the run's total time budget was exceeded
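For monitoring, a caller might group these values into retry/no-retry families. A sketch (the grouping below is a suggestion, not something defined by the example):

```python
# Illustrative routing of stop_reason values into handling families.
def classify(stop_reason: str) -> str:
    if stop_reason == "success":
        return "done"
    if stop_reason.startswith(("invalid_intent:", "invalid_answer:", "source_denied:")):
        return "policy_stop"      # contract violation: inspect, do not retry blindly
    if stop_reason in {"llm_timeout", "llm_empty", "llm_invalid_json", "llm_invalid_schema"}:
        return "llm_failure"      # often retryable with backoff
    if stop_reason == "max_seconds":
        return "budget_exhausted"
    return "unknown"


print(classify("source_denied:security_policy"))  # policy_stop
print(classify("llm_timeout"))                    # llm_failure
```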

What is NOT shown here

  • No vector index/embeddings and no hybrid search.
  • No multi-tenant auth/ACL at the document level.
  • No reranker model and no semantic deduplication.
  • No online index refresh when the knowledge base changes.

What to try next

  1. Set SECURITY_SOURCE_RUNTIME_ENABLED=False and request security_policy to see source_denied:*.
  2. Raise min_chunk_score to observe more outcome="clarify" results without hallucinations.
  3. Add a post-check that compares key figures in the answer against the text of the cited documents.
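Item 3 can be sketched with a crude substring check (`numbers_grounded` is a hypothetical helper; a real post-check would need unit-aware parsing, since here "1" matching inside "15" already counts as grounded):

```python
import re


# Sketch of the suggested post-check: every number in the answer must also
# appear somewhere in the text of the cited documents.
def numbers_grounded(answer: str, cited_texts: list[str]) -> list[str]:
    nums = set(re.findall(r"\d+(?:\.\d+)?", answer))
    corpus = " ".join(cited_texts)
    return sorted(n for n in nums if n not in corpus)  # [] means all figures grounded


doc = "Enterprise plan includes 99.95% monthly uptime SLA. For P1 incidents, first response target is 15 minutes, 24/7."
answer = "Enterprise SLA is 99.95% uptime; P1 first response target is 15 minutes."
print(numbers_grounded(answer, [doc]))  # []

bad_answer = "P1 first response target is 30 minutes."
print(numbers_grounded(bad_answer, [doc]))  # ['30'] -> figure not backed by citations
```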
⏱️ 16 min read · Updated Mar 2026 · Difficulty: ★★☆
Author

This documentation is curated and maintained by engineers who deploy AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The patterns and recommendations are based on post-mortems, failure modes, and operational incidents in deployed systems, including during the development and operation of agent governance infrastructure at OnceOnly.