Memory-Augmented Agent — Python (complete implementation with an LLM)

A runnable, production-style example of a Memory-Augmented agent in Python, with a capture/store/retrieve/apply flow, a policy vs. execution allowlist, a TTL memory lifecycle, and explicit stop reasons.
On this page
  1. Essence of the pattern (brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. How to run
  6. Task
  7. Solution
  8. Code
  9. memory_store.py — TTL memory and retrieval scoring
  10. gateway.py — policy/execution boundary for memory
  11. llm.py — extract, retrieve-intent, apply
  12. main.py — Session1 Capture/Store -> Session2 Retrieve/Apply
  13. Example output
  14. Typical stop_reason values
  15. What is NOT shown
  16. What to try next

Essence of the pattern (brief)

A Memory-Augmented Agent is a pattern in which the agent saves important facts across sessions and uses them in later responses.

The LLM proposes what to remember and what to retrieve, while the memory policy/execution layer controls what can actually be written or read.


What this example demonstrates

  • two phases (Session 1/2) within a single run; cross-process persistence is not shown
  • LLM-driven memory extraction under a JSON contract (items[])
  • a policy boundary for memory write/read contracts
  • an execution boundary (runtime allowlist) for memory keys and scopes
  • runtime trust-gating: execution can block sensitive keys even when policy allows them
  • in this demo, the policy allowlist acts as a hard fail to catch drift and contract errors quickly
  • a TTL memory lifecycle and a bounded in-memory store
  • a final answer whose used_memory_keys are verified against the memory actually retrieved
  • explicit stop_reason, trace, and history for production monitoring

Architecture

  1. The LLM extracts memory candidates (items) from the user's message.
  2. The policy boundary validates shape, keys, ttl_days, and confidence.
  3. The execution boundary decides which records actually get written (runtime allowlist).
  4. In the next session, the LLM plans a retrieval intent (kind/query/top_k/scopes).
  5. The gateway runs retrieval only in scopes the runtime allows.
  6. The LLM composes the answer from incident_context + memory_items.
  7. The system verifies that used_memory_keys reference only memory that was actually retrieved.

Key contract: the LLM may propose records and a retrieval intent, but the policy/execution layer defines what is valid and what actually executes.

The policy allowlist defines what the model may request; the execution allowlist defines what the runtime actually permits right now.
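The policy/execution split can be sketched as a standalone helper. This is a toy sketch: `check_write` is our name, and the two key sets below only mirror this example's configuration.

```python
# Toy sketch of the two-allowlist contract: policy defines what the model may
# request; execution defines what the runtime permits right now.
POLICY_KEYS = {"language", "response_style", "update_channel", "declared_tier"}
EXECUTION_KEYS = {"language", "response_style", "update_channel"}


def check_write(key: str) -> str:
    # A policy violation is a hard fail: it signals model drift or a broken contract.
    if key not in POLICY_KEYS:
        raise ValueError(f"memory_key_not_allowed_policy:{key}")
    # An execution denial is a soft block: recorded and skipped, not fatal.
    if key not in EXECUTION_KEYS:
        return "blocked"
    return "written"
```

The asymmetry is the point: a key outside policy stops the run, while a key that policy allows but the runtime distrusts (like declared_tier here) is merely blocked and surfaced in the history.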


Project structure

TEXT
examples/
└── agent-patterns/
    └── memory-augmented-agent/
        └── python/
            ├── main.py           # Session1 capture/store -> Session2 retrieve/apply
            ├── llm.py            # extraction + retrieval planning + final response
            ├── gateway.py        # policy/execution boundary for memory operations
            ├── memory_store.py   # in-memory store with TTL and relevance scoring
            ├── requirements.txt
            └── README.md

How to run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/memory-augmented-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option using export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py

Option using .env (optional):

BASH
cat > .env <<'ENVFILE'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
ENVFILE

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows it is easier to set variables with set or, if you prefer, to use python-dotenv to load .env automatically.
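As a cross-platform alternative to sourcing .env in a shell, you could parse simple KEY=VALUE lines yourself. This is a minimal sketch (`load_env_file` is a hypothetical helper; python-dotenv handles quoting and many more edge cases):

```python
import os
from pathlib import Path


def load_env_file(path: str = ".env") -> None:
    # Hypothetical helper: read simple KEY=VALUE lines into the environment.
    # Skips blanks and comments; existing variables win via setdefault.
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        os.environ.setdefault(key.strip(), value.strip())
```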


Task

Imagine an operations scenario for an incident assistant:

Session 1: the user states stable preferences (language, response style, update channel).
Session 2: the user asks for a short update about a payments incident.

In Session 2, the goal intentionally includes update and next actions to show how preference fields are applied in a structured update.

The agent must:

  • store only useful memory facts
  • not store keys the runtime forbids
  • retrieve relevant memory in the next session
  • apply it in the final answer

Solution

In this example:

  • the LLM runs extract_memory_candidates(...) and plan_retrieval_intent(...)
  • the gateway validates contracts and enforces the execution allowlist
  • MemoryStore keeps records with a TTL and returns the most relevant facts
  • ENABLE_PREFERENCE_BIAS is a runtime switch for this flow: not "always include", but a controlled bias (preference keys get a small score bonus and can make it into top_k)
  • with ENABLE_PREFERENCE_BIAS=True, preference keys can enter top_k even without token overlap (thanks to that controlled score bonus)
  • the final answer passes the allowlist check: used_memory_keys ⊆ retrieved_keys
  • the response_style=concise validation in this demo is format compliance (length/sentence count), not a semantic evaluation of tone
  • the result includes a full trace and a compact history
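The allowlist check on the final answer reduces to a set-difference test. A minimal sketch (the helper name `verify_used_keys` is ours, not part of the example's code):

```python
def verify_used_keys(used_memory_keys: list[str], retrieved_keys: list[str]) -> list[str]:
    # Returns keys the model claims to have used but that were never retrieved.
    # A non-empty result means the answer references hallucinated memory and
    # the run should stop.
    return sorted(set(used_memory_keys) - set(retrieved_keys))
```

An empty result means used_memory_keys ⊆ retrieved_keys holds and the answer can be accepted.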

Code

memory_store.py — TTL memory and retrieval scoring

PYTHON
from __future__ import annotations

import re
import time
from dataclasses import dataclass
from typing import Any


DEFAULT_BOOST_KEYS = {"language", "response_style", "update_channel"}


def _tokenize(text: str) -> set[str]:
    return set(re.findall(r"[a-zA-Z0-9_]+", (text or "").lower()))


@dataclass
class MemoryRecord:
    user_id: int
    key: str
    value: str
    scope: str
    source: str
    confidence: float
    updated_at: float
    expires_at: float


class MemoryStore:
    def __init__(self, *, max_items: int):
        self.max_items = max_items
        self._records: dict[tuple[int, str, str], MemoryRecord] = {}

    def _evict_if_needed(self) -> None:
        if len(self._records) <= self.max_items:
            return
        oldest_key = min(self._records.items(), key=lambda item: item[1].updated_at)[0]
        self._records.pop(oldest_key, None)

    def upsert_items(
        self,
        *,
        user_id: int,
        items: list[dict[str, Any]],
        source: str,
    ) -> list[dict[str, Any]]:
        now = time.time()
        written: list[dict[str, Any]] = []

        for item in items:
            key = str(item["key"]).strip()
            value = str(item["value"]).strip()
            scope = str(item.get("scope", "user")).strip() or "user"
            ttl_days = float(item.get("ttl_days", 180))
            ttl_days = max(1.0, min(365.0, ttl_days))
            confidence = float(item.get("confidence", 0.8))
            confidence = max(0.0, min(1.0, confidence))
            expires_at = now + ttl_days * 86400.0

            record_key = (user_id, scope, key)
            existing = self._records.get(record_key)
            if existing and existing.value == value:
                # Stable value: refresh metadata without creating noisy rewrites.
                existing.source = source
                existing.confidence = confidence
                existing.updated_at = now
                existing.expires_at = expires_at

                written.append(
                    {
                        "key": key,
                        "value": value,
                        "scope": scope,
                        "source": source,
                        "confidence": round(confidence, 3),
                        "ttl_days": int(ttl_days),
                        "refreshed": True,
                    }
                )
                continue

            row = MemoryRecord(
                user_id=user_id,
                key=key,
                value=value,
                scope=scope,
                source=source,
                confidence=confidence,
                updated_at=now,
                expires_at=expires_at,
            )
            self._records[record_key] = row
            self._evict_if_needed()

            written.append(
                {
                    "key": key,
                    "value": value,
                    "scope": scope,
                    "source": source,
                    "confidence": round(confidence, 3),
                    "ttl_days": int(ttl_days),
                    "refreshed": False,
                }
            )

        return written

    def search(
        self,
        *,
        user_id: int,
        query: str,
        top_k: int,
        scopes: set[str],
        include_preference_keys: bool = False,
    ) -> list[dict[str, Any]]:
        now = time.time()
        query_tokens = _tokenize(query)
        if not query_tokens:
            return []

        hits: list[tuple[float, MemoryRecord]] = []
        for row in list(self._records.values()):
            if row.user_id != user_id:
                continue
            if row.scope not in scopes:
                continue
            if row.expires_at <= now:
                self._records.pop((row.user_id, row.scope, row.key), None)
                continue

            text_tokens = _tokenize(f"{row.key} {row.value}")
            overlap = len(query_tokens & text_tokens)
            if overlap == 0 and not (include_preference_keys and row.key in DEFAULT_BOOST_KEYS):
                continue

            score = overlap + (row.confidence * 0.3)

            if include_preference_keys and row.key in DEFAULT_BOOST_KEYS:
                score += 0.4

            if score <= 0:
                continue
            hits.append((score, row))

        hits.sort(key=lambda item: (item[0], item[1].updated_at), reverse=True)

        result: list[dict[str, Any]] = []
        for score, row in hits[:top_k]:
            result.append(
                {
                    "key": row.key,
                    "value": row.value,
                    "scope": row.scope,
                    "source": row.source,
                    "confidence": round(row.confidence, 3),
                    "score": round(score, 3),
                }
            )
        return result

    def dump_user_records(self, *, user_id: int) -> list[dict[str, Any]]:
        now = time.time()
        rows: list[MemoryRecord] = []

        for row in list(self._records.values()):
            if row.user_id != user_id:
                continue
            if row.expires_at <= now:
                self._records.pop((row.user_id, row.scope, row.key), None)
                continue
            rows.append(row)

        rows.sort(key=lambda item: item.updated_at, reverse=True)

        snapshot: list[dict[str, Any]] = []
        for row in rows:
            ttl_left_days = max(0.0, (row.expires_at - now) / 86400.0)
            snapshot.append(
                {
                    "key": row.key,
                    "value": row.value,
                    "scope": row.scope,
                    "source": row.source,
                    "confidence": round(row.confidence, 3),
                    "ttl_left_days": round(ttl_left_days, 1),
                }
            )
        return snapshot

What matters most here (in plain terms)

  • Memory is isolated by user_id and scope.
  • There is a lifecycle: TTL plus cleanup of expired records.
  • The boost for preference keys is controlled by runtime policy (include_preference_keys), not by the wording of the retrieval query.
  • search(...) returns relevant memory items, not the whole state.
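The TTL lifecycle relies on lazy eviction at read time rather than a background job. A standalone toy sketch of that idea (toy data keyed by name only; the real store keys records by (user_id, scope, key)):

```python
import time

# Toy records: one live entry and one already expired.
records = {
    "language": {"value": "english", "expires_at": time.time() + 3600},
    "old_pref": {"value": "stale", "expires_at": time.time() - 1},
}


def live_keys() -> list[str]:
    # Lazy eviction, as in MemoryStore.search(...): expired rows are dropped
    # the moment a read walks over them.
    now = time.time()
    for key in [k for k, r in records.items() if r["expires_at"] <= now]:
        records.pop(key)
    return sorted(records)
```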

gateway.py — policy/execution boundary for memory

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from memory_store import MemoryStore


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_capture_items: int = 6
    max_retrieve_top_k: int = 6
    max_query_chars: int = 240
    max_answer_chars: int = 700
    max_value_chars: int = 120
    max_seconds: int = 25


def _is_number(value: Any) -> bool:
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def validate_memory_candidates(
    raw: Any,
    *,
    allowed_keys_policy: set[str],
    allowed_scopes_policy: set[str],
    max_items: int,
    max_value_chars: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_memory_candidates:not_object")

    items = raw.get("items")
    if not isinstance(items, list):
        raise StopRun("invalid_memory_candidates:items")

    normalized: list[dict[str, Any]] = []
    for item in items:
        if not isinstance(item, dict):
            raise StopRun("invalid_memory_candidates:item")

        required_keys = {"key", "value"}
        if not required_keys.issubset(item.keys()):
            raise StopRun("invalid_memory_candidates:missing_keys")

        key = item.get("key")
        value = item.get("value")
        scope = item.get("scope", "user")
        ttl_days = item.get("ttl_days", 180)
        confidence = item.get("confidence", 0.8)

        if not isinstance(key, str) or not key.strip():
            raise StopRun("invalid_memory_candidates:key")
        key = key.strip()
        if key not in allowed_keys_policy:
            raise StopRun(f"memory_key_not_allowed_policy:{key}")

        if not isinstance(value, str) or not value.strip():
            raise StopRun("invalid_memory_candidates:value")
        value = value.strip()
        if len(value) > max_value_chars:
            raise StopRun("invalid_memory_candidates:value_too_long")

        if not isinstance(scope, str) or not scope.strip():
            raise StopRun("invalid_memory_candidates:scope")
        scope = scope.strip()
        if scope not in allowed_scopes_policy:
            raise StopRun(f"memory_scope_not_allowed_policy:{scope}")

        if not _is_number(ttl_days):
            raise StopRun("invalid_memory_candidates:ttl_days")
        ttl_days = int(float(ttl_days))
        ttl_days = max(1, min(365, ttl_days))

        if not _is_number(confidence):
            raise StopRun("invalid_memory_candidates:confidence")
        confidence = float(confidence)
        confidence = max(0.0, min(1.0, confidence))

        normalized.append(
            {
                "key": key,
                "value": value,
                "scope": scope,
                "ttl_days": ttl_days,
                "confidence": round(confidence, 3),
            }
        )

    if len(normalized) > max_items:
        raise StopRun("invalid_memory_candidates:too_many_items")

    return {"items": normalized}


def validate_retrieval_intent(
    raw: Any,
    *,
    allowed_scopes_policy: set[str],
    max_top_k: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_retrieval_intent:not_object")

    if raw.get("kind") != "retrieve_memory":
        raise StopRun("invalid_retrieval_intent:kind")

    query = raw.get("query")
    if not isinstance(query, str) or not query.strip():
        raise StopRun("invalid_retrieval_intent:query")

    top_k = raw.get("top_k", 4)
    if not isinstance(top_k, int) or not (1 <= top_k <= max_top_k):
        raise StopRun("invalid_retrieval_intent:top_k")

    scopes_raw = raw.get("scopes")
    normalized_scopes: list[str] = []
    if scopes_raw is not None:
        if not isinstance(scopes_raw, list) or not scopes_raw:
            raise StopRun("invalid_retrieval_intent:scopes")
        for scope in scopes_raw:
            if not isinstance(scope, str) or not scope.strip():
                raise StopRun("invalid_retrieval_intent:scope_item")
            normalized_scope = scope.strip()
            if normalized_scope not in allowed_scopes_policy:
                raise StopRun(f"invalid_retrieval_intent:scope_not_allowed:{normalized_scope}")
            normalized_scopes.append(normalized_scope)

    payload = {
        "kind": "retrieve_memory",
        "query": query.strip(),
        "top_k": top_k,
    }
    if normalized_scopes:
        payload["scopes"] = normalized_scopes
    return payload


class MemoryGateway:
    def __init__(
        self,
        *,
        store: MemoryStore,
        budget: Budget,
        allow_execution_keys: set[str],
        allow_execution_scopes: set[str],
    ):
        self.store = store
        self.budget = budget
        self.allow_execution_keys = set(allow_execution_keys)
        self.allow_execution_scopes = set(allow_execution_scopes)

    def write(
        self,
        *,
        user_id: int,
        items: list[dict[str, Any]],
        source: str,
    ) -> dict[str, Any]:
        if len(items) > self.budget.max_capture_items:
            raise StopRun("max_capture_items")

        writable: list[dict[str, Any]] = []
        blocked: list[dict[str, Any]] = []

        for item in items:
            key = item["key"]
            scope = item["scope"]

            if key not in self.allow_execution_keys:
                blocked.append({"key": key, "reason": "key_denied_execution"})
                continue
            if scope not in self.allow_execution_scopes:
                blocked.append(
                    {
                        "key": key,
                        "scope": scope,
                        "reason": "scope_denied_execution",
                    }
                )
                continue

            writable.append(item)

        written = []
        if writable:
            written = self.store.upsert_items(user_id=user_id, items=writable, source=source)

        return {
            "written": written,
            "blocked": blocked,
        }

    def retrieve(
        self,
        *,
        user_id: int,
        intent: dict[str, Any],
        include_preference_keys: bool = False,
    ) -> dict[str, Any]:
        query = intent["query"]
        if len(query) > self.budget.max_query_chars:
            raise StopRun("invalid_retrieval_intent:query_too_long")

        requested_scopes = set(intent.get("scopes") or self.allow_execution_scopes)
        denied = sorted(requested_scopes - self.allow_execution_scopes)
        if denied:
            raise StopRun(f"scope_denied:{denied[0]}")

        items = self.store.search(
            user_id=user_id,
            query=query,
            top_k=intent["top_k"],
            scopes=requested_scopes,
            include_preference_keys=include_preference_keys,
        )

        return {
            "query": query,
            "requested_scopes": sorted(requested_scopes),
            "include_preference_keys": include_preference_keys,
            "items": items,
        }

What matters most here (in plain terms)

  • The policy boundary checks the contract and the allowed keys/scopes.
  • Policy is strict: a memory key/scope outside the allowlist stops the run.
  • The gateway only enforces the execution allowlist passed in from main.py.
  • If a key/scope is forbidden at runtime, the write is blocked and shows up in history.blocked.

llm.py — extract, retrieve-intent, apply

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


MEMORY_CAPTURE_SYSTEM_PROMPT = """
You are a memory extraction assistant.
Return exactly one JSON object in this shape:
{
  "items": [
    {
      "key": "language",
      "value": "english",
      "scope": "user",
      "ttl_days": 180,
      "confidence": 0.9
    }
  ]
}

Rules:
- Extract only stable preferences or durable constraints useful in future sessions.
- Use only keys from available_keys.
- scope must be "user" or "workspace".
- ttl_days must be between 1 and 365.
- confidence must be between 0 and 1.
- If nothing should be stored, return {"items": []}.
- Do not output markdown or extra keys.
""".strip()

RETRIEVAL_INTENT_SYSTEM_PROMPT = """
You are a memory retrieval planner.
Return exactly one JSON object in this shape:
{
  "kind": "retrieve_memory",
  "query": "short memory query",
  "top_k": 4
}

Optional key:
- "scopes": ["user", "workspace"]

Rules:
- Keep query compact and factual.
- top_k must be between 1 and 6.
- Use only scopes from available_scopes.
- Prefer omitting "scopes" unless the goal explicitly requires a specific scope.
- Do not output markdown or extra keys.
""".strip()

ANSWER_SYSTEM_PROMPT = """
You are an incident response assistant.
Return exactly one JSON object in this shape:
{
  "answer": "final answer in English",
  "used_memory_keys": ["language", "response_style"]
}

Rules:
- Use only incident_context and memory_items.
- Keep the answer concise, actionable, and suitable for an operations update.
- used_memory_keys must reference only keys present in memory_items.
- If "update_channel" is used, explicitly mention it in answer text (for example, "via email").
- If "language" is used with value "english", start answer with "Incident update:".
- If no memory was used, return an empty array.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def extract_memory_candidates(
    *,
    user_message: str,
    available_keys: list[str],
) -> dict[str, Any]:
    payload = {
        "user_message": user_message,
        "available_keys": available_keys,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": MEMORY_CAPTURE_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"invalid": True, "raw": text}


def plan_retrieval_intent(*, goal: str, available_scopes: list[str]) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "available_scopes": available_scopes,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": RETRIEVAL_INTENT_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_memory_augmented_answer(
    *,
    goal: str,
    incident_context: dict[str, Any],
    memory_items: list[dict[str, Any]],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "memory_items": [
            {
                "key": item.get("key"),
                "value": item.get("value"),
                "scope": item.get("scope"),
                "confidence": item.get("confidence"),
            }
            for item in memory_items
        ],
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": ANSWER_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")

    answer = data.get("answer")
    used_memory_keys = data.get("used_memory_keys")

    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")
    if not answer.strip():
        raise LLMEmpty("llm_empty")

    if not isinstance(used_memory_keys, list):
        raise LLMInvalid("llm_invalid_schema")

    normalized_keys: list[str] = []
    for value in used_memory_keys:
        if not isinstance(value, str):
            raise LLMInvalid("llm_invalid_schema")
        item = value.strip()
        if item and item not in normalized_keys:
            normalized_keys.append(item)

    return {
        "answer": answer.strip(),
        "used_memory_keys": normalized_keys,
    }

What matters most here (in plain terms)

  • Each phase has its own JSON contract: capture, retrieve_intent, apply.
  • LLM failures are separated into llm_timeout, llm_invalid_*, llm_empty.
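The decode fallback that the capture and retrieval-planning calls apply can be sketched without any network access (`parse_model_output` is a hypothetical helper; it also adds a dict check that the extraction path leaves to the downstream validator):

```python
import json


def parse_model_output(text: str) -> dict:
    # Instead of raising on malformed model output, return a marker object so
    # the caller can map it to a contract-violation stop_reason.
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return {"invalid": True, "raw": text}
    if not isinstance(data, dict):
        return {"invalid": True, "raw": text}
    return data
```

Keeping the failure as data (rather than an exception) lets the run report a precise stop_reason alongside the raw model text for debugging.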

main.py — Session1 Capture/Store -> Session2 Retrieve/Apply

PYTHON
from __future__ import annotations

import json
import re
import time
import uuid
from typing import Any

from gateway import (
    Budget,
    MemoryGateway,
    StopRun,
    validate_memory_candidates,
    validate_retrieval_intent,
)
from llm import (
    LLMEmpty,
    LLMInvalid,
    LLMTimeout,
    compose_memory_augmented_answer,
    extract_memory_candidates,
    plan_retrieval_intent,
)
from memory_store import MemoryStore

USER_ID = 42
SESSION_1_USER_MESSAGE = (
    "For future incident updates, write in English, keep replies concise, "
    "use email as the primary channel, and remember that I am enterprise tier."
)
SESSION_2_GOAL = "Draft today's payment incident update and next actions."

INCIDENT_CONTEXT = {
    "date": "2026-03-04",
    "region": "US",
    "incident_id": "inc_payments_20260304",
    "severity": "P1",
    "gateway_status": "degraded",
    "failed_payment_rate": 0.034,
    "chargeback_alerts": 5,
    "eta_minutes": 45,
}

BUDGET = Budget(
    max_capture_items=6,
    max_retrieve_top_k=6,
    max_query_chars=240,
    max_answer_chars=700,
    max_value_chars=120,
    max_seconds=25,
)

ALLOWED_MEMORY_KEYS_POLICY = {
    "language",
    "response_style",
    "update_channel",
    "declared_tier",
}
# Runtime can block high-risk keys even if policy allows them.
TRUST_DECLARED_TIER_FROM_CHAT = False
ALLOWED_MEMORY_KEYS_EXECUTION = (
    ALLOWED_MEMORY_KEYS_POLICY
    if TRUST_DECLARED_TIER_FROM_CHAT
    else {"language", "response_style", "update_channel"}
)

ALLOWED_SCOPES_POLICY = {"user", "workspace"}
WORKSPACE_MEMORY_RUNTIME_ENABLED = False
ALLOWED_SCOPES_EXECUTION = (
    ALLOWED_SCOPES_POLICY if WORKSPACE_MEMORY_RUNTIME_ENABLED else {"user"}
)
# Runtime policy: include default preferences for this incident-update flow.
ENABLE_PREFERENCE_BIAS = True



def _shorten(text: str, *, limit: int = 240) -> str:
    text = (text or "").strip()
    if len(text) <= limit:
        return text
    return text[: limit - 3].rstrip() + "..."



def _pick_applied_memory(
    memory_items: list[dict[str, Any]],
    used_keys: list[str],
) -> list[dict[str, Any]]:
    used = set(used_keys)
    out: list[dict[str, Any]] = []
    for item in memory_items:
        key = item.get("key")
        if key not in used:
            continue
        out.append(
            {
                "key": item["key"],
                "value": item["value"],
                "scope": item["scope"],
                "confidence": item["confidence"],
                "score": item["score"],
            }
        )
    return out



def _has_declared_memory_application(*, answer: str, applied_memory: list[dict[str, Any]]) -> bool:
    """
    Conservative audit check:
    - update_channel: value should appear in answer text.
    - response_style=concise: short-response format compliance.
    - language=english: answer should use a stable prefix "Incident update:".
    If no verifiable key exists, do not block.
    """
    if not applied_memory:
        return False

    normalized_answer = " ".join((answer or "").lower().split())
    evidenced_any = False
    has_verifiable_key = False

    def _is_concise(text: str) -> bool:
        words = re.findall(r"[a-zA-Z0-9_]+", text)
        sentence_count = len(re.findall(r"[.!?]+", text))
        return len(words) <= 80 and sentence_count <= 3

    for item in applied_memory:
        key = str(item.get("key") or "").strip().lower()
        value = str(item.get("value", "")).strip().lower()
        if not key or not value:
            continue

        if key == "update_channel":
            has_verifiable_key = True
            if value in normalized_answer:
                evidenced_any = True
        elif key == "response_style":
            has_verifiable_key = True
            if value == "concise" and _is_concise(answer):
                evidenced_any = True
        elif key == "language":
            if value in {"english", "en"}:
                has_verifiable_key = True
                if normalized_answer.startswith("incident update:"):
                    evidenced_any = True
            continue
        else:
            continue

    if not has_verifiable_key:
        return True
    return evidenced_any



def run_memory_augmented(
    *,
    user_id: int,
    session_1_message: str,
    session_2_goal: str,
) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    store = MemoryStore(max_items=100)
    gateway = MemoryGateway(
        store=store,
        budget=BUDGET,
        allow_execution_keys=ALLOWED_MEMORY_KEYS_EXECUTION,
        allow_execution_scopes=ALLOWED_SCOPES_EXECUTION,
    )

    try:
        raw_capture = extract_memory_candidates(
            user_message=session_1_message,
            available_keys=sorted(ALLOWED_MEMORY_KEYS_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="capture")

    try:
        capture_payload = validate_memory_candidates(
            raw_capture,
            allowed_keys_policy=ALLOWED_MEMORY_KEYS_POLICY,
            allowed_scopes_policy=ALLOWED_SCOPES_POLICY,
            max_items=BUDGET.max_capture_items,
            max_value_chars=BUDGET.max_value_chars,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="capture",
            raw_capture=raw_capture,
        )

    write_result = gateway.write(
        user_id=user_id,
        items=capture_payload["items"],
        source="session_1",
    )

    refreshed_items = [item for item in write_result["written"] if item.get("refreshed")]
    written_items = [item for item in write_result["written"] if not item.get("refreshed")]

    trace.append(
        {
            "step": 1,
            "phase": "capture_store",
            "candidates": len(capture_payload["items"]),
            "written": len(written_items),
            "refreshed": len(refreshed_items),
            "blocked": len(write_result["blocked"]),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 1,
            "session": "session_1",
            "message": session_1_message,
            "written_keys": [item["key"] for item in written_items],
            "refreshed_keys": [item["key"] for item in refreshed_items],
            "blocked": write_result["blocked"],
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="retrieve_plan")

    try:
        raw_intent = plan_retrieval_intent(
            goal=session_2_goal,
            available_scopes=sorted(ALLOWED_SCOPES_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="retrieve_plan")

    try:
        intent = validate_retrieval_intent(
            raw_intent,
            allowed_scopes_policy=ALLOWED_SCOPES_POLICY,
            max_top_k=BUDGET.max_retrieve_top_k,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="retrieve_plan",
            raw_intent=raw_intent,
        )

    try:
        retrieval = gateway.retrieve(
            user_id=user_id,
            intent=intent,
            include_preference_keys=ENABLE_PREFERENCE_BIAS,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="retrieve",
            intent=intent,
        )

    trace.append(
        {
            "step": 2,
            "phase": "retrieve",
            "query": retrieval["query"],
            "requested_scopes": retrieval["requested_scopes"],
            "include_preference_keys": retrieval["include_preference_keys"],
            "memory_hits": len(retrieval["items"]),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 2,
            "session": "session_2",
            "intent": intent,
            "resolved_scopes": retrieval["requested_scopes"],
            "include_preference_keys": retrieval["include_preference_keys"],
            "retrieved_keys": [item["key"] for item in retrieval["items"]],
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="apply")

    try:
        final = compose_memory_augmented_answer(
            goal=session_2_goal,
            incident_context=INCIDENT_CONTEXT,
            memory_items=retrieval["items"],
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="apply")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="apply")
    except LLMEmpty:
        return stopped("llm_empty", phase="apply")

    retrieved_keys = {item["key"] for item in retrieval["items"]}
    invalid_used_keys = sorted(
        set(final["used_memory_keys"]) - retrieved_keys,
    )
    if invalid_used_keys:
        return stopped(
            "invalid_answer:memory_keys_out_of_context",
            phase="apply",
            invalid_used_memory_keys=invalid_used_keys,
            retrieved_keys=sorted(retrieved_keys),
        )

    if len(final["answer"]) > BUDGET.max_answer_chars:
        return stopped("invalid_answer:too_long", phase="apply")

    applied_memory = _pick_applied_memory(retrieval["items"], final["used_memory_keys"])
    if final["used_memory_keys"] and not _has_declared_memory_application(
        answer=final["answer"],
        applied_memory=applied_memory,
    ):
        return stopped(
            "invalid_answer:memory_declared_but_not_applied",
            phase="apply",
            used_memory_keys=final["used_memory_keys"],
            applied_memory=applied_memory,
        )

    trace.append(
        {
            "step": 3,
            "phase": "apply",
            "used_memory_keys": final["used_memory_keys"],
            "applied_memory_count": len(applied_memory),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 3,
            "action": "compose_memory_augmented_answer",
            "used_memory_keys": final["used_memory_keys"],
            "answer": _shorten(final["answer"]),
        }
    )

    return {
        "run_id": run_id,
        "status": "ok",
        "stop_reason": "success",
        "outcome": "memory_applied" if final["used_memory_keys"] else "context_only",
        "answer": final["answer"],
        "used_memory_keys": final["used_memory_keys"],
        "applied_memory": applied_memory,
        "memory_state": store.dump_user_records(user_id=user_id),
        "trace": trace,
        "history": history,
    }



def main() -> None:
    result = run_memory_augmented(
        user_id=USER_ID,
        session_1_message=SESSION_1_USER_MESSAGE,
        session_2_goal=SESSION_2_GOAL,
    )
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

The most important points (in plain terms)

  • Session 1 and Session 2 are simulated here within a single process run via a shared memory store.
  • ALLOWED_MEMORY_KEYS_POLICY and ALLOWED_MEMORY_KEYS_EXECUTION can differ intentionally.
  • ENABLE_PREFERENCE_BIAS=True is enabled only for the incident-update flow, where preference fields are almost always needed.
  • Every result payload includes run_id for log correlation.
  • The final used_memory_keys check plus memory_declared_but_not_applied makes the apply step audit-friendly.
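The final used_memory_keys check is easy to isolate. A minimal standalone sketch (hypothetical helper name; in the example this logic lives inline in run_memory_augmented):

```python
def find_out_of_context_keys(
    used_memory_keys: list[str],
    retrieved_items: list[dict],
) -> list[str]:
    """Return keys the model claims to have used but that were never retrieved."""
    retrieved = {item["key"] for item in retrieved_items}
    return sorted(set(used_memory_keys) - retrieved)


items = [{"key": "language"}, {"key": "update_channel"}]
print(find_out_of_context_keys(["language", "declared_tier"], items))
# -> ['declared_tier']
```

Any non-empty result maps directly to the invalid_answer:memory_keys_out_of_context stop reason.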

Example output

JSON
{
  "run_id": "f06d5282-bda4-44df-8bf9-38f53cdb2fb9",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "memory_applied",
  "answer": "Incident update: On 2026-03-04, the US payment gateway is in a P1 degraded state. Failed payment rate is 3.4% with 5 chargeback alerts, ETA 45 minutes. Next actions: monitor gateway performance and send customer updates via email.",
  "used_memory_keys": [
    "language",
    "update_channel",
    "response_style"
  ],
  "applied_memory": [
    {
      "key": "language",
      "value": "english",
      "scope": "user",
      "confidence": 0.95,
      "score": 0.685
    },
    {
      "key": "update_channel",
      "value": "email",
      "scope": "user",
      "confidence": 0.95,
      "score": 0.685
    },
    {
      "key": "response_style",
      "value": "concise",
      "scope": "user",
      "confidence": 0.9,
      "score": 0.67
    }
  ],
  "memory_state": [
    {
      "key": "language",
      "value": "english",
      "scope": "user",
      "source": "session_1",
      "confidence": 0.95,
      "ttl_left_days": 180.0
    },
    {
      "key": "response_style",
      "value": "concise",
      "scope": "user",
      "source": "session_1",
      "confidence": 0.9,
      "ttl_left_days": 180.0
    },
    {
      "key": "update_channel",
      "value": "email",
      "scope": "user",
      "source": "session_1",
      "confidence": 0.95,
      "ttl_left_days": 180.0
    }
  ],
  "trace": [
    {
      "step": 1,
      "phase": "capture_store",
      "candidates": 4,
      "written": 3,
      "refreshed": 0,
      "blocked": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "retrieve",
      "query": "payment incident update and next actions",
      "requested_scopes": [
        "user"
      ],
      "include_preference_keys": true,
      "memory_hits": 3,
      "ok": true
    },
    {
      "step": 3,
      "phase": "apply",
      "used_memory_keys": [
        "language",
        "update_channel",
        "response_style"
      ],
      "applied_memory_count": 3,
      "ok": true
    }
  ],
  "history": [
    {
      "step": 1,
      "session": "session_1",
      "message": "For future incident updates, write in English, keep replies concise, use email as the primary channel, and remember that I am enterprise tier.",
      "written_keys": [
        "language",
        "response_style",
        "update_channel"
      ],
      "refreshed_keys": [],
      "blocked": [
        {
          "key": "declared_tier",
          "reason": "key_denied_execution"
        }
      ]
    },
    {
      "step": 2,
      "session": "session_2",
      "intent": {
        "kind": "retrieve_memory",
        "query": "payment incident update and next actions",
        "top_k": 4
      },
      "resolved_scopes": [
        "user"
      ],
      "include_preference_keys": true,
      "retrieved_keys": [
        "language",
        "update_channel",
        "response_style"
      ]
    },
    {
      "step": 3,
      "action": "compose_memory_augmented_answer",
      "used_memory_keys": [
        "language",
        "update_channel",
        "response_style"
      ],
      "answer": "Incident update: On 2026-03-04, the US payment gateway is in a P1 degraded state..."
    }
  ]
}

Typical stop_reason values

  • success — the run completed correctly; see outcome (memory_applied or context_only)
  • invalid_memory_candidates:* — memory capture failed contract validation
  • invalid_memory_candidates:value_too_long — a value exceeds the max_value_chars limit
  • memory_key_not_allowed_policy:<key> — the LLM proposed a key outside the policy allowlist
  • memory_scope_not_allowed_policy:<scope> — the LLM proposed a scope outside the policy allowlist
  • invalid_retrieval_intent:* — the retrieval intent failed policy validation
  • scope_denied:<scope> — the retrieval scope is not permitted by the execution allowlist
  • llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS
  • llm_invalid_json / llm_invalid_schema — the apply step returned invalid JSON or an invalid shape
  • llm_empty — empty final answer
  • invalid_answer:memory_keys_out_of_context — the model referenced a memory key that was not retrieved
  • invalid_answer:too_long — the final answer exceeds the max_answer_chars limit
  • invalid_answer:memory_declared_but_not_applied — the model declared memory usage, but the answer text does not reflect it
  • max_seconds — the total run time budget was exceeded
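
A monitoring consumer might bucket these reasons by prefix. This is an illustrative sketch (the function and bucket names are hypothetical, not part of the example code):

```python
def classify_stop_reason(reason: str) -> str:
    """Bucket a stop_reason string for alerting/retry decisions (illustrative only)."""
    if reason == "success":
        return "ok"
    if reason in ("llm_timeout", "max_seconds"):
        return "retryable"          # transient: provider latency or time budget
    if reason.startswith(("invalid_memory_candidates", "invalid_retrieval_intent", "llm_")):
        return "contract_error"     # LLM output drifted from the JSON contract
    if reason.startswith(("memory_key_not_allowed_policy",
                          "memory_scope_not_allowed_policy",
                          "scope_denied")):
        return "policy_block"       # an allowlist rejected the proposal
    if reason.startswith("invalid_answer"):
        return "answer_rejected"    # final-answer verification failed
    return "unknown"


print(classify_stop_reason("scope_denied:workspace"))
# -> policy_block
```

In production you would likely alert on contract_error and answer_rejected (model drift) while only retrying the retryable bucket.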

What is NOT shown

  • persistent storage (Postgres/Redis/vector DB) instead of the in-memory implementation
  • encryption/PII redaction before writing memory
  • semantic retrieval via embeddings (instead of simple token overlap)
  • multi-tenant quotas and soft/hard retention policies
  • retry/backoff for LLM calls
  • per-key consent and a user-visible memory UI
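
To give a flavor of one missing piece, retry/backoff for LLM calls could be wrapped like this. A minimal sketch, assuming the example's LLMTimeout exception type (redefined here so the snippet is self-contained):

```python
import time
from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")


class LLMTimeout(Exception):
    """Stand-in for the example's LLM timeout exception."""


def call_with_backoff(
    fn: Callable[[], T],
    *,
    attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Retry a flaky LLM call with exponential backoff; re-raise after the last attempt."""
    for attempt in range(attempts):
        try:
            return fn()
        except LLMTimeout:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...
    raise RuntimeError("unreachable: attempts must be >= 1")
```

Note that in the real flow each retry must still be counted against BUDGET.max_seconds, otherwise backoff silently defeats the time budget.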

What to try next

  1. Enable TRUST_DECLARED_TIER_FROM_CHAT=True and see how blocked changes in history.
  2. Enable WORKSPACE_MEMORY_RUNTIME_ENABLED=True and add a retrieval intent with the workspace scope.
  3. Add a policy rule for the timezone memory key and verify the personalization in the answer.
  4. Replace the in-memory store with external storage and add deduplication by key + normalized_value.
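
The deduplication in step 4 can be sketched independently of the storage backend (hypothetical helper; the normalization rule shown here, lowercasing plus whitespace collapse, is one reasonable choice among several):

```python
def dedupe_candidates(items: list[dict]) -> list[dict]:
    """Keep only the first candidate per (key, normalized value) pair."""
    seen: set[tuple[str, str]] = set()
    out: list[dict] = []
    for item in items:
        # Normalize: lowercase and collapse runs of whitespace.
        normalized_value = " ".join(str(item["value"]).lower().split())
        fingerprint = (item["key"], normalized_value)
        if fingerprint not in seen:
            seen.add(fingerprint)
            out.append(item)
    return out


candidates = [
    {"key": "language", "value": "English"},
    {"key": "language", "value": "  english "},  # duplicate after normalization
    {"key": "language", "value": "spanish"},
]
print([c["value"] for c in dedupe_candidates(candidates)])
# -> ['English', 'spanish']
```

With an external store, the same fingerprint would become a unique index or upsert key instead of an in-process set.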
⏱️ 20 min read · Updated Mar 2026 · Difficulty: ★★☆
Integrated: production control (OnceOnly)
Guardrails for tool-calling agents
Take this pattern to production with governance:
  • Budgets (step limits / spend caps)
  • Tool permissions (allowlist / blocklist)
  • Kill switch and incident stop
  • Idempotency and dedupe
  • Audit logs and traceability
Integrated mention: OnceOnly is a control layer for agent systems in production.
Author

This documentation is curated and maintained by engineers who deploy AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The patterns and recommendations are based on post-mortems, failure modes, and operational incidents in deployed systems, including those encountered while developing and operating agent governance infrastructure at OnceOnly.