RAG Agent: Python (full implementation with an LLM)

A runnable, production-style RAG agent example in Python, with retrieval intent, a policy boundary, grounded answers, a citation allowlist check, and a fallback that avoids hallucinations.
On this page
  1. Essence of the pattern (brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. Running the project
  6. Task
  7. Solution
  8. Code
  9. kb.py: local knowledge base
  10. retriever.py: deterministic search and context packing
  11. gateway.py: policy boundary for retrieval
  12. llm.py: retrieval planning + grounded answer
  13. main.py: Plan -> Retrieve -> Ground -> Answer
  14. requirements.txt
  15. Example output
  16. Typical stop_reason values
  17. What is NOT shown here
  18. What you can try next

Essence of the pattern (brief)

RAG Agent is a pattern in which the agent first finds relevant knowledge fragments and then builds its answer from them.

The LLM decides what to search for (the retrieval intent), while the policy/execution layer controls how the search is carried out safely (source allowlist, context limits, fallback).


What this example demonstrates

  • a retrieval planning step (kind="retrieve") before answer generation
  • a policy boundary that validates the retrieval intent (query, top_k, sources)
  • an execution boundary that enforces the source allowlist at runtime
  • a deterministic retriever + context packing (min_score, max_chunks, max_chars)
  • a fallback that invents nothing when the grounded context is empty
  • answer generation from the context only + citation validation
  • explicit stop_reason, trace, and history for production monitoring

Architecture

  1. The LLM returns a retrieval intent as JSON.
  2. The policy boundary validates the shape of the intent and the allowed sources.
  3. RetrievalGateway runs the search only in sources allowed at runtime.
  4. The context pack filters out weak fragments (min_chunk_score) and respects the size limits.
  5. If there is no relevant context, a clarify/fallback response is returned (no hallucinations).
  6. If there is context, the LLM generates a grounded answer and the system validates its citations.

Key contract: the LLM proposes the intent and the answer text, but the policy/execution layer defines what may be used and what counts as a valid result.

The policy allowlist defines what the model may request, and the execution allowlist defines what the runtime can actually execute right now.
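
The difference between the two allowlists can be sketched in a few lines. This is a simplified illustration rather than the repository code: `check_sources` is a hypothetical helper, and its return strings are modeled on the stop_reason values listed later on this page.

```python
def check_sources(
    requested: list[str],
    *,
    policy_allowlist: set[str],
    execution_allowlist: set[str],
) -> str:
    """Return "ok" or a stop_reason-style string (hypothetical helper)."""
    # Stage 1 (policy): may the model ask for this source at all?
    for name in requested:
        if name not in policy_allowlist:
            return f"invalid_intent:source_not_allowed:{name}"

    # Stage 2 (execution): is the source enabled in this runtime right now?
    denied = sorted(set(requested) - execution_allowlist)
    if denied:
        return f"source_denied:{denied[0]}"
    return "ok"


policy = {"support_policy", "security_policy", "billing_policy"}
execution = {"support_policy", "billing_policy"}  # security source switched off

print(check_sources(["support_policy"], policy_allowlist=policy, execution_allowlist=execution))
print(check_sources(["security_policy"], policy_allowlist=policy, execution_allowlist=execution))
print(check_sources(["operations_notes"], policy_allowlist=policy, execution_allowlist=execution))
```

A source can pass the policy stage and still be denied at execution time, which is why the two checks produce different stop reasons.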

stop_reason is the technical status of the run, whereas outcome is the business result (grounded_answer or clarify).
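
A caller would typically read the technical status first and the business outcome second. The sketch below assumes result dictionaries shaped like the ones run_rag returns further down; `summarize_run` and its label format are hypothetical.

```python
from typing import Any


def summarize_run(result: dict[str, Any]) -> str:
    """Map a run result to a short label for dashboards (hypothetical helper)."""
    if result.get("status") != "ok":
        # Technical failure: the run stopped before producing a valid result.
        return f"stopped:{result.get('stop_reason', 'unknown')}"
    # Technical success: now look at the business outcome.
    return f"ok:{result.get('outcome', 'unknown')}"


print(summarize_run({"status": "ok", "stop_reason": "success", "outcome": "clarify"}))
print(summarize_run({"status": "stopped", "stop_reason": "llm_timeout"}))
```

Note that a clarify outcome is still a technically successful run: the system worked as designed and declined to answer without evidence.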


Project structure

TEXT
examples/
└── agent-patterns/
    └── rag-agent/
        └── python/
            ├── main.py           # Plan -> Retrieve -> Ground -> Answer
            ├── llm.py            # retrieval planner + grounded answer composer
            ├── gateway.py        # policy boundary: intent validation + source allowlist
            ├── retriever.py      # deterministic ranking + context pack
            ├── kb.py             # local knowledge base (documents + metadata)
            └── requirements.txt

Running the project

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/rag-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py

Option via .env (optional)

BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows, it is simpler to define the variables with set or, if you prefer, to use python-dotenv to load .env automatically.
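
If you would rather load .env from Python without adding a dependency, a minimal stdlib-only loader might look like the sketch below. `load_env_file` is a hypothetical helper that is not part of the repository, and it handles only simple KEY=VALUE lines; python-dotenv's load_dotenv() covers the same job with more complete parsing.

```python
import os
from pathlib import Path


def load_env_file(path: str = ".env") -> dict[str, str]:
    """Load KEY=VALUE lines into os.environ without overriding existing values."""
    loaded: dict[str, str] = {}
    env_path = Path(path)
    if not env_path.is_file():
        return loaded

    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        # Skip blanks, comments, and lines without an assignment.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        value = value.strip().strip("'\"")  # drop optional surrounding quotes
        if key and key not in os.environ:
            os.environ[key] = value
            loaded[key] = value
    return loaded
```

Because llm.py reads OPENAI_MODEL and OPENAI_TIMEOUT_SECONDS when it is imported, a loader like this would need to run before the `from llm import ...` line in main.py.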


Task

Imagine a real support case:

"What SLA applies to enterprise plan and what is P1 first response target?"

The agent must not answer "from memory". It must:

  • find relevant policy documents
  • use only allowed sources
  • compose a grounded answer with citations
  • return a fallback if the facts are insufficient, without inventing anything

Solution

In this example:

  • the LLM plans the retrieval (query, top_k, optional sources)
  • the gateway validates the intent and enforces the execution allowlist
  • the retriever computes relevance and packs the context within the budgets
  • the generate step runs only if there is enough context
  • the final answer goes through a citation allowlist check (every citation must reference a chunk that was actually selected)
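
At its core, the citation allowlist check is a set-membership test. The sketch below is a reduced illustration of that idea, not the function used in main.py; `check_citations` is a hypothetical name.

```python
def check_citations(citations: list[str], context_doc_ids: list[str]) -> dict[str, list[str]]:
    """Split citations into those backed by selected chunks and those that are not."""
    allowed = set(context_doc_ids)
    # Deduplicate while preserving the order the model used.
    unique = list(dict.fromkeys(c.strip() for c in citations if c.strip()))
    return {
        "valid": [c for c in unique if c in allowed],
        "invalid": sorted(c for c in unique if c not in allowed),
    }


context_ids = ["doc_sla_enterprise_v3", "doc_sla_standard_v2"]
print(check_citations(["doc_sla_enterprise_v3", "doc_made_up"], context_ids))
```

Any entry under "invalid" means the model cited something it was never shown, so the run should stop instead of returning the answer.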

Code

kb.py: local knowledge base

PYTHON
from __future__ import annotations

from typing import Any

KB_DOCUMENTS: list[dict[str, Any]] = [
    {
        "id": "doc_sla_enterprise_v3",
        "source": "support_policy",
        "title": "Support Policy",
        "section": "Enterprise SLA",
        "updated_at": "2026-01-15",
        "text": (
            "Enterprise plan includes 99.95% monthly uptime SLA. "
            "For P1 incidents, first response target is 15 minutes, 24/7. "
            "For P2 incidents, first response target is 1 hour."
        ),
    },
    {
        "id": "doc_sla_standard_v2",
        "source": "support_policy",
        "title": "Support Policy",
        "section": "Standard SLA",
        "updated_at": "2025-11-10",
        "text": (
            "Standard plan includes 99.5% monthly uptime SLA. "
            "For P1 incidents, first response target is 1 hour during business hours."
        ),
    },
    {
        "id": "doc_security_incident_v2",
        "source": "security_policy",
        "title": "Security Incident Playbook",
        "section": "Escalation",
        "updated_at": "2026-01-20",
        "text": (
            "For enterprise customers, security-related P1 incidents require immediate escalation "
            "to the on-call incident commander and customer success lead."
        ),
    },
    {
        "id": "doc_refund_policy_v4",
        "source": "billing_policy",
        "title": "Billing and Refund Policy",
        "section": "Refund Eligibility",
        "updated_at": "2025-12-01",
        "text": (
            "Annual enterprise subscriptions may receive a prorated refund within 14 days "
            "under approved exception flow."
        ),
    },
    {
        "id": "doc_onboarding_checklist_v1",
        "source": "operations_notes",
        "title": "Enterprise Onboarding Checklist",
        "section": "Launch Prep",
        "updated_at": "2025-09-02",
        "text": (
            "Checklist for onboarding includes SSO setup, domain verification, and success plan kickoff."
        ),
    },
]

What matters most here (in plain terms)

  • Knowledge is represented as structured documents with metadata (id, source, updated_at).
  • The set mixes relevant and irrelevant documents to show how the retriever really behaves.
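
Since the retriever reads these fields by key and citations are later matched by document id, it can help to check the knowledge base once at startup. The following is a small sketch under that assumption; `find_kb_problems` and `REQUIRED_FIELDS` are hypothetical names, not part of the example project.

```python
from typing import Any

REQUIRED_FIELDS = ("id", "source", "title", "section", "updated_at", "text")


def find_kb_problems(documents: list[dict[str, Any]]) -> list[str]:
    """Return human-readable problems; an empty list means the KB looks consistent."""
    problems: list[str] = []
    seen_ids: set[str] = set()
    for index, doc in enumerate(documents):
        # A field counts as missing when it is absent or empty.
        missing = [field for field in REQUIRED_FIELDS if not doc.get(field)]
        if missing:
            problems.append(f"document #{index}: missing {', '.join(missing)}")
        doc_id = doc.get("id")
        if doc_id in seen_ids:
            problems.append(f"document #{index}: duplicate id {doc_id}")
        elif doc_id:
            seen_ids.add(doc_id)
    return problems


sample = [
    {"id": "a", "source": "s", "title": "t", "section": "x", "updated_at": "2026-01-01", "text": "hi"},
    {"id": "a", "source": "s", "title": "t", "section": "x", "updated_at": "2026-01-01", "text": ""},
]
print(find_kb_problems(sample))
```

Duplicate ids matter in particular because a citation could then point at two different texts.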

retriever.py: deterministic search and context packing

PYTHON
from __future__ import annotations

import re
from typing import Any

STOPWORDS = {
    "the",
    "and",
    "for",
    "with",
    "that",
    "this",
    "from",
    "into",
    "what",
    "which",
    "when",
    "where",
    "have",
    "has",
    "plan",
    "does",
}



def _tokenize(text: str) -> list[str]:
    tokens = re.findall(r"[a-zA-Z0-9_]+", text.lower())
    return [token for token in tokens if len(token) > 2 and token not in STOPWORDS]



def _score_document(query_tokens: list[str], doc_text: str) -> float:
    if not query_tokens:
        return 0.0

    haystack = doc_text.lower()
    # Substring match: a token counts if it appears anywhere in the text.
    overlap = sum(1 for token in query_tokens if token in haystack)
    if overlap == 0:
        # No shared terms: do not let the boosts below surface an unrelated document.
        return 0.0
    base = overlap / len(query_tokens)

    # Boost documents that mention SLA / P1 response terms so policy-grade docs rank higher.
    phrase_boost = 0.0
    if "sla" in haystack:
        phrase_boost += 0.15
    if "p1" in haystack and "response" in haystack:
        phrase_boost += 0.1

    return round(min(base + phrase_boost, 1.0), 4)



def retrieve_candidates(
    *,
    query: str,
    documents: list[dict[str, Any]],
    top_k: int,
    allowed_sources: set[str],
) -> list[dict[str, Any]]:
    query_tokens = _tokenize(query)
    scored: list[dict[str, Any]] = []

    for doc in documents:
        if doc.get("source") not in allowed_sources:
            continue

        text = str(doc.get("text", ""))
        score = _score_document(query_tokens, text)
        if score <= 0:
            continue

        scored.append(
            {
                "doc_id": doc["id"],
                "source": doc["source"],
                "title": doc["title"],
                "section": doc["section"],
                "updated_at": doc["updated_at"],
                "score": score,
                "text": text,
            }
        )

    scored.sort(key=lambda item: item["score"], reverse=True)
    return scored[:top_k]



def build_context_pack(
    *,
    candidates: list[dict[str, Any]],
    min_score: float,
    max_chunks: int,
    max_chars: int,
) -> dict[str, Any]:
    selected: list[dict[str, Any]] = []
    total_chars = 0
    rejected_low_score = 0

    for item in candidates:
        if item["score"] < min_score:
            rejected_low_score += 1
            continue

        text = item["text"].strip()
        next_size = len(text)
        if len(selected) >= max_chunks:
            break
        if total_chars + next_size > max_chars:
            continue

        selected.append(item)
        total_chars += next_size

    return {
        "chunks": selected,
        "total_chars": total_chars,
        "rejected_low_score": rejected_low_score,
    }

What matters most here (in plain terms)

  • The search is deterministic and predictable (easy to test).
  • The context pack cuts noise and enforces technical limits for stable generation.
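
One detail affects determinism: Python's sort is stable, so candidates with equal scores keep the order they have in the knowledge base. If you want the ranking to be independent of that order, an explicit tie-break is one option. The sketch below is an assumption about how that could look (newer documents first, then doc_id); `rank_candidates` is a hypothetical helper.

```python
from typing import Any


def rank_candidates(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Sort by score (desc), then updated_at (desc), then doc_id (asc)."""
    # Python's sort is stable, so apply the keys from least to most significant.
    ranked = sorted(items, key=lambda item: item["doc_id"])
    ranked.sort(key=lambda item: item["updated_at"], reverse=True)  # ISO dates sort as strings
    ranked.sort(key=lambda item: item["score"], reverse=True)
    return ranked


candidates = [
    {"doc_id": "doc_sla_standard_v2", "updated_at": "2025-11-10", "score": 1.0},
    {"doc_id": "doc_sla_enterprise_v3", "updated_at": "2026-01-15", "score": 1.0},
]
print([item["doc_id"] for item in rank_candidates(candidates)])
```

With this ordering, shuffling the input list does not change the result, which keeps tests stable when documents are added or reordered.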

gateway.py: policy boundary for retrieval

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from retriever import build_context_pack, retrieve_candidates


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_query_chars: int = 240
    max_top_k: int = 6
    max_context_chunks: int = 3
    max_context_chars: int = 2200
    min_chunk_score: float = 0.2
    max_seconds: int = 20



def validate_retrieval_intent(
    raw: Any,
    *,
    allowed_sources_policy: set[str],
    max_top_k: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_intent:not_object")

    if raw.get("kind") != "retrieve":
        raise StopRun("invalid_intent:kind")

    query = raw.get("query")
    if not isinstance(query, str) or not query.strip():
        raise StopRun("invalid_intent:query")

    top_k = raw.get("top_k", 4)
    # bool is a subclass of int, so reject True/False explicitly.
    if isinstance(top_k, bool) or not isinstance(top_k, int) or not (1 <= top_k <= max_top_k):
        raise StopRun("invalid_intent:top_k")

    sources_raw = raw.get("sources")
    normalized_sources: list[str] = []
    if sources_raw is not None:
        if not isinstance(sources_raw, list) or not sources_raw:
            raise StopRun("invalid_intent:sources")
        for source in sources_raw:
            if not isinstance(source, str) or not source.strip():
                raise StopRun("invalid_intent:source_item")
            source_name = source.strip()
            if source_name not in allowed_sources_policy:
                raise StopRun(f"invalid_intent:source_not_allowed:{source_name}")
            normalized_sources.append(source_name)

    # Ignore unknown keys and keep only contract fields.
    payload = {
        "kind": "retrieve",
        "query": query.strip(),
        "top_k": top_k,
    }
    if normalized_sources:
        payload["sources"] = normalized_sources
    return payload


class RetrievalGateway:
    def __init__(
        self,
        *,
        documents: list[dict[str, Any]],
        budget: Budget,
        allow_execution_sources: set[str],
    ):
        self.documents = documents
        self.budget = budget
        self.allow_execution_sources = set(allow_execution_sources)

    def run(self, intent: dict[str, Any]) -> dict[str, Any]:
        query = intent["query"]
        if len(query) > self.budget.max_query_chars:
            raise StopRun("invalid_intent:query_too_long")

        requested_sources = set(intent.get("sources") or self.allow_execution_sources)
        denied = sorted(requested_sources - self.allow_execution_sources)
        if denied:
            raise StopRun(f"source_denied:{denied[0]}")

        candidates = retrieve_candidates(
            query=query,
            documents=self.documents,
            top_k=intent["top_k"],
            allowed_sources=requested_sources,
        )

        context_pack = build_context_pack(
            candidates=candidates,
            min_score=self.budget.min_chunk_score,
            max_chunks=self.budget.max_context_chunks,
            max_chars=self.budget.max_context_chars,
        )

        return {
            "query": query,
            "requested_sources": sorted(requested_sources),
            "candidates": candidates,
            "context_chunks": context_pack["chunks"],
            "context_total_chars": context_pack["total_chars"],
            "rejected_low_score": context_pack["rejected_low_score"],
        }

What matters most here (in plain terms)

  • The gateway validates the intent contract and blocks disallowed sources.
  • Unknown keys are ignored as long as the required fields are valid.
  • The gateway enforces only the execution allowlist passed in from main.py.
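
Two small details of contract validation in Python are worth seeing in isolation: dropping keys the contract does not define, and checking integers strictly, since a plain isinstance(value, int) also accepts True and False. Both helpers below are hypothetical illustrations, not code from gateway.py.

```python
from typing import Any

CONTRACT_FIELDS = ("kind", "query", "top_k", "sources")


def is_strict_int(value: Any) -> bool:
    """True for real integers only; bool is a subclass of int in Python."""
    return isinstance(value, int) and not isinstance(value, bool)


def keep_contract_fields(raw: dict[str, Any]) -> dict[str, Any]:
    """Drop any key the contract does not define (hypothetical helper)."""
    return {key: raw[key] for key in CONTRACT_FIELDS if key in raw}


raw_intent = {"kind": "retrieve", "query": "enterprise SLA", "top_k": 4, "notes": "ignored"}
print(keep_contract_fields(raw_intent))
print(is_strict_int(4), is_strict_int(True))
```

Rebuilding the payload from known fields, rather than deleting unknown ones, means that nothing the model adds can leak into the execution layer.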

llm.py: retrieval planning + grounded answer

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


RETRIEVAL_SYSTEM_PROMPT = """
You are a retrieval planner for a RAG system.
Return exactly one JSON object in this shape:
{
  "kind": "retrieve",
  "query": "short retrieval query",
  "top_k": 4
}

Optional key:
- "sources": ["support_policy", "security_policy"]

Rules:
- Use only sources from available_sources.
- Keep query compact and factual.
- top_k must be between 1 and 6.
- Prefer omitting "sources" unless the question explicitly requires a specific policy domain.
- Do not output markdown or extra keys.
""".strip()

ANSWER_SYSTEM_PROMPT = """
You are a support assistant.
Return exactly one JSON object with this shape:
{
  "answer": "grounded answer in English",
  "citations": ["doc_id_1", "doc_id_2"]
}

Rules:
- Use only facts from provided context_chunks.
- Keep the answer concise and actionable.
- Include at least one citation.
- All citations must be doc_ids from context_chunks.
- Do not output markdown or extra keys.
""".strip()



def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)



def plan_retrieval_intent(*, question: str, available_sources: list[str]) -> dict[str, Any]:
    payload = {
        "question": question,
        "available_sources": available_sources,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": RETRIEVAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}



def compose_grounded_answer(
    *,
    question: str,
    context_chunks: list[dict[str, Any]],
) -> dict[str, Any]:
    payload = {
        "question": question,
        "context_chunks": [
            {
                "doc_id": item.get("doc_id"),
                "title": item.get("title"),
                "section": item.get("section"),
                "updated_at": item.get("updated_at"),
                "text": item.get("text"),
            }
            for item in context_chunks
        ],
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": ANSWER_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")

    answer = data.get("answer")
    citations = data.get("citations")

    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")
    if not answer.strip():
        raise LLMEmpty("llm_empty")

    if not isinstance(citations, list):
        raise LLMInvalid("llm_invalid_schema")

    normalized_citations: list[str] = []
    for item in citations:
        if not isinstance(item, str):
            raise LLMInvalid("llm_invalid_schema")
        value = item.strip()
        if value:
            normalized_citations.append(value)

    return {
        "answer": answer.strip(),
        "citations": normalized_citations,
    }

What matters most here (in plain terms)

  • The LLM plans the retrieval in one step and composes the grounded answer in a separate step.
  • Both steps go through a JSON contract, not free text.
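
Treating model output as a contract means handling three distinct failures: no output, output that is not JSON, and JSON that is not an object. A minimal sketch of that parsing step follows; `parse_json_object` and its error strings are hypothetical and only loosely mirror the exceptions used in llm.py.

```python
import json
from typing import Any, Optional


def parse_json_object(text: Optional[str]) -> dict[str, Any]:
    """Parse model output that is expected to be a single JSON object."""
    if not text or not text.strip():
        raise ValueError("empty_output")
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise ValueError("invalid_json") from exc
    if not isinstance(data, dict):
        # Valid JSON, but a list/string/number does not satisfy the contract.
        raise ValueError("not_an_object")
    return data


print(parse_json_object('{"kind": "retrieve", "query": "enterprise SLA", "top_k": 4}'))
```

Field-level checks (types, ranges, allowed values) then run on the returned dictionary, as validate_retrieval_intent and compose_grounded_answer do.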

main.py: Plan -> Retrieve -> Ground -> Answer

PYTHON
from __future__ import annotations

import json
import time
from typing import Any

from gateway import Budget, RetrievalGateway, StopRun, validate_retrieval_intent
from kb import KB_DOCUMENTS
from llm import LLMEmpty, LLMInvalid, LLMTimeout, compose_grounded_answer, plan_retrieval_intent

QUESTION = "What SLA applies to enterprise plan and what is P1 first response target?"

BUDGET = Budget(
    max_query_chars=240,
    max_top_k=6,
    max_context_chunks=3,
    max_context_chars=2200,
    min_chunk_score=0.2,
    max_seconds=20,
)

ALLOWED_SOURCES_POLICY = {
    "support_policy",
    "security_policy",
    "billing_policy",
}

SECURITY_SOURCE_RUNTIME_ENABLED = True
ALLOWED_SOURCES_EXECUTION = (
    {"support_policy", "security_policy", "billing_policy"}
    if SECURITY_SOURCE_RUNTIME_ENABLED
    else {"support_policy", "billing_policy"}
)
# Set SECURITY_SOURCE_RUNTIME_ENABLED=False to observe source_denied:security_policy.



def _shorten(text: str, *, limit: int = 280) -> str:
    text = (text or "").strip()
    if len(text) <= limit:
        return text
    return text[: limit - 3].rstrip() + "..."



def _validate_citations_from_context(
    context_chunks: list[dict[str, Any]],
    citations: list[str],
) -> tuple[list[str], list[dict[str, Any]], list[str], list[str]]:
    by_id: dict[str, dict[str, Any]] = {
        str(chunk["doc_id"]): chunk
        for chunk in context_chunks
        if chunk.get("doc_id")
    }

    normalized: list[str] = []
    seen: set[str] = set()
    for citation in citations:
        value = str(citation).strip()
        if not value or value in seen:
            continue
        seen.add(value)
        normalized.append(value)

    invalid = sorted([doc_id for doc_id in normalized if doc_id not in by_id])

    valid_doc_ids: list[str] = []
    citation_details: list[dict[str, Any]] = []
    for doc_id in normalized:
        chunk = by_id.get(doc_id)
        if not chunk:
            continue
        valid_doc_ids.append(doc_id)
        citation_details.append(
            {
                "doc_id": chunk["doc_id"],
                "title": chunk["title"],
                "section": chunk["section"],
                "updated_at": chunk["updated_at"],
                "source": chunk["source"],
                "score": chunk["score"],
            }
        )

    return valid_doc_ids, citation_details, invalid, sorted(by_id.keys())



def run_rag(question: str) -> dict[str, Any]:
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    gateway = RetrievalGateway(
        documents=KB_DOCUMENTS,
        budget=BUDGET,
        allow_execution_sources=ALLOWED_SOURCES_EXECUTION,
    )

    try:
        raw_intent = plan_retrieval_intent(
            question=question,
            available_sources=sorted(ALLOWED_SOURCES_POLICY),
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        intent = validate_retrieval_intent(
            raw_intent,
            allowed_sources_policy=ALLOWED_SOURCES_POLICY,
            max_top_k=BUDGET.max_top_k,
        )
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "plan",
            "raw_intent": raw_intent,
            "trace": trace,
            "history": history,
        }

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "retrieve",
            "trace": trace,
            "history": history,
        }

    try:
        retrieval = gateway.run(intent)
    except StopRun as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.reason,
            "phase": "retrieve",
            "intent": intent,
            "trace": trace,
            "history": history,
        }

    trace.append(
        {
            "step": 1,
            "phase": "retrieve",
            "query": retrieval["query"],
            "requested_sources": retrieval["requested_sources"],
            "candidates": len(retrieval["candidates"]),
            "context_chunks": len(retrieval["context_chunks"]),
            "rejected_low_score": retrieval["rejected_low_score"],
            "ok": True,
        }
    )

    history.append(
        {
            "step": 1,
            "intent": intent,
            "retrieval": {
                "candidates": [
                    {
                        "doc_id": item["doc_id"],
                        "source": item["source"],
                        "score": item["score"],
                    }
                    for item in retrieval["candidates"]
                ],
                "context_chunks": [item["doc_id"] for item in retrieval["context_chunks"]],
            },
        }
    )

    if not retrieval["context_chunks"]:
        fallback_answer = (
            "I could not find enough grounded evidence in approved sources. "
            "Please clarify the plan (enterprise/standard) or provide a policy document link."
        )
        trace.append(
            {
                "step": 2,
                "phase": "fallback",
                "reason": "no_grounded_context",
                "ok": True,
            }
        )
        history.append(
            {
                "step": 2,
                "action": "fallback",
                "answer": fallback_answer,
            }
        )
        return {
            "status": "ok",
            "stop_reason": "success",
            "outcome": "clarify",
            "answer": fallback_answer,
            "citations": [],
            "citation_details": [],
            "trace": trace,
            "history": history,
        }

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return {
            "status": "stopped",
            "stop_reason": "max_seconds",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }

    try:
        final = compose_grounded_answer(
            question=question,
            context_chunks=retrieval["context_chunks"],
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }
    except LLMInvalid as exc:
        return {
            "status": "stopped",
            "stop_reason": exc.args[0],
            "phase": "generate",
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }

    citations, citation_details, invalid_citations, context_doc_ids = _validate_citations_from_context(
        retrieval["context_chunks"],
        final["citations"],
    )
    if invalid_citations:
        return {
            "status": "stopped",
            "stop_reason": "invalid_answer:citations_out_of_context",
            "phase": "generate",
            "invalid_citations": invalid_citations,
            "context_doc_ids": context_doc_ids,
            "trace": trace,
            "history": history,
        }
    if len(citations) < 1:
        return {
            "status": "stopped",
            "stop_reason": "invalid_answer:missing_citations",
            "phase": "generate",
            "trace": trace,
            "history": history,
        }

    trace.append(
        {
            "step": 2,
            "phase": "generate",
            "citation_count": len(citations),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 2,
            "action": "compose_grounded_answer",
            "answer": _shorten(final["answer"]),
            "citations": citations,
        }
    )

    return {
        "status": "ok",
        "stop_reason": "success",
        "outcome": "grounded_answer",
        "answer": final["answer"],
        "citations": citations,
        "citation_details": citation_details,
        "trace": trace,
        "history": history,
    }



def main() -> None:
    result = run_rag(QUESTION)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain terms)

  • ALLOWED_SOURCES_POLICY and ALLOWED_SOURCES_EXECUTION are defined in main.py.
  • The gateway enforces the execution allowlist and has no knowledge of the business-level policy context.
  • _validate_citations_from_context(...) returns 4 values: valid doc_ids, citation_details, invalid_citations, and context_doc_ids (for debugging a policy stop).
  • If there is no factual basis, outcome="clarify" is returned rather than an invented answer.

requirements.txt

TEXT
openai==2.21.0

Example output

Below is an example of a successful grounded run in which the agent answers only from the retrieved documents.

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "outcome": "grounded_answer",
  "answer": "The Enterprise plan includes a 99.95% monthly uptime SLA. For P1 incidents, the first response target is 15 minutes, available 24/7.",
  "citations": ["doc_sla_enterprise_v3"],
  "citation_details": [
    {
      "doc_id": "doc_sla_enterprise_v3",
      "title": "Support Policy",
      "section": "Enterprise SLA",
      "updated_at": "2026-01-15",
      "source": "support_policy",
      "score": 1.0
    }
  ],
  "trace": [
    {
      "step": 1,
      "phase": "retrieve",
      "query": "SLA for enterprise plan and P1 first response target",
      "requested_sources": ["support_policy"],
      "candidates": 2,
      "context_chunks": 2,
      "rejected_low_score": 0,
      "ok": true
    },
    {
      "step": 2,
      "phase": "generate",
      "citation_count": 1,
      "ok": true
    }
  ],
  "history": [
    {
      "step": 1,
      "intent": {"kind": "retrieve", "query": "SLA for enterprise plan and P1 first response target", "top_k": 4, "sources": ["support_policy"]},
      "retrieval": {
        "candidates": [
          {"doc_id": "doc_sla_enterprise_v3", "source": "support_policy", "score": 1.0},
          {"doc_id": "doc_sla_standard_v2", "source": "support_policy", "score": 1.0}
        ],
        "context_chunks": ["doc_sla_enterprise_v3", "doc_sla_standard_v2"]
      }
    },
    {
      "step": 2,
      "action": "compose_grounded_answer",
      "answer": "The Enterprise plan includes a 99.95% monthly uptime SLA. For P1 incidents, the first response target is 15 minutes, available 24/7.",
      "citations": ["doc_sla_enterprise_v3"]
    }
  ]
}

This is a shortened example: some nested fields are compacted onto one line without changing the meaning.


Typical stop_reason values

  • success - the run completed correctly; see outcome (grounded_answer or clarify)
  • invalid_intent:* - the LLM's retrieval intent failed policy validation
  • source_denied:<name> - the source is not allowed by the execution allowlist
  • llm_timeout - the LLM call timed out (OPENAI_TIMEOUT_SECONDS) or the connection failed
  • llm_empty - the generate step returned an empty answer
  • llm_invalid_json - the generate step returned invalid JSON
  • llm_invalid_schema - the JSON does not match the expected schema (answer/citations)
  • invalid_answer:missing_citations - the answer is not backed by any valid citation
  • invalid_answer:citations_out_of_context - the answer contains citations that are not in the retrieval context chunks
  • max_seconds - the run's total time budget was exceeded
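
For monitoring, it is often convenient to group these values into a few coarse buckets. The grouping below is one possible taxonomy, offered as an assumption rather than something the example project defines; `classify_stop_reason` is a hypothetical helper.

```python
def classify_stop_reason(stop_reason: str) -> str:
    """Group stop_reason values into coarse buckets (hypothetical taxonomy)."""
    if stop_reason == "success":
        return "ok"
    if stop_reason in {"llm_timeout", "max_seconds"}:
        return "timeout"  # latency or connectivity; a retry may help
    if stop_reason.startswith(("invalid_intent:", "source_denied:")):
        return "policy"  # the plan was rejected before or at retrieval
    if stop_reason.startswith(("invalid_answer:", "llm_")):
        return "model_output"  # the generate step broke the answer contract
    return "unknown"


for reason in ["success", "llm_timeout", "source_denied:security_policy", "llm_invalid_schema"]:
    print(reason, "->", classify_stop_reason(reason))
```

Prefix matching keeps the classifier working when new invalid_intent:* or source_denied:<name> variants appear.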

What is NOT shown here

  • No vector index/embeddings and no hybrid search.
  • No multi-tenant auth/ACL at the document level.
  • No reranker model and no semantic deduplication.
  • No online index updates when the knowledge base changes.

What you can try next

  1. Set SECURITY_SOURCE_RUNTIME_ENABLED=False and request security_policy to see source_denied:*.
  2. Raise min_chunk_score to observe more outcome="clarify" results, with no hallucinations.
  3. Add a post-check that compares the key numbers in the answer with the text of the cited documents.
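
The post-check from item 3 could start from something like the sketch below. It is a deliberately naive version that assumes exact string matching of numbers is good enough; `unsupported_numbers` and `NUMBER_RE` are hypothetical names.

```python
import re

# Standalone numbers such as 99.95, 15, 24, 7; digits inside tokens like "P1" are skipped.
NUMBER_RE = re.compile(r"(?<![\w.])\d+(?:\.\d+)?(?!\w)")


def unsupported_numbers(answer: str, cited_texts: list[str]) -> list[str]:
    """Return numbers that appear in the answer but in none of the cited texts."""
    supported: set[str] = set()
    for text in cited_texts:
        supported.update(NUMBER_RE.findall(text))
    return [number for number in NUMBER_RE.findall(answer) if number not in supported]


doc_text = (
    "Enterprise plan includes 99.95% monthly uptime SLA. "
    "For P1 incidents, first response target is 15 minutes, 24/7. "
    "For P2 incidents, first response target is 1 hour."
)
print(unsupported_numbers("Uptime SLA is 99.95%; P1 first response within 15 minutes.", [doc_text]))
print(unsupported_numbers("P1 first response target is 30 minutes.", [doc_text]))
```

The first call finds nothing to flag, while the second reports "30". A check like this misses paraphrased values (for example "fifteen" or "a quarter of an hour"), so it complements the citation check rather than replacing it.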
⏱️ 16 min read · Updated March 2026 · Difficulty: ★★☆
Author

This documentation is curated and maintained by engineers who deploy AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The patterns and recommendations draw on post-mortems, failure modes, and operational incidents in deployed systems, including the development and operation of agent governance infrastructure at OnceOnly.