Memory-Augmented Agent — Python (full implementation with LLM)

Production-style, runnable Memory-Augmented agent example in Python with a capture/store/retrieve/apply flow, policy vs. execution allowlists, a TTL memory lifecycle, and explicit stop reasons.
On this page
  1. Pattern Essence (Brief)
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. How to run
  6. Task
  7. Solution
  8. Code
  9. memory_store.py — TTL memory and retrieval scoring
  10. gateway.py — policy/execution boundary for memory
  11. llm.py — extract, retrieve-intent, apply
  12. main.py — Session1 Capture/Store -> Session2 Retrieve/Apply
  13. Example output
  14. Typical stop_reason values
  15. What is NOT shown
  16. What to try next

Pattern Essence (Brief)

A Memory-Augmented Agent is a pattern in which the agent stores important facts across sessions and uses them in later responses.

The LLM proposes what to remember and what to retrieve, while the memory policy/execution layer controls what can actually be written or read.


What this example demonstrates

  • two phases (Session 1/2) in one run; persistence between processes is not shown
  • memory extraction through LLM in a JSON contract (items[])
  • policy boundary for memory write/retrieve contracts
  • execution boundary (runtime allowlist) for memory keys and scopes
  • runtime trust-gating: execution can block sensitive keys even if policy allows them
  • in this demo, policy allowlist works as a hard fail to quickly detect drift and contract errors
  • TTL memory lifecycle and bounded in-memory store
  • a final answer whose used_memory_keys are validated against the memory that was actually retrieved
  • explicit stop_reason, trace, and history for production monitoring
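The TTL lifecycle comes down to two rules in memory_store.py: ttl_days is clamped to the range 1 to 365 before an expiry timestamp is computed, and a record whose expiry is at or before the current time is treated as gone and purged when it is read. The sketch below restates those two rules as standalone functions with the clock passed in explicitly (expires_at and is_live are hypothetical names, not part of the repository), which makes the boundary easy to test without sleeping.

```python
from __future__ import annotations

SECONDS_PER_DAY = 86400.0


def expires_at(now: float, ttl_days: float) -> float:
    # Clamp like MemoryStore.upsert_items: at least 1 day, at most 365 days.
    ttl_days = max(1.0, min(365.0, float(ttl_days)))
    return now + ttl_days * SECONDS_PER_DAY


def is_live(expiry: float, now: float) -> bool:
    # Mirrors "if row.expires_at <= now: purge" in search()/dump_user_records().
    return expiry > now


t0 = 1_000_000.0
exp = expires_at(t0, 0)  # ttl_days=0 is clamped up to 1 day
print(is_live(exp, t0 + SECONDS_PER_DAY - 1))  # True
print(is_live(exp, t0 + SECONDS_PER_DAY))      # False
```

Injecting now instead of calling time.time() inside the helpers is what makes the expiry boundary deterministic in a test.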

Architecture

  1. The LLM extracts memory candidates (items) from the user message.
  2. The policy boundary validates shape, keys, scopes, ttl_days, and confidence.
  3. The execution boundary decides which records are actually written (runtime allowlist).
  4. In the next session, the LLM plans the retrieval intent (kind/query/top_k/scopes).
  5. The gateway executes retrieval only in runtime-allowed scopes.
  6. The LLM composes the answer from incident_context + memory_items.
  7. The system checks that used_memory_keys reference only memory that was actually retrieved.

Key contract: the LLM may propose records and retrieval intent, but the policy/execution layer defines what is valid and what is actually executed.

The policy allowlist defines what the model may ask for; the execution allowlist defines what the runtime actually allows right now.
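The two allowlists can be pictured as two checks applied in order. The sketch below is a simplified illustration, not the gateway code: it assumes a hypothetical partition_keys helper in which a policy violation is a hard stop (an exception) and an execution denial is a soft block that is recorded and skipped, which is how validate_memory_candidates and MemoryGateway.write split the work in this example.

```python
from __future__ import annotations


class PolicyViolation(Exception):
    """Hypothetical stand-in for the StopRun raised by the policy boundary."""


def partition_keys(
    requested: list[str],
    *,
    policy_keys: set[str],
    execution_keys: set[str],
) -> tuple[list[str], list[str]]:
    """Split requested keys into (writable, blocked); raise on any policy violation."""
    writable: list[str] = []
    blocked: list[str] = []
    for key in requested:
        # Layer 1, policy: a key the model should never propose is a contract error.
        if key not in policy_keys:
            raise PolicyViolation(f"memory_key_not_allowed_policy:{key}")
        # Layer 2, execution: a policy-valid key the runtime denies is skipped, not fatal.
        if key in execution_keys:
            writable.append(key)
        else:
            blocked.append(key)
    return writable, blocked


policy = {"language", "response_style", "update_channel", "declared_tier"}
execution = {"language", "response_style", "update_channel"}
print(partition_keys(["language", "declared_tier"], policy_keys=policy, execution_keys=execution))
# (['language'], ['declared_tier'])
```

Keeping the two sets separate means a risky key such as declared_tier can be switched on at runtime without changing what the model is told it may propose.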


Project structure

TEXT
examples/
└── agent-patterns/
    └── memory-augmented-agent/
        └── python/
            ├── main.py           # Session1 capture/store -> Session2 retrieve/apply
            ├── llm.py            # extraction + retrieval planning + final response
            ├── gateway.py        # policy/execution boundary for memory operations
            ├── memory_store.py   # in-memory store with TTL and relevance scoring
            ├── requirements.txt
            └── README.md

How to run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/memory-augmented-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Option via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option via .env (optional)
BASH
cat > .env <<'ENVFILE'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
ENVFILE

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows, set the variables with set OPENAI_API_KEY=sk-... (cmd) or $env:OPENAI_API_KEY = "sk-..." (PowerShell), or use python-dotenv to load .env automatically.
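If you would rather not depend on the shell or add python-dotenv, a few lines of standard-library Python can load a simple .env file. This is a minimal sketch that assumes .env contains only plain KEY=VALUE lines and # comments (no quoting rules, no variable expansion); load_env_file is a hypothetical helper, not part of this repository.

```python
from __future__ import annotations

import os
from pathlib import Path


def load_env_file(path: str = ".env", *, override: bool = False) -> dict[str, str]:
    """Load simple KEY=VALUE lines into os.environ and return what was parsed."""
    parsed: dict[str, str] = {}
    env_path = Path(path)
    if not env_path.is_file():
        return parsed  # A missing file is not an error; exported variables still work.

    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # Skip blank lines, comments, and lines without "=".
        key, value = line.split("=", 1)  # Split once so values may contain "=".
        key, value = key.strip(), value.strip()
        parsed[key] = value
        # Do not clobber variables already exported in the shell unless asked to.
        if override or key not in os.environ:
            os.environ[key] = value
    return parsed
```

Note that llm.py reads OPENAI_MODEL and OPENAI_TIMEOUT_SECONDS at import time, so a call like this would have to run before llm is imported.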


Task

Imagine an operations scenario for an incident assistant:

Session 1: the user sets stable preferences (language, answer style, update channel).
Session 2: the user asks for a short update about a payment incident.

In Session 2, the goal intentionally asks for both an update and next actions, to show how preference fields are applied in a structured update.

The agent must:

  • store only useful memory facts
  • not store keys that the runtime forbids
  • retrieve relevant memory in the next session
  • apply it in the final answer

Solution

In this example:

  • the LLM performs extract_memory_candidates(...), plan_retrieval_intent(...), and compose_memory_augmented_answer(...)
  • the gateway validates contracts and enforces the execution allowlist
  • MemoryStore stores records with a TTL and returns the most relevant facts
  • ENABLE_PREFERENCE_BIAS is a runtime switch for this flow. It does not mean "always include": preference keys get a small score bonus, so they stay eligible for top_k even without token overlap with the query
  • the final answer passes an allowlist check: used_memory_keys ⊆ retrieved_keys
  • in this demo, response_style=concise is validated as formatting compliance (word and sentence count), not as a semantic evaluation of tone
  • the result includes the full trace and a compact history
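The used_memory_keys ⊆ retrieved_keys check is a plain set comparison. The main.py excerpt on this page is cut off before that step, so the helper below is a hypothetical sketch rather than the repository's code; the stop-reason string it returns is an assumption, written in the same reason:detail style the gateway uses.

```python
from __future__ import annotations

from typing import Any


def check_used_memory_keys(
    used_memory_keys: list[str],
    retrieved_items: list[dict[str, Any]],
) -> str | None:
    """Return None if every used key was retrieved, else a stop-reason string.

    The reason name is hypothetical and only illustrates the subset check.
    """
    retrieved_keys = {item["key"] for item in retrieved_items}
    unknown = sorted(set(used_memory_keys) - retrieved_keys)
    if unknown:
        return f"answer_memory_key_not_retrieved:{unknown[0]}"
    return None


retrieved = [{"key": "language"}, {"key": "update_channel"}]
print(check_used_memory_keys(["language"], retrieved))       # None
print(check_used_memory_keys(["declared_tier"], retrieved))  # answer_memory_key_not_retrieved:declared_tier
```

Comparing against the keys that search(...) actually returned, rather than against the allowlists, is what catches a model that claims to have used a memory it never received.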

Code

memory_store.py — TTL memory and retrieval scoring

PYTHON
from __future__ import annotations

import re
import time
from dataclasses import dataclass
from typing import Any


DEFAULT_BOOST_KEYS = {"language", "response_style", "update_channel"}


def _tokenize(text: str) -> set[str]:
    return set(re.findall(r"[a-zA-Z0-9_]+", (text or "").lower()))


@dataclass
class MemoryRecord:
    user_id: int
    key: str
    value: str
    scope: str
    source: str
    confidence: float
    updated_at: float
    expires_at: float


class MemoryStore:
    def __init__(self, *, max_items: int):
        self.max_items = max_items
        self._records: dict[tuple[int, str, str], MemoryRecord] = {}

    def _evict_if_needed(self) -> None:
        if len(self._records) <= self.max_items:
            return
        oldest_key = min(self._records.items(), key=lambda item: item[1].updated_at)[0]
        self._records.pop(oldest_key, None)

    def upsert_items(
        self,
        *,
        user_id: int,
        items: list[dict[str, Any]],
        source: str,
    ) -> list[dict[str, Any]]:
        now = time.time()
        written: list[dict[str, Any]] = []

        for item in items:
            key = str(item["key"]).strip()
            value = str(item["value"]).strip()
            scope = str(item.get("scope", "user")).strip() or "user"
            ttl_days = float(item.get("ttl_days", 180))
            ttl_days = max(1.0, min(365.0, ttl_days))
            confidence = float(item.get("confidence", 0.8))
            confidence = max(0.0, min(1.0, confidence))
            expires_at = now + ttl_days * 86400.0

            record_key = (user_id, scope, key)
            existing = self._records.get(record_key)
            if existing and existing.value == value:
                # Stable value: refresh metadata without creating noisy rewrites.
                existing.source = source
                existing.confidence = confidence
                existing.updated_at = now
                existing.expires_at = expires_at

                written.append(
                    {
                        "key": key,
                        "value": value,
                        "scope": scope,
                        "source": source,
                        "confidence": round(confidence, 3),
                        "ttl_days": int(ttl_days),
                        "refreshed": True,
                    }
                )
                continue

            row = MemoryRecord(
                user_id=user_id,
                key=key,
                value=value,
                scope=scope,
                source=source,
                confidence=confidence,
                updated_at=now,
                expires_at=expires_at,
            )
            self._records[record_key] = row
            self._evict_if_needed()

            written.append(
                {
                    "key": key,
                    "value": value,
                    "scope": scope,
                    "source": source,
                    "confidence": round(confidence, 3),
                    "ttl_days": int(ttl_days),
                    "refreshed": False,
                }
            )

        return written

    def search(
        self,
        *,
        user_id: int,
        query: str,
        top_k: int,
        scopes: set[str],
        include_preference_keys: bool = False,
    ) -> list[dict[str, Any]]:
        now = time.time()
        query_tokens = _tokenize(query)
        if not query_tokens:
            return []

        hits: list[tuple[float, MemoryRecord]] = []
        for row in list(self._records.values()):
            if row.user_id != user_id:
                continue
            if row.scope not in scopes:
                continue
            if row.expires_at <= now:
                self._records.pop((row.user_id, row.scope, row.key), None)
                continue

            text_tokens = _tokenize(f"{row.key} {row.value}")
            overlap = len(query_tokens & text_tokens)
            if overlap == 0 and not (include_preference_keys and row.key in DEFAULT_BOOST_KEYS):
                continue

            score = overlap + (row.confidence * 0.3)

            if include_preference_keys and row.key in DEFAULT_BOOST_KEYS:
                score += 0.4

            if score <= 0:
                continue
            hits.append((score, row))

        hits.sort(key=lambda item: (item[0], item[1].updated_at), reverse=True)

        result: list[dict[str, Any]] = []
        for score, row in hits[:top_k]:
            result.append(
                {
                    "key": row.key,
                    "value": row.value,
                    "scope": row.scope,
                    "source": row.source,
                    "confidence": round(row.confidence, 3),
                    "score": round(score, 3),
                }
            )
        return result

    def dump_user_records(self, *, user_id: int) -> list[dict[str, Any]]:
        now = time.time()
        rows: list[MemoryRecord] = []

        for row in list(self._records.values()):
            if row.user_id != user_id:
                continue
            if row.expires_at <= now:
                self._records.pop((row.user_id, row.scope, row.key), None)
                continue
            rows.append(row)

        rows.sort(key=lambda item: item.updated_at, reverse=True)

        snapshot: list[dict[str, Any]] = []
        for row in rows:
            ttl_left_days = max(0.0, (row.expires_at - now) / 86400.0)
            snapshot.append(
                {
                    "key": row.key,
                    "value": row.value,
                    "scope": row.scope,
                    "source": row.source,
                    "confidence": round(row.confidence, 3),
                    "ttl_left_days": round(ttl_left_days, 1),
                }
            )
        return snapshot

What matters most here (in plain words)

  • Memory is isolated by user_id and scope.
  • There is lifecycle support: TTL + cleanup of expired records.
  • The preference-key boost is controlled by a runtime flag (include_preference_keys), not by how the retrieval query is worded.
  • search(...) returns relevant memory items, not the whole state.
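The ranking in search(...) is small enough to check by hand: score = token overlap + 0.3 × confidence, plus 0.4 for preference keys when include_preference_keys is on. The sketch below copies only that formula and the tokenizer into a standalone function (score_record is a hypothetical name) and uses an illustrative query, not one produced by the demo. It exposes one non-obvious consequence: the regex keeps underscores, so update_channel is a single token and the query word "update" does not match it. Without the preference bias that record is filtered out; with it, the record still ranks, but below records that actually overlap the query.

```python
from __future__ import annotations

import re

BOOST_KEYS = {"language", "response_style", "update_channel"}


def tokenize(text: str) -> set[str]:
    # Same pattern as memory_store._tokenize: underscores stay inside tokens.
    return set(re.findall(r"[a-zA-Z0-9_]+", (text or "").lower()))


def score_record(query: str, key: str, value: str, confidence: float, *, bias: bool) -> float | None:
    """Return the search score, or None if search() would skip the record."""
    overlap = len(tokenize(query) & tokenize(f"{key} {value}"))
    boosted = bias and key in BOOST_KEYS
    if overlap == 0 and not boosted:
        return None  # No lexical match and no preference bonus: filtered out.
    score = overlap + confidence * 0.3
    if boosted:
        score += 0.4
    return round(score, 3)


query = "payment incident update language"
print(score_record(query, "language", "english", 0.9, bias=True))       # 1.67
print(score_record(query, "update_channel", "email", 0.9, bias=True))   # 0.67
print(score_record(query, "update_channel", "email", 0.9, bias=False))  # None
```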

gateway.py — policy/execution boundary for memory

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from memory_store import MemoryStore


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_capture_items: int = 6
    max_retrieve_top_k: int = 6
    max_query_chars: int = 240
    max_answer_chars: int = 700
    max_value_chars: int = 120
    max_seconds: int = 25


def _is_number(value: Any) -> bool:
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def validate_memory_candidates(
    raw: Any,
    *,
    allowed_keys_policy: set[str],
    allowed_scopes_policy: set[str],
    max_items: int,
    max_value_chars: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_memory_candidates:not_object")

    items = raw.get("items")
    if not isinstance(items, list):
        raise StopRun("invalid_memory_candidates:items")

    normalized: list[dict[str, Any]] = []
    for item in items:
        if not isinstance(item, dict):
            raise StopRun("invalid_memory_candidates:item")

        required_keys = {"key", "value"}
        if not required_keys.issubset(item.keys()):
            raise StopRun("invalid_memory_candidates:missing_keys")

        key = item.get("key")
        value = item.get("value")
        scope = item.get("scope", "user")
        ttl_days = item.get("ttl_days", 180)
        confidence = item.get("confidence", 0.8)

        if not isinstance(key, str) or not key.strip():
            raise StopRun("invalid_memory_candidates:key")
        key = key.strip()
        if key not in allowed_keys_policy:
            raise StopRun(f"memory_key_not_allowed_policy:{key}")

        if not isinstance(value, str) or not value.strip():
            raise StopRun("invalid_memory_candidates:value")
        value = value.strip()
        if len(value) > max_value_chars:
            raise StopRun("invalid_memory_candidates:value_too_long")

        if not isinstance(scope, str) or not scope.strip():
            raise StopRun("invalid_memory_candidates:scope")
        scope = scope.strip()
        if scope not in allowed_scopes_policy:
            raise StopRun(f"memory_scope_not_allowed_policy:{scope}")

        if not _is_number(ttl_days):
            raise StopRun("invalid_memory_candidates:ttl_days")
        ttl_days = int(float(ttl_days))
        ttl_days = max(1, min(365, ttl_days))

        if not _is_number(confidence):
            raise StopRun("invalid_memory_candidates:confidence")
        confidence = float(confidence)
        confidence = max(0.0, min(1.0, confidence))

        normalized.append(
            {
                "key": key,
                "value": value,
                "scope": scope,
                "ttl_days": ttl_days,
                "confidence": round(confidence, 3),
            }
        )

    if len(normalized) > max_items:
        raise StopRun("invalid_memory_candidates:too_many_items")

    return {"items": normalized}


def validate_retrieval_intent(
    raw: Any,
    *,
    allowed_scopes_policy: set[str],
    max_top_k: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_retrieval_intent:not_object")

    if raw.get("kind") != "retrieve_memory":
        raise StopRun("invalid_retrieval_intent:kind")

    query = raw.get("query")
    if not isinstance(query, str) or not query.strip():
        raise StopRun("invalid_retrieval_intent:query")

    top_k = raw.get("top_k", 4)
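    # bool is a subclass of int, so reject it explicitly (True would otherwise pass as 1).
    if isinstance(top_k, bool) or not isinstance(top_k, int) or not (1 <= top_k <= max_top_k):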
        raise StopRun("invalid_retrieval_intent:top_k")

    scopes_raw = raw.get("scopes")
    normalized_scopes: list[str] = []
    if scopes_raw is not None:
        if not isinstance(scopes_raw, list) or not scopes_raw:
            raise StopRun("invalid_retrieval_intent:scopes")
        for scope in scopes_raw:
            if not isinstance(scope, str) or not scope.strip():
                raise StopRun("invalid_retrieval_intent:scope_item")
            normalized_scope = scope.strip()
            if normalized_scope not in allowed_scopes_policy:
                raise StopRun(f"invalid_retrieval_intent:scope_not_allowed:{normalized_scope}")
            normalized_scopes.append(normalized_scope)

    payload = {
        "kind": "retrieve_memory",
        "query": query.strip(),
        "top_k": top_k,
    }
    if normalized_scopes:
        payload["scopes"] = normalized_scopes
    return payload


class MemoryGateway:
    def __init__(
        self,
        *,
        store: MemoryStore,
        budget: Budget,
        allow_execution_keys: set[str],
        allow_execution_scopes: set[str],
    ):
        self.store = store
        self.budget = budget
        self.allow_execution_keys = set(allow_execution_keys)
        self.allow_execution_scopes = set(allow_execution_scopes)

    def write(
        self,
        *,
        user_id: int,
        items: list[dict[str, Any]],
        source: str,
    ) -> dict[str, Any]:
        if len(items) > self.budget.max_capture_items:
            raise StopRun("max_capture_items")

        writable: list[dict[str, Any]] = []
        blocked: list[dict[str, Any]] = []

        for item in items:
            key = item["key"]
            scope = item["scope"]

            if key not in self.allow_execution_keys:
                blocked.append({"key": key, "reason": "key_denied_execution"})
                continue
            if scope not in self.allow_execution_scopes:
                blocked.append(
                    {
                        "key": key,
                        "scope": scope,
                        "reason": "scope_denied_execution",
                    }
                )
                continue

            writable.append(item)

        written = []
        if writable:
            written = self.store.upsert_items(user_id=user_id, items=writable, source=source)

        return {
            "written": written,
            "blocked": blocked,
        }

    def retrieve(
        self,
        *,
        user_id: int,
        intent: dict[str, Any],
        include_preference_keys: bool = False,
    ) -> dict[str, Any]:
        query = intent["query"]
        if len(query) > self.budget.max_query_chars:
            raise StopRun("invalid_retrieval_intent:query_too_long")

        requested_scopes = set(intent.get("scopes") or self.allow_execution_scopes)
        denied = sorted(requested_scopes - self.allow_execution_scopes)
        if denied:
            raise StopRun(f"scope_denied:{denied[0]}")

        items = self.store.search(
            user_id=user_id,
            query=query,
            top_k=intent["top_k"],
            scopes=requested_scopes,
            include_preference_keys=include_preference_keys,
        )

        return {
            "query": query,
            "requested_scopes": sorted(requested_scopes),
            "include_preference_keys": include_preference_keys,
            "items": items,
        }

What matters most here (in plain words)

  • The policy boundary checks the contract and the allowed keys/scopes.
  • Policy is strict: a memory key or scope outside the policy allowlist stops the run.
  • Beyond budget limits, the gateway enforces only the execution allowlist passed in from main.py.
  • If a key or scope is denied at runtime, the write is blocked and the item appears in the blocked list of the history entry.
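Scope handling in MemoryGateway.retrieve follows a simple rule: if the intent omits scopes, retrieval defaults to every runtime-allowed scope; if it names any scope the runtime does not allow, the whole retrieval stops. The function below isolates that rule as a hypothetical resolve_scopes helper (it is not in gateway.py) so it can be exercised without a store. With WORKSPACE_MEMORY_RUNTIME_ENABLED = False, a policy-valid request for "workspace" still fails at this step.

```python
from __future__ import annotations


class ScopeDenied(Exception):
    """Hypothetical stand-in for StopRun("scope_denied:<scope>")."""


def resolve_scopes(requested: list[str] | None, execution_scopes: set[str]) -> set[str]:
    # No scopes in the intent: search everything the runtime currently allows.
    scopes = set(requested or execution_scopes)
    denied = sorted(scopes - execution_scopes)
    if denied:
        # Report the first denied scope in sorted order, as the gateway does.
        raise ScopeDenied(f"scope_denied:{denied[0]}")
    return scopes


runtime_scopes = {"user"}  # WORKSPACE_MEMORY_RUNTIME_ENABLED = False
print(resolve_scopes(None, runtime_scopes))      # {'user'}
print(resolve_scopes(["user"], runtime_scopes))  # {'user'}
```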

llm.py — extract, retrieve-intent, apply

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


MEMORY_CAPTURE_SYSTEM_PROMPT = """
You are a memory extraction assistant.
Return exactly one JSON object in this shape:
{
  "items": [
    {
      "key": "language",
      "value": "english",
      "scope": "user",
      "ttl_days": 180,
      "confidence": 0.9
    }
  ]
}

Rules:
- Extract only stable preferences or durable constraints useful in future sessions.
- Use only keys from available_keys.
- scope must be "user" or "workspace".
- ttl_days must be between 1 and 365.
- confidence must be between 0 and 1.
- If nothing should be stored, return {"items": []}.
- Do not output markdown or extra keys.
""".strip()

RETRIEVAL_INTENT_SYSTEM_PROMPT = """
You are a memory retrieval planner.
Return exactly one JSON object in this shape:
{
  "kind": "retrieve_memory",
  "query": "short memory query",
  "top_k": 4
}

Optional key:
- "scopes": ["user", "workspace"]

Rules:
- Keep query compact and factual.
- top_k must be between 1 and 6.
- Use only scopes from available_scopes.
- Prefer omitting "scopes" unless the goal explicitly requires a specific scope.
- Do not output markdown or extra keys.
""".strip()

ANSWER_SYSTEM_PROMPT = """
You are an incident response assistant.
Return exactly one JSON object in this shape:
{
  "answer": "final answer in English",
  "used_memory_keys": ["language", "response_style"]
}

Rules:
- Use only incident_context and memory_items.
- Keep the answer concise, actionable, and suitable for an operations update.
- used_memory_keys must reference only keys present in memory_items.
- If "update_channel" is used, explicitly mention it in answer text (for example, "via email").
- If "language" is used with value "english", start answer with "Incident update:".
- If no memory was used, return an empty array.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def extract_memory_candidates(
    *,
    user_message: str,
    available_keys: list[str],
) -> dict[str, Any]:
    payload = {
        "user_message": user_message,
        "available_keys": available_keys,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": MEMORY_CAPTURE_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"invalid": True, "raw": text}


def plan_retrieval_intent(*, goal: str, available_scopes: list[str]) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "available_scopes": available_scopes,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": RETRIEVAL_INTENT_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_memory_augmented_answer(
    *,
    goal: str,
    incident_context: dict[str, Any],
    memory_items: list[dict[str, Any]],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "memory_items": [
            {
                "key": item.get("key"),
                "value": item.get("value"),
                "scope": item.get("scope"),
                "confidence": item.get("confidence"),
            }
            for item in memory_items
        ],
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": ANSWER_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")

    answer = data.get("answer")
    used_memory_keys = data.get("used_memory_keys")

    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")
    if not answer.strip():
        raise LLMEmpty("llm_empty")

    if not isinstance(used_memory_keys, list):
        raise LLMInvalid("llm_invalid_schema")

    normalized_keys: list[str] = []
    for value in used_memory_keys:
        if not isinstance(value, str):
            raise LLMInvalid("llm_invalid_schema")
        item = value.strip()
        if item and item not in normalized_keys:
            normalized_keys.append(item)

    return {
        "answer": answer.strip(),
        "used_memory_keys": normalized_keys,
    }

What matters most here (in plain words)

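  • Each phase has its own JSON contract: capture, retrieve_intent, apply.
  • Timeouts and connection errors become llm_timeout in every phase. Only the apply phase raises llm_invalid_json, llm_invalid_schema, or llm_empty; malformed capture or retrieval output is returned as a sentinel object that then fails validation in gateway.py.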
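The apply-phase checks in compose_memory_augmented_answer do not depend on the API call, so they can be tested offline. The sketch below moves them into a hypothetical parse_answer function that takes the raw model text; LLMInvalid and LLMEmpty are redefined locally so the snippet runs on its own. It shows which malformed reply maps to which reason.

```python
from __future__ import annotations

import json
from typing import Any


class LLMInvalid(Exception):
    pass


class LLMEmpty(Exception):
    pass


def parse_answer(text: str | None) -> dict[str, Any]:
    """Validate the apply contract: {"answer": str, "used_memory_keys": [str, ...]}."""
    try:
        data = json.loads(text or "{}")
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc
    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")  # Valid JSON, but not an object.

    answer = data.get("answer")
    keys = data.get("used_memory_keys")
    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")
    if not answer.strip():
        raise LLMEmpty("llm_empty")  # Well-formed but blank: a separate stop reason.
    if not isinstance(keys, list) or not all(isinstance(k, str) for k in keys):
        raise LLMInvalid("llm_invalid_schema")

    # Strip, drop blanks, and de-duplicate while keeping first-seen order.
    normalized = list(dict.fromkeys(k.strip() for k in keys if k.strip()))
    return {"answer": answer.strip(), "used_memory_keys": normalized}
```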

main.py — Session1 Capture/Store -> Session2 Retrieve/Apply

PYTHON
from __future__ import annotations

import json
import re
import time
import uuid
from typing import Any

from gateway import (
    Budget,
    MemoryGateway,
    StopRun,
    validate_memory_candidates,
    validate_retrieval_intent,
)
from llm import (
    LLMEmpty,
    LLMInvalid,
    LLMTimeout,
    compose_memory_augmented_answer,
    extract_memory_candidates,
    plan_retrieval_intent,
)
from memory_store import MemoryStore

USER_ID = 42
SESSION_1_USER_MESSAGE = (
    "For future incident updates, write in English, keep replies concise, "
    "use email as the primary channel, and remember that I am enterprise tier."
)
SESSION_2_GOAL = "Draft today's payment incident update and next actions."

INCIDENT_CONTEXT = {
    "date": "2026-03-04",
    "region": "US",
    "incident_id": "inc_payments_20260304",
    "severity": "P1",
    "gateway_status": "degraded",
    "failed_payment_rate": 0.034,
    "chargeback_alerts": 5,
    "eta_minutes": 45,
}

BUDGET = Budget(
    max_capture_items=6,
    max_retrieve_top_k=6,
    max_query_chars=240,
    max_answer_chars=700,
    max_value_chars=120,
    max_seconds=25,
)

ALLOWED_MEMORY_KEYS_POLICY = {
    "language",
    "response_style",
    "update_channel",
    "declared_tier",
}
# Runtime can block high-risk keys even if policy allows them.
TRUST_DECLARED_TIER_FROM_CHAT = False
ALLOWED_MEMORY_KEYS_EXECUTION = (
    ALLOWED_MEMORY_KEYS_POLICY
    if TRUST_DECLARED_TIER_FROM_CHAT
    else {"language", "response_style", "update_channel"}
)

ALLOWED_SCOPES_POLICY = {"user", "workspace"}
WORKSPACE_MEMORY_RUNTIME_ENABLED = False
ALLOWED_SCOPES_EXECUTION = (
    ALLOWED_SCOPES_POLICY if WORKSPACE_MEMORY_RUNTIME_ENABLED else {"user"}
)
# Runtime switch: give preference keys a small score bonus in this incident-update flow.
ENABLE_PREFERENCE_BIAS = True


def _shorten(text: str, *, limit: int = 240) -> str:
    text = (text or "").strip()
    if len(text) <= limit:
        return text
    return text[: limit - 3].rstrip() + "..."


def _pick_applied_memory(
    memory_items: list[dict[str, Any]],
    used_keys: list[str],
) -> list[dict[str, Any]]:
    used = set(used_keys)
    out: list[dict[str, Any]] = []
    for item in memory_items:
        key = item.get("key")
        if key not in used:
            continue
        out.append(
            {
                "key": item["key"],
                "value": item["value"],
                "scope": item["scope"],
                "confidence": item["confidence"],
                "score": item["score"],
            }
        )
    return out


def _has_declared_memory_application(*, answer: str, applied_memory: list[dict[str, Any]]) -> bool:
    """
    Conservative audit check:
    - update_channel: value should appear in answer text.
    - response_style=concise: short-response format compliance.
    - language=english: answer should use a stable prefix "Incident update:".
    If no verifiable key exists, do not block.
    """
    if not applied_memory:
        return False

    normalized_answer = " ".join((answer or "").lower().split())
    evidenced_any = False
    has_verifiable_key = False

    def _is_concise(text: str) -> bool:
        words = re.findall(r"[a-zA-Z0-9_]+", text)
        sentence_count = len(re.findall(r"[.!?]+", text))
        return len(words) <= 80 and sentence_count <= 3

    for item in applied_memory:
        key = str(item.get("key") or "").strip().lower()
        value = str(item.get("value", "")).strip().lower()
        if not key or not value:
            continue

        if key == "update_channel":
            has_verifiable_key = True
            if value in normalized_answer:
                evidenced_any = True
        elif key == "response_style":
            has_verifiable_key = True
            if value == "concise" and _is_concise(answer):
                evidenced_any = True
        elif key == "language" and value in {"english", "en"}:
            has_verifiable_key = True
            if normalized_answer.startswith("incident update:"):
                evidenced_any = True
        # Other keys have no text-level check, so they count neither as verifiable nor as evidence.

    if not has_verifiable_key:
        return True
    return evidenced_any


def run_memory_augmented(
    *,
    user_id: int,
    session_1_message: str,
    session_2_goal: str,
) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    store = MemoryStore(max_items=100)
    gateway = MemoryGateway(
        store=store,
        budget=BUDGET,
        allow_execution_keys=ALLOWED_MEMORY_KEYS_EXECUTION,
        allow_execution_scopes=ALLOWED_SCOPES_EXECUTION,
    )

    try:
        raw_capture = extract_memory_candidates(
            user_message=session_1_message,
            available_keys=sorted(ALLOWED_MEMORY_KEYS_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="capture")

    try:
        capture_payload = validate_memory_candidates(
            raw_capture,
            allowed_keys_policy=ALLOWED_MEMORY_KEYS_POLICY,
            allowed_scopes_policy=ALLOWED_SCOPES_POLICY,
            max_items=BUDGET.max_capture_items,
            max_value_chars=BUDGET.max_value_chars,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="capture",
            raw_capture=raw_capture,
        )

    write_result = gateway.write(
        user_id=user_id,
        items=capture_payload["items"],
        source="session_1",
    )

    refreshed_items = [item for item in write_result["written"] if item.get("refreshed")]
    written_items = [item for item in write_result["written"] if not item.get("refreshed")]

    trace.append(
        {
            "step": 1,
            "phase": "capture_store",
            "candidates": len(capture_payload["items"]),
            "written": len(written_items),
            "refreshed": len(refreshed_items),
            "blocked": len(write_result["blocked"]),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 1,
            "session": "session_1",
            "message": session_1_message,
            "written_keys": [item["key"] for item in written_items],
            "refreshed_keys": [item["key"] for item in refreshed_items],
            "blocked": write_result["blocked"],
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="retrieve_plan")

    try:
        raw_intent = plan_retrieval_intent(
            goal=session_2_goal,
            available_scopes=sorted(ALLOWED_SCOPES_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="retrieve_plan")

    try:
        intent = validate_retrieval_intent(
            raw_intent,
            allowed_scopes_policy=ALLOWED_SCOPES_POLICY,
            max_top_k=BUDGET.max_retrieve_top_k,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="retrieve_plan",
            raw_intent=raw_intent,
        )

    try:
        retrieval = gateway.retrieve(
            user_id=user_id,
            intent=intent,
            include_preference_keys=ENABLE_PREFERENCE_BIAS,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="retrieve",
            intent=intent,
        )

    trace.append(
        {
            "step": 2,
            "phase": "retrieve",
            "query": retrieval["query"],
            "requested_scopes": retrieval["requested_scopes"],
            "include_preference_keys": retrieval["include_preference_keys"],
            "memory_hits": len(retrieval["items"]),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 2,
            "session": "session_2",
            "intent": intent,
            "resolved_scopes": retrieval["requested_scopes"],
            "include_preference_keys": retrieval["include_preference_keys"],
            "retrieved_keys": [item["key"] for item in retrieval["items"]],
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="apply")

    try:
        final = compose_memory_augmented_answer(
            goal=session_2_goal,
            incident_context=INCIDENT_CONTEXT,
            memory_items=retrieval["items"],
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="apply")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="apply")
    except LLMEmpty:
        return stopped("llm_empty", phase="apply")

    retrieved_keys = {item["key"] for item in retrieval["items"]}
    invalid_used_keys = sorted(
        set(final["used_memory_keys"]) - retrieved_keys,
    )
    if invalid_used_keys:
        return stopped(
            "invalid_answer:memory_keys_out_of_context",
            phase="apply",
            invalid_used_memory_keys=invalid_used_keys,
            retrieved_keys=sorted(retrieved_keys),
        )

    if len(final["answer"]) > BUDGET.max_answer_chars:
        return stopped("invalid_answer:too_long", phase="apply")

    applied_memory = _pick_applied_memory(retrieval["items"], final["used_memory_keys"])
    if final["used_memory_keys"] and not _has_declared_memory_application(
        answer=final["answer"],
        applied_memory=applied_memory,
    ):
        return stopped(
            "invalid_answer:memory_declared_but_not_applied",
            phase="apply",
            used_memory_keys=final["used_memory_keys"],
            applied_memory=applied_memory,
        )

    trace.append(
        {
            "step": 3,
            "phase": "apply",
            "used_memory_keys": final["used_memory_keys"],
            "applied_memory_count": len(applied_memory),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 3,
            "action": "compose_memory_augmented_answer",
            "used_memory_keys": final["used_memory_keys"],
            "answer": _shorten(final["answer"]),
        }
    )

    return {
        "run_id": run_id,
        "status": "ok",
        "stop_reason": "success",
        "outcome": "memory_applied" if final["used_memory_keys"] else "context_only",
        "answer": final["answer"],
        "used_memory_keys": final["used_memory_keys"],
        "applied_memory": applied_memory,
        "memory_state": store.dump_user_records(user_id=user_id),
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_memory_augmented(
        user_id=USER_ID,
        session_1_message=SESSION_1_USER_MESSAGE,
        session_2_goal=SESSION_2_GOAL,
    )
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain words)

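  • Session 1 and Session 2 are simulated within a single process run that shares one in-memory MemoryStore; nothing persists between processes.
  • ALLOWED_MEMORY_KEYS_POLICY and ALLOWED_MEMORY_KEYS_EXECUTION can differ on purpose. In the example output, declared_tier passes policy validation but is blocked at execution with reason key_denied_execution.
  • ENABLE_PREFERENCE_BIAS=True is set only for the incident-update flow, where preference fields are almost always needed.
  • Every result payload, including stopped runs, carries run_id for log correlation.
  • The final checks make the apply step audit-friendly: every key in used_memory_keys must have been retrieved (otherwise memory_keys_out_of_context), and declared memory must actually show up in the answer text (otherwise memory_declared_but_not_applied).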
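The split between the two allowlists can be sketched as follows. This is a minimal sketch, assuming memory candidates are dicts with a "key" field; filter_memory_writes, PolicyViolation and WriteResult are hypothetical names, not the MemoryGateway API from gateway.py. Only the reason strings (memory_key_not_allowed_policy:<key>, key_denied_execution) are taken from this page, and the key sets are illustrative.

```python
from dataclasses import dataclass, field
from typing import Any


class PolicyViolation(Exception):
    """Hypothetical hard-fail error: a key outside the policy contract."""

    def __init__(self, reason: str) -> None:
        super().__init__(reason)
        self.reason = reason


@dataclass
class WriteResult:
    written: list[dict[str, Any]] = field(default_factory=list)
    blocked: list[dict[str, str]] = field(default_factory=list)


def filter_memory_writes(
    items: list[dict[str, Any]],
    *,
    policy_keys: set[str],
    execution_keys: set[str],
) -> WriteResult:
    # Layer 1 (policy): one key outside the contract stops the whole run
    # before anything is written, so contract drift surfaces immediately.
    for item in items:
        if item["key"] not in policy_keys:
            raise PolicyViolation(f"memory_key_not_allowed_policy:{item['key']}")

    # Layer 2 (execution): a policy-valid key can still be denied at runtime.
    # Denied keys are reported in `blocked`, and the run continues.
    result = WriteResult()
    for item in items:
        if item["key"] in execution_keys:
            result.written.append(item)
        else:
            result.blocked.append({"key": item["key"], "reason": "key_denied_execution"})
    return result


# Illustrative key sets mirroring the example output: declared_tier is
# allowed by policy but not trusted at execution time.
POLICY_KEYS = {"language", "response_style", "update_channel", "declared_tier"}
EXECUTION_KEYS = POLICY_KEYS - {"declared_tier"}

res = filter_memory_writes(
    [
        {"key": "language", "value": "english"},
        {"key": "declared_tier", "value": "enterprise"},
    ],
    policy_keys=POLICY_KEYS,
    execution_keys=EXECUTION_KEYS,
)
print(res.blocked)  # [{'key': 'declared_tier', 'reason': 'key_denied_execution'}]
```

Keeping the policy check as a hard fail and the execution check as a soft block is what lets the trust decision (for example, whether a self-declared tier is believed) change at runtime without changing the LLM contract.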

Example output

{
  "run_id": "f06d5282-bda4-44df-8bf9-38f53cdb2fb9",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "memory_applied",
  "answer": "Incident update: On 2026-03-04, the US payment gateway is in a P1 degraded state. Failed payment rate is 3.4% with 5 chargeback alerts, ETA 45 minutes. Next actions: monitor gateway performance and send customer updates via email.",
  "used_memory_keys": [
    "language",
    "update_channel",
    "response_style"
  ],
  "applied_memory": [
    {
      "key": "language",
      "value": "english",
      "scope": "user",
      "confidence": 0.95,
      "score": 0.685
    },
    {
      "key": "update_channel",
      "value": "email",
      "scope": "user",
      "confidence": 0.95,
      "score": 0.685
    },
    {
      "key": "response_style",
      "value": "concise",
      "scope": "user",
      "confidence": 0.9,
      "score": 0.67
    }
  ],
  "memory_state": [
    {
      "key": "language",
      "value": "english",
      "scope": "user",
      "source": "session_1",
      "confidence": 0.95,
      "ttl_left_days": 180.0
    },
    {
      "key": "response_style",
      "value": "concise",
      "scope": "user",
      "source": "session_1",
      "confidence": 0.9,
      "ttl_left_days": 180.0
    },
    {
      "key": "update_channel",
      "value": "email",
      "scope": "user",
      "source": "session_1",
      "confidence": 0.95,
      "ttl_left_days": 180.0
    }
  ],
  "trace": [
    {
      "step": 1,
      "phase": "capture_store",
      "candidates": 4,
      "written": 3,
      "refreshed": 0,
      "blocked": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "retrieve",
      "query": "payment incident update and next actions",
      "requested_scopes": [
        "user"
      ],
      "include_preference_keys": true,
      "memory_hits": 3,
      "ok": true
    },
    {
      "step": 3,
      "phase": "apply",
      "used_memory_keys": [
        "language",
        "update_channel",
        "response_style"
      ],
      "applied_memory_count": 3,
      "ok": true
    }
  ],
  "history": [
    {
      "step": 1,
      "session": "session_1",
      "message": "For future incident updates, write in English, keep replies concise, use email as the primary channel, and remember that I am enterprise tier.",
      "written_keys": [
        "language",
        "response_style",
        "update_channel"
      ],
      "refreshed_keys": [],
      "blocked": [
        {
          "key": "declared_tier",
          "reason": "key_denied_execution"
        }
      ]
    },
    {
      "step": 2,
      "session": "session_2",
      "intent": {
        "kind": "retrieve_memory",
        "query": "payment incident update and next actions",
        "top_k": 4
      },
      "resolved_scopes": [
        "user"
      ],
      "include_preference_keys": true,
      "retrieved_keys": [
        "language",
        "update_channel",
        "response_style"
      ]
    },
    {
      "step": 3,
      "action": "compose_memory_augmented_answer",
      "used_memory_keys": [
        "language",
        "update_channel",
        "response_style"
      ],
      "answer": "Incident update: On 2026-03-04, the US payment gateway is in a P1 degraded state..."
    }
  ]
}

Typical stop_reason values

  • success — run completed correctly; see outcome (memory_applied or context_only)
  • invalid_memory_candidates:* — the memory capture output failed contract validation
  • invalid_memory_candidates:value_too_long — a value exceeds the max_value_chars limit
  • memory_key_not_allowed_policy:<key> — the LLM proposed a key outside the policy allowlist
  • memory_scope_not_allowed_policy:<scope> — the LLM proposed a scope outside the policy allowlist
  • invalid_retrieval_intent:* — the retrieval intent failed policy validation
  • scope_denied:<scope> — the retrieval scope is not allowed by the execution allowlist
  • llm_timeout — the LLM did not respond within OPENAI_TIMEOUT_SECONDS (possible in the capture, retrieve_plan, and apply phases)
  • llm_invalid_json / llm_invalid_schema — the apply step returned invalid JSON or JSON with the wrong shape
  • llm_empty — the apply step returned an empty final answer
  • invalid_answer:memory_keys_out_of_context — the model listed a key in used_memory_keys that was not retrieved
  • invalid_answer:too_long — the final answer exceeds the max_answer_chars limit
  • invalid_answer:memory_declared_but_not_applied — the model declared memory usage, but the answer text does not reflect it
  • max_seconds — the total run-time budget (BUDGET.max_seconds) was exceeded
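For monitoring, the prefixed reasons above can be grouped into families. A minimal sketch, assuming result payloads shaped like the ones in this example (stopped payloads carry "phase", the success payload does not); stop_reason_family, summarize_runs and the "done" bucket are hypothetical names chosen for illustration.

```python
from collections import Counter
from typing import Any


def stop_reason_family(stop_reason: str) -> str:
    # Drop the detail after the first colon:
    # "invalid_answer:too_long" -> "invalid_answer",
    # "scope_denied:workspace" -> "scope_denied", "llm_timeout" -> "llm_timeout".
    return stop_reason.split(":", 1)[0]


def summarize_runs(results: list[dict[str, Any]]) -> Counter[tuple[str, str]]:
    # Count runs per (phase, family). The success payload has no "phase",
    # so it is bucketed under the hypothetical label "done".
    counts: Counter[tuple[str, str]] = Counter()
    for result in results:
        phase = result.get("phase", "done")
        counts[(phase, stop_reason_family(result["stop_reason"]))] += 1
    return counts


runs = [
    {"status": "ok", "stop_reason": "success"},
    {"status": "stopped", "stop_reason": "invalid_answer:too_long", "phase": "apply"},
    {"status": "stopped", "stop_reason": "invalid_answer:memory_keys_out_of_context", "phase": "apply"},
    {"status": "stopped", "stop_reason": "llm_timeout", "phase": "capture"},
]
for (phase, family), n in summarize_runs(runs).most_common():
    print(phase, family, n)
```

Grouping by family keeps dashboard cardinality low, while the full stop_reason (and run_id) stays in the logs for drill-down.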

What is NOT shown

  • persistent storage (Postgres/Redis/Vector DB) instead of the in-memory implementation
  • encryption/PII redaction before writing memory
  • semantic retrieval via embeddings (instead of simple token overlap)
  • multi-tenant quotas and soft/hard retention policies
  • retry/backoff for LLM calls
  • per-key consent and user-visible memory UI
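Retry/backoff for LLM calls could start from a wrapper like the one below. This is a minimal sketch, not part of the example: call_with_backoff is a hypothetical helper, and LLMTimeout here is a local stand-in for the exception defined in llm.py. It uses exponential backoff with full jitter; the injectable sleep parameter exists only to make it testable.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class LLMTimeout(Exception):
    """Local stand-in for the example's LLMTimeout (defined in llm.py)."""


def call_with_backoff(
    fn: Callable[[], T],
    *,
    max_attempts: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 4.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except LLMTimeout:
            if attempt == max_attempts:
                # Out of attempts: re-raise so the caller can still map it
                # to stop_reason="llm_timeout".
                raise
            # Exponential backoff (0.5s, 1s, 2s, ...) capped at max_delay,
            # with full jitter: sleep a random time in [0, delay].
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, delay))
    raise ValueError("max_attempts must be >= 1")
```

A call site might look like call_with_backoff(lambda: extract_memory_candidates(...)). Because the run only compares elapsed time against BUDGET.max_seconds between phases, retries inside one phase can overshoot that budget unless the wrapper is also given a deadline.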

What to try next

  1. Set TRUST_DECLARED_TIER_FROM_CHAT=True and check how the blocked list in history changes.
  2. Set WORKSPACE_MEMORY_RUNTIME_ENABLED=True and add a retrieval intent with the workspace scope.
  3. Add a policy rule for a timezone memory key and verify that the answer reflects it.
  4. Replace the in-memory store with external storage and add deduplication by key + normalized_value.
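For step 4, deduplication by key + normalized_value could look like this. A minimal sketch, assuming records are dicts with "key", "value" and an optional "confidence"; normalize_value and dedupe_records are hypothetical helpers, and the normalization rule (Unicode NFKC, casefold, collapsed whitespace) is an assumption, not something the example defines.

```python
import unicodedata
from typing import Any


def normalize_value(value: str) -> str:
    # Assumed normalization: NFKC, casefold, collapse runs of whitespace.
    return " ".join(unicodedata.normalize("NFKC", value).casefold().split())


def dedupe_records(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    # Keep one record per (key, normalized value); the highest-confidence
    # copy wins. Output order follows the first occurrence of each identity.
    best: dict[tuple[str, str], dict[str, Any]] = {}
    for rec in records:
        ident = (rec["key"], normalize_value(str(rec["value"])))
        current = best.get(ident)
        if current is None or rec.get("confidence", 0.0) > current.get("confidence", 0.0):
            best[ident] = rec
    return list(best.values())


records = [
    {"key": "language", "value": "English", "confidence": 0.9},
    {"key": "language", "value": " english ", "confidence": 0.95},
    {"key": "update_channel", "value": "email", "confidence": 0.95},
    {"key": "language", "value": "german", "confidence": 0.8},
]
for rec in dedupe_records(records):
    print(rec)
```

Note that this only collapses duplicates: "english" and "german" under the same key both survive. Resolving conflicting values for one key (for example, latest write wins) would need a separate rule.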
⏱️ 20 min read · Updated Mar, 2026 · Difficulty: ★★☆
Author

This documentation is curated and maintained by engineers who ship AI agents in production.

The content is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

Patterns and recommendations are grounded in post-mortems, failure modes, and operational incidents in deployed systems, including during the development and operation of governance infrastructure for agents at OnceOnly.