Pattern essence (in brief)
Memory-Augmented Agent is a pattern in which the agent stores important facts between sessions and uses them in later responses.
The LLM proposes what to remember and what to retrieve, while the memory policy/execution layer controls what can actually be written and read.
What this example demonstrates
- two phases (Session 1/2) in a single run; persistence across processes is not shown
- memory extraction via the LLM under a JSON contract (items[])
- a policy boundary for memory write/retrieve contracts
- an execution boundary (runtime allowlist) for memory keys and scopes
- runtime trust-gating: execution can block sensitive keys even if policy allows them
- in this demo the policy allowlist acts as a hard fail, so drift and contract errors surface quickly
- a TTL memory lifecycle and a bounded in-memory store
- a final answer with a check of used_memory_keys against the memory that was actually retrieved
- explicit stop_reason, trace, and history for production monitoring
Architecture
- The LLM extracts memory candidates from the user message (items).
- The policy boundary validates shape, keys, ttl_days, and confidence.
- The execution boundary decides which records are actually written (runtime allowlist).
- In the next session the LLM plans a retrieval intent (kind/query/top_k/scopes).
- The gateway performs retrieval only in scopes the runtime allows.
- The LLM composes the answer from incident_context + memory_items.
- The system verifies that used_memory_keys reference only memory that was actually retrieved.
Key contract: the LLM may propose records and a retrieval intent, but the policy/execution layer determines what counts as valid and what is actually executed.
The policy allowlist defines what the model may ask for; the execution allowlist defines what the runtime actually permits right now.
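The split between the two allowlists can be sketched in a few lines. This is a minimal illustration rather than the example's gateway: the names POLICY_KEYS, EXECUTION_KEYS, PolicyViolation, and gate_write are hypothetical. It assumes that a key outside the policy allowlist stops the run, while a key the runtime does not currently allow is only skipped and reported.

```python
# Hypothetical names for illustration; the real example splits this logic
# between validate_memory_candidates(...) and MemoryGateway.write(...).
POLICY_KEYS = {"language", "response_style", "update_channel", "declared_tier"}
EXECUTION_KEYS = {"language", "response_style", "update_channel"}


class PolicyViolation(Exception):
    """Raised when the model asks for a key the policy never allows."""


def gate_write(candidates: list[dict]) -> dict:
    writable: list[dict] = []
    blocked: list[dict] = []
    for item in candidates:
        key = item["key"]
        if key not in POLICY_KEYS:
            # Hard fail: the model left the contract, so stop the whole run.
            raise PolicyViolation(f"memory_key_not_allowed_policy:{key}")
        if key not in EXECUTION_KEYS:
            # Soft block: the request is valid, but the runtime refuses it now.
            blocked.append({"key": key, "reason": "key_denied_execution"})
            continue
        writable.append(item)
    return {"writable": writable, "blocked": blocked}


result = gate_write(
    [
        {"key": "language", "value": "english"},
        {"key": "declared_tier", "value": "enterprise"},
    ]
)
print(result["blocked"])  # → [{'key': 'declared_tier', 'reason': 'key_denied_execution'}]
```

The asymmetry is deliberate in this sketch: a policy violation signals prompt or contract drift and should be loud, whereas an execution block is an expected runtime decision that only needs to be recorded.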
Project structure
examples/
└── agent-patterns/
    └── memory-augmented-agent/
        └── python/
            ├── main.py           # Session1 capture/store -> Session2 retrieve/apply
            ├── llm.py            # extraction + retrieval planning + final response
            ├── gateway.py        # policy/execution boundary for memory operations
            ├── memory_store.py   # in-memory store with TTL and relevance scoring
            ├── requirements.txt
            └── README.md
How to run
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/memory-augmented-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Requires Python 3.11+.
Option via export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option via .env (optional)
cat > .env <<'ENVFILE'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
ENVFILE
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows it is easier to define the variables with set or, if you prefer, to use python-dotenv so that .env is loaded automatically.
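If adding a dependency is not desirable, a small standard-library loader can cover simple KEY=VALUE files on any platform. The following is a minimal sketch under stated assumptions: one assignment per line, # comments, and optional surrounding quotes. load_env_file is a hypothetical helper that is not part of the example repository, and it does not handle export prefixes, variable interpolation, or multi-line values.

```python
import os
from pathlib import Path


def load_env_file(path: str = ".env", *, override: bool = False) -> dict[str, str]:
    """Load simple KEY=VALUE pairs into os.environ (hypothetical helper)."""
    loaded: dict[str, str] = {}
    file = Path(path)
    if not file.is_file():
        return loaded
    for raw_line in file.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        # Skip blanks, comments, and lines without an assignment.
        if not line or line.startswith("#") or "=" not in line:
            continue
        name, _, value = line.partition("=")
        name = name.strip()
        value = value.strip().strip("'\"")
        if not name:
            continue
        # By default, variables already set in the environment win.
        if override or name not in os.environ:
            os.environ[name] = value
            loaded[name] = value
    return loaded
```

Such a helper would have to be called before llm.py is imported, because that module reads OPENAI_MODEL and OPENAI_TIMEOUT_SECONDS at import time.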
Task
Imagine an operations case for an incident assistant:
Session 1: the user sets stable preferences (language, response style, update channel).
Session 2: the user asks for a short update on a payment incident.
In Session 2 the goal deliberately includes both an update and next actions, to show preference fields being applied in a structured update.
The agent must:
- write only useful memory facts
- not write keys that the runtime forbids
- retrieve relevant memory in the next session
- apply it in the final answer
Solution
In this example:
- the LLM performs extract_memory_candidates(...) and plan_retrieval_intent(...)
- the gateway validates contracts and enforces the execution allowlist
- MemoryStore stores records with a TTL and returns the top relevant facts
- ENABLE_PREFERENCE_BIAS is a runtime switch for this flow: it is not "always include" but a controlled bias (preference keys get a small scoring bonus and can make it into top_k)
- with ENABLE_PREFERENCE_BIAS=True, preference keys can enter top_k even without token overlap (through the controlled score bonus)
- the final answer passes an allowlist check: used_memory_keys ⊆ retrieved_keys
- the response_style=concise check in this demo is format compliance (length and sentence count), not a semantic evaluation of tone
- the result contains the full trace and a condensed history
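The allowlist check on the final answer reduces to a set difference. A minimal sketch with made-up data; find_out_of_context_keys is a hypothetical helper name, while the comparison itself mirrors what main.py does with retrieved_keys.

```python
def find_out_of_context_keys(
    used_memory_keys: list[str],
    retrieved_items: list[dict],
) -> list[str]:
    """Return keys the model claims to have used but that were never retrieved."""
    retrieved_keys = {item["key"] for item in retrieved_items}
    return sorted(set(used_memory_keys) - retrieved_keys)


retrieved = [
    {"key": "language", "value": "english"},
    {"key": "update_channel", "value": "email"},
]

print(find_out_of_context_keys(["language"], retrieved))
# → []
print(find_out_of_context_keys(["language", "declared_tier"], retrieved))
# → ['declared_tier']
```

A non-empty result corresponds to the stop reason invalid_answer:memory_keys_out_of_context.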
Code
memory_store.py: TTL memory and retrieval scoring
from __future__ import annotations

import re
import time
from dataclasses import dataclass
from typing import Any

DEFAULT_BOOST_KEYS = {"language", "response_style", "update_channel"}


def _tokenize(text: str) -> set[str]:
    return set(re.findall(r"[a-zA-Z0-9_]+", (text or "").lower()))


@dataclass
class MemoryRecord:
    user_id: int
    key: str
    value: str
    scope: str
    source: str
    confidence: float
    updated_at: float
    expires_at: float


class MemoryStore:
    def __init__(self, *, max_items: int):
        self.max_items = max_items
        self._records: dict[tuple[int, str, str], MemoryRecord] = {}

    def _evict_if_needed(self) -> None:
        if len(self._records) <= self.max_items:
            return
        oldest_key = min(self._records.items(), key=lambda item: item[1].updated_at)[0]
        self._records.pop(oldest_key, None)

    def upsert_items(
        self,
        *,
        user_id: int,
        items: list[dict[str, Any]],
        source: str,
    ) -> list[dict[str, Any]]:
        now = time.time()
        written: list[dict[str, Any]] = []

        for item in items:
            key = str(item["key"]).strip()
            value = str(item["value"]).strip()
            scope = str(item.get("scope", "user")).strip() or "user"
            ttl_days = float(item.get("ttl_days", 180))
            ttl_days = max(1.0, min(365.0, ttl_days))
            confidence = float(item.get("confidence", 0.8))
            confidence = max(0.0, min(1.0, confidence))
            expires_at = now + ttl_days * 86400.0

            record_key = (user_id, scope, key)
            existing = self._records.get(record_key)
            if existing and existing.value == value:
                # Stable value: refresh metadata without creating noisy rewrites.
                existing.source = source
                existing.confidence = confidence
                existing.updated_at = now
                existing.expires_at = expires_at
                written.append(
                    {
                        "key": key,
                        "value": value,
                        "scope": scope,
                        "source": source,
                        "confidence": round(confidence, 3),
                        "ttl_days": int(ttl_days),
                        "refreshed": True,
                    }
                )
                continue

            row = MemoryRecord(
                user_id=user_id,
                key=key,
                value=value,
                scope=scope,
                source=source,
                confidence=confidence,
                updated_at=now,
                expires_at=expires_at,
            )
            self._records[record_key] = row
            self._evict_if_needed()
            written.append(
                {
                    "key": key,
                    "value": value,
                    "scope": scope,
                    "source": source,
                    "confidence": round(confidence, 3),
                    "ttl_days": int(ttl_days),
                    "refreshed": False,
                }
            )

        return written

    def search(
        self,
        *,
        user_id: int,
        query: str,
        top_k: int,
        scopes: set[str],
        include_preference_keys: bool = False,
    ) -> list[dict[str, Any]]:
        now = time.time()
        query_tokens = _tokenize(query)
        if not query_tokens:
            return []

        hits: list[tuple[float, MemoryRecord]] = []
        for row in list(self._records.values()):
            if row.user_id != user_id:
                continue
            if row.scope not in scopes:
                continue
            if row.expires_at <= now:
                self._records.pop((row.user_id, row.scope, row.key), None)
                continue

            text_tokens = _tokenize(f"{row.key} {row.value}")
            overlap = len(query_tokens & text_tokens)
            if overlap == 0 and not (include_preference_keys and row.key in DEFAULT_BOOST_KEYS):
                continue

            score = overlap + (row.confidence * 0.3)
            if include_preference_keys and row.key in DEFAULT_BOOST_KEYS:
                score += 0.4
            if score <= 0:
                continue
            hits.append((score, row))

        hits.sort(key=lambda item: (item[0], item[1].updated_at), reverse=True)

        result: list[dict[str, Any]] = []
        for score, row in hits[:top_k]:
            result.append(
                {
                    "key": row.key,
                    "value": row.value,
                    "scope": row.scope,
                    "source": row.source,
                    "confidence": round(row.confidence, 3),
                    "score": round(score, 3),
                }
            )
        return result

    def dump_user_records(self, *, user_id: int) -> list[dict[str, Any]]:
        now = time.time()
        rows: list[MemoryRecord] = []
        for row in list(self._records.values()):
            if row.user_id != user_id:
                continue
            if row.expires_at <= now:
                self._records.pop((row.user_id, row.scope, row.key), None)
                continue
            rows.append(row)

        rows.sort(key=lambda item: item.updated_at, reverse=True)

        snapshot: list[dict[str, Any]] = []
        for row in rows:
            ttl_left_days = max(0.0, (row.expires_at - now) / 86400.0)
            snapshot.append(
                {
                    "key": row.key,
                    "value": row.value,
                    "scope": row.scope,
                    "source": row.source,
                    "confidence": round(row.confidence, 3),
                    "ttl_left_days": round(ttl_left_days, 1),
                }
            )
        return snapshot
What matters most here (in plain words)
- Memory is isolated by user_id and scope.
- There is a lifecycle: TTL plus cleanup of expired records.
- The boost for preference keys is controlled by runtime policy (include_preference_keys), not by the wording of the retrieval query.
- search(...) returns relevant memory items, not the whole state.
gateway.py: policy/execution boundary for memory
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from memory_store import MemoryStore


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_capture_items: int = 6
    max_retrieve_top_k: int = 6
    max_query_chars: int = 240
    max_answer_chars: int = 700
    max_value_chars: int = 120
    max_seconds: int = 25


def _is_number(value: Any) -> bool:
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def validate_memory_candidates(
    raw: Any,
    *,
    allowed_keys_policy: set[str],
    allowed_scopes_policy: set[str],
    max_items: int,
    max_value_chars: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_memory_candidates:not_object")

    items = raw.get("items")
    if not isinstance(items, list):
        raise StopRun("invalid_memory_candidates:items")

    normalized: list[dict[str, Any]] = []
    for item in items:
        if not isinstance(item, dict):
            raise StopRun("invalid_memory_candidates:item")

        required_keys = {"key", "value"}
        if not required_keys.issubset(item.keys()):
            raise StopRun("invalid_memory_candidates:missing_keys")

        key = item.get("key")
        value = item.get("value")
        scope = item.get("scope", "user")
        ttl_days = item.get("ttl_days", 180)
        confidence = item.get("confidence", 0.8)

        if not isinstance(key, str) or not key.strip():
            raise StopRun("invalid_memory_candidates:key")
        key = key.strip()
        if key not in allowed_keys_policy:
            raise StopRun(f"memory_key_not_allowed_policy:{key}")

        if not isinstance(value, str) or not value.strip():
            raise StopRun("invalid_memory_candidates:value")
        value = value.strip()
        if len(value) > max_value_chars:
            raise StopRun("invalid_memory_candidates:value_too_long")

        if not isinstance(scope, str) or not scope.strip():
            raise StopRun("invalid_memory_candidates:scope")
        scope = scope.strip()
        if scope not in allowed_scopes_policy:
            raise StopRun(f"memory_scope_not_allowed_policy:{scope}")

        if not _is_number(ttl_days):
            raise StopRun("invalid_memory_candidates:ttl_days")
        ttl_days = int(float(ttl_days))
        ttl_days = max(1, min(365, ttl_days))

        if not _is_number(confidence):
            raise StopRun("invalid_memory_candidates:confidence")
        confidence = float(confidence)
        confidence = max(0.0, min(1.0, confidence))

        normalized.append(
            {
                "key": key,
                "value": value,
                "scope": scope,
                "ttl_days": ttl_days,
                "confidence": round(confidence, 3),
            }
        )

    if len(normalized) > max_items:
        raise StopRun("invalid_memory_candidates:too_many_items")

    return {"items": normalized}


def validate_retrieval_intent(
    raw: Any,
    *,
    allowed_scopes_policy: set[str],
    max_top_k: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_retrieval_intent:not_object")

    if raw.get("kind") != "retrieve_memory":
        raise StopRun("invalid_retrieval_intent:kind")

    query = raw.get("query")
    if not isinstance(query, str) or not query.strip():
        raise StopRun("invalid_retrieval_intent:query")

    top_k = raw.get("top_k", 4)
    if not isinstance(top_k, int) or not (1 <= top_k <= max_top_k):
        raise StopRun("invalid_retrieval_intent:top_k")

    scopes_raw = raw.get("scopes")
    normalized_scopes: list[str] = []
    if scopes_raw is not None:
        if not isinstance(scopes_raw, list) or not scopes_raw:
            raise StopRun("invalid_retrieval_intent:scopes")
        for scope in scopes_raw:
            if not isinstance(scope, str) or not scope.strip():
                raise StopRun("invalid_retrieval_intent:scope_item")
            normalized_scope = scope.strip()
            if normalized_scope not in allowed_scopes_policy:
                raise StopRun(f"invalid_retrieval_intent:scope_not_allowed:{normalized_scope}")
            normalized_scopes.append(normalized_scope)

    payload = {
        "kind": "retrieve_memory",
        "query": query.strip(),
        "top_k": top_k,
    }
    if normalized_scopes:
        payload["scopes"] = normalized_scopes
    return payload


class MemoryGateway:
    def __init__(
        self,
        *,
        store: MemoryStore,
        budget: Budget,
        allow_execution_keys: set[str],
        allow_execution_scopes: set[str],
    ):
        self.store = store
        self.budget = budget
        self.allow_execution_keys = set(allow_execution_keys)
        self.allow_execution_scopes = set(allow_execution_scopes)

    def write(
        self,
        *,
        user_id: int,
        items: list[dict[str, Any]],
        source: str,
    ) -> dict[str, Any]:
        if len(items) > self.budget.max_capture_items:
            raise StopRun("max_capture_items")

        writable: list[dict[str, Any]] = []
        blocked: list[dict[str, Any]] = []

        for item in items:
            key = item["key"]
            scope = item["scope"]

            if key not in self.allow_execution_keys:
                blocked.append({"key": key, "reason": "key_denied_execution"})
                continue
            if scope not in self.allow_execution_scopes:
                blocked.append(
                    {
                        "key": key,
                        "scope": scope,
                        "reason": "scope_denied_execution",
                    }
                )
                continue

            writable.append(item)

        written = []
        if writable:
            written = self.store.upsert_items(user_id=user_id, items=writable, source=source)

        return {
            "written": written,
            "blocked": blocked,
        }

    def retrieve(
        self,
        *,
        user_id: int,
        intent: dict[str, Any],
        include_preference_keys: bool = False,
    ) -> dict[str, Any]:
        query = intent["query"]
        if len(query) > self.budget.max_query_chars:
            raise StopRun("invalid_retrieval_intent:query_too_long")

        requested_scopes = set(intent.get("scopes") or self.allow_execution_scopes)
        denied = sorted(requested_scopes - self.allow_execution_scopes)
        if denied:
            raise StopRun(f"scope_denied:{denied[0]}")

        items = self.store.search(
            user_id=user_id,
            query=query,
            top_k=intent["top_k"],
            scopes=requested_scopes,
            include_preference_keys=include_preference_keys,
        )

        return {
            "query": query,
            "requested_scopes": sorted(requested_scopes),
            "include_preference_keys": include_preference_keys,
            "items": items,
        }
What matters most here (in plain words)
- The policy boundary checks the contract and the allowed keys/scopes.
- Policy is strict: a memory key/scope outside the allowlist stops the run.
- The gateway only enforces the execution allowlist that comes from main.py.
- If a key/scope is forbidden at runtime, the write is blocked and this is visible in history.blocked.
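Writes and reads treat a runtime-denied scope differently in gateway.py: a write is skipped and recorded, whereas retrieve(...) raises StopRun. The sketch below isolates the scope resolution used on the read path. resolve_scopes and EXECUTION_SCOPES are hypothetical names, and StopRun is redefined locally so the snippet runs on its own.

```python
EXECUTION_SCOPES = {"user"}  # assumed runtime allowlist, as in the demo defaults


class StopRun(Exception):
    """Local stand-in for gateway.StopRun so the sketch is self-contained."""


def resolve_scopes(intent: dict) -> list[str]:
    # Omitted (or empty) scopes fall back to everything the runtime allows.
    requested = set(intent.get("scopes") or EXECUTION_SCOPES)
    denied = sorted(requested - EXECUTION_SCOPES)
    if denied:
        # Read path: a denied scope stops the run instead of being skipped.
        raise StopRun(f"scope_denied:{denied[0]}")
    return sorted(requested)


print(resolve_scopes({"query": "incident update", "top_k": 4}))
# → ['user']

try:
    resolve_scopes({"query": "incident update", "top_k": 4, "scopes": ["user", "workspace"]})
except StopRun as exc:
    print(exc)
# → scope_denied:workspace
```

This is why the retrieval prompt asks the model to omit scopes unless the goal requires one: an explicit workspace scope passes policy validation but stops the run while workspace memory is disabled at runtime.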
llm.py: extract, retrieve-intent, apply
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


MEMORY_CAPTURE_SYSTEM_PROMPT = """
You are a memory extraction assistant.
Return exactly one JSON object in this shape:
{
  "items": [
    {
      "key": "language",
      "value": "english",
      "scope": "user",
      "ttl_days": 180,
      "confidence": 0.9
    }
  ]
}
Rules:
- Extract only stable preferences or durable constraints useful in future sessions.
- Use only keys from available_keys.
- scope must be "user" or "workspace".
- ttl_days must be between 1 and 365.
- confidence must be between 0 and 1.
- If nothing should be stored, return {"items": []}.
- Do not output markdown or extra keys.
""".strip()

RETRIEVAL_INTENT_SYSTEM_PROMPT = """
You are a memory retrieval planner.
Return exactly one JSON object in this shape:
{
  "kind": "retrieve_memory",
  "query": "short memory query",
  "top_k": 4
}
Optional key:
- "scopes": ["user", "workspace"]
Rules:
- Keep query compact and factual.
- top_k must be between 1 and 6.
- Use only scopes from available_scopes.
- Prefer omitting "scopes" unless the goal explicitly requires a specific scope.
- Do not output markdown or extra keys.
""".strip()

ANSWER_SYSTEM_PROMPT = """
You are an incident response assistant.
Return exactly one JSON object in this shape:
{
  "answer": "final answer in English",
  "used_memory_keys": ["language", "response_style"]
}
Rules:
- Use only incident_context and memory_items.
- Keep the answer concise, actionable, and suitable for an operations update.
- used_memory_keys must reference only keys present in memory_items.
- If "update_channel" is used, explicitly mention it in answer text (for example, "via email").
- If "language" is used with value "english", start answer with "Incident update:".
- If no memory was used, return an empty array.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def extract_memory_candidates(
    *,
    user_message: str,
    available_keys: list[str],
) -> dict[str, Any]:
    payload = {
        "user_message": user_message,
        "available_keys": available_keys,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": MEMORY_CAPTURE_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"invalid": True, "raw": text}


def plan_retrieval_intent(*, goal: str, available_scopes: list[str]) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "available_scopes": available_scopes,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": RETRIEVAL_INTENT_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_memory_augmented_answer(
    *,
    goal: str,
    incident_context: dict[str, Any],
    memory_items: list[dict[str, Any]],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "memory_items": [
            {
                "key": item.get("key"),
                "value": item.get("value"),
                "scope": item.get("scope"),
                "confidence": item.get("confidence"),
            }
            for item in memory_items
        ],
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": ANSWER_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")

    answer = data.get("answer")
    used_memory_keys = data.get("used_memory_keys")

    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")
    if not answer.strip():
        raise LLMEmpty("llm_empty")

    if not isinstance(used_memory_keys, list):
        raise LLMInvalid("llm_invalid_schema")

    normalized_keys: list[str] = []
    for value in used_memory_keys:
        if not isinstance(value, str):
            raise LLMInvalid("llm_invalid_schema")
        item = value.strip()
        if item and item not in normalized_keys:
            normalized_keys.append(item)

    return {
        "answer": answer.strip(),
        "used_memory_keys": normalized_keys,
    }
What matters most here (in plain words)
- Each phase has its own JSON contract: capture, retrieve_intent, apply.
- LLM errors are separated into llm_timeout, llm_invalid_*, and llm_empty.
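The apply-step contract can be exercised without calling the API. The sketch below mirrors the validation order of compose_memory_augmented_answer(...) but returns a label instead of raising; classify_answer_payload is a hypothetical helper for offline testing, not part of llm.py.

```python
import json


def classify_answer_payload(text: str) -> str:
    """Map raw model output to 'ok' or the stop reason the apply step would produce."""
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return "llm_invalid_json"
    if not isinstance(data, dict):
        # Valid JSON, but not an object: llm.py reports this as invalid JSON too.
        return "llm_invalid_json"

    answer = data.get("answer")
    used_memory_keys = data.get("used_memory_keys")

    if not isinstance(answer, str):
        return "llm_invalid_schema"
    if not answer.strip():
        return "llm_empty"
    if not isinstance(used_memory_keys, list):
        return "llm_invalid_schema"
    if not all(isinstance(key, str) for key in used_memory_keys):
        return "llm_invalid_schema"
    return "ok"


print(classify_answer_payload('{"answer": "Incident update: ...", "used_memory_keys": ["language"]}'))
# → ok
print(classify_answer_payload('{"answer": "   ", "used_memory_keys": []}'))
# → llm_empty
print(classify_answer_payload('{"answer": "text"}'))
# → llm_invalid_schema
print(classify_answer_payload("not json"))
# → llm_invalid_json
```

Keeping the labels distinct lets monitoring separate transport problems (llm_timeout) from contract problems (llm_invalid_*) and from empty answers (llm_empty).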
main.py: Session1 Capture/Store -> Session2 Retrieve/Apply
from __future__ import annotations

import json
import re
import time
import uuid
from typing import Any

from gateway import (
    Budget,
    MemoryGateway,
    StopRun,
    validate_memory_candidates,
    validate_retrieval_intent,
)
from llm import (
    LLMEmpty,
    LLMInvalid,
    LLMTimeout,
    compose_memory_augmented_answer,
    extract_memory_candidates,
    plan_retrieval_intent,
)
from memory_store import MemoryStore

USER_ID = 42

SESSION_1_USER_MESSAGE = (
    "For future incident updates, write in English, keep replies concise, "
    "use email as the primary channel, and remember that I am enterprise tier."
)

SESSION_2_GOAL = "Draft today's payment incident update and next actions."

INCIDENT_CONTEXT = {
    "date": "2026-03-04",
    "region": "US",
    "incident_id": "inc_payments_20260304",
    "severity": "P1",
    "gateway_status": "degraded",
    "failed_payment_rate": 0.034,
    "chargeback_alerts": 5,
    "eta_minutes": 45,
}

BUDGET = Budget(
    max_capture_items=6,
    max_retrieve_top_k=6,
    max_query_chars=240,
    max_answer_chars=700,
    max_value_chars=120,
    max_seconds=25,
)

ALLOWED_MEMORY_KEYS_POLICY = {
    "language",
    "response_style",
    "update_channel",
    "declared_tier",
}

# Runtime can block high-risk keys even if policy allows them.
TRUST_DECLARED_TIER_FROM_CHAT = False
ALLOWED_MEMORY_KEYS_EXECUTION = (
    ALLOWED_MEMORY_KEYS_POLICY
    if TRUST_DECLARED_TIER_FROM_CHAT
    else {"language", "response_style", "update_channel"}
)

ALLOWED_SCOPES_POLICY = {"user", "workspace"}
WORKSPACE_MEMORY_RUNTIME_ENABLED = False
ALLOWED_SCOPES_EXECUTION = (
    ALLOWED_SCOPES_POLICY if WORKSPACE_MEMORY_RUNTIME_ENABLED else {"user"}
)

# Runtime policy: include default preferences for this incident-update flow.
ENABLE_PREFERENCE_BIAS = True


def _shorten(text: str, *, limit: int = 240) -> str:
    text = (text or "").strip()
    if len(text) <= limit:
        return text
    return text[: limit - 3].rstrip() + "..."


def _pick_applied_memory(
    memory_items: list[dict[str, Any]],
    used_keys: list[str],
) -> list[dict[str, Any]]:
    used = set(used_keys)
    out: list[dict[str, Any]] = []
    for item in memory_items:
        key = item.get("key")
        if key not in used:
            continue
        out.append(
            {
                "key": item["key"],
                "value": item["value"],
                "scope": item["scope"],
                "confidence": item["confidence"],
                "score": item["score"],
            }
        )
    return out


def _has_declared_memory_application(*, answer: str, applied_memory: list[dict[str, Any]]) -> bool:
    """
    Conservative audit check:
    - update_channel: value should appear in answer text.
    - response_style=concise: short-response format compliance.
    - language=english: answer should use a stable prefix "Incident update:".
    If no verifiable key exists, do not block.
    """
    if not applied_memory:
        return False

    normalized_answer = " ".join((answer or "").lower().split())
    evidenced_any = False
    has_verifiable_key = False

    def _is_concise(text: str) -> bool:
        words = re.findall(r"[a-zA-Z0-9_]+", text)
        sentence_count = len(re.findall(r"[.!?]+", text))
        return len(words) <= 80 and sentence_count <= 3

    for item in applied_memory:
        key = str(item.get("key") or "").strip().lower()
        value = str(item.get("value", "")).strip().lower()
        if not key or not value:
            continue

        if key == "update_channel":
            has_verifiable_key = True
            if value in normalized_answer:
                evidenced_any = True
        elif key == "response_style":
            has_verifiable_key = True
            if value == "concise" and _is_concise(answer):
                evidenced_any = True
        elif key == "language":
            if value in {"english", "en"}:
                has_verifiable_key = True
                if normalized_answer.startswith("incident update:"):
                    evidenced_any = True
            continue
        else:
            continue

    if not has_verifiable_key:
        return True
    return evidenced_any


def run_memory_augmented(
    *,
    user_id: int,
    session_1_message: str,
    session_2_goal: str,
) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    store = MemoryStore(max_items=100)
    gateway = MemoryGateway(
        store=store,
        budget=BUDGET,
        allow_execution_keys=ALLOWED_MEMORY_KEYS_EXECUTION,
        allow_execution_scopes=ALLOWED_SCOPES_EXECUTION,
    )

    try:
        raw_capture = extract_memory_candidates(
            user_message=session_1_message,
            available_keys=sorted(ALLOWED_MEMORY_KEYS_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="capture")

    try:
        capture_payload = validate_memory_candidates(
            raw_capture,
            allowed_keys_policy=ALLOWED_MEMORY_KEYS_POLICY,
            allowed_scopes_policy=ALLOWED_SCOPES_POLICY,
            max_items=BUDGET.max_capture_items,
            max_value_chars=BUDGET.max_value_chars,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="capture",
            raw_capture=raw_capture,
        )

    write_result = gateway.write(
        user_id=user_id,
        items=capture_payload["items"],
        source="session_1",
    )
    refreshed_items = [item for item in write_result["written"] if item.get("refreshed")]
    written_items = [item for item in write_result["written"] if not item.get("refreshed")]

    trace.append(
        {
            "step": 1,
            "phase": "capture_store",
            "candidates": len(capture_payload["items"]),
            "written": len(written_items),
            "refreshed": len(refreshed_items),
            "blocked": len(write_result["blocked"]),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 1,
            "session": "session_1",
            "message": session_1_message,
            "written_keys": [item["key"] for item in written_items],
            "refreshed_keys": [item["key"] for item in refreshed_items],
            "blocked": write_result["blocked"],
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="retrieve_plan")

    try:
        raw_intent = plan_retrieval_intent(
            goal=session_2_goal,
            available_scopes=sorted(ALLOWED_SCOPES_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="retrieve_plan")

    try:
        intent = validate_retrieval_intent(
            raw_intent,
            allowed_scopes_policy=ALLOWED_SCOPES_POLICY,
            max_top_k=BUDGET.max_retrieve_top_k,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="retrieve_plan",
            raw_intent=raw_intent,
        )

    try:
        retrieval = gateway.retrieve(
            user_id=user_id,
            intent=intent,
            include_preference_keys=ENABLE_PREFERENCE_BIAS,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="retrieve",
            intent=intent,
        )

    trace.append(
        {
            "step": 2,
            "phase": "retrieve",
            "query": retrieval["query"],
            "requested_scopes": retrieval["requested_scopes"],
            "include_preference_keys": retrieval["include_preference_keys"],
            "memory_hits": len(retrieval["items"]),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 2,
            "session": "session_2",
            "intent": intent,
            "resolved_scopes": retrieval["requested_scopes"],
            "include_preference_keys": retrieval["include_preference_keys"],
            "retrieved_keys": [item["key"] for item in retrieval["items"]],
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="apply")

    try:
        final = compose_memory_augmented_answer(
            goal=session_2_goal,
            incident_context=INCIDENT_CONTEXT,
            memory_items=retrieval["items"],
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="apply")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="apply")
    except LLMEmpty:
        return stopped("llm_empty", phase="apply")

    retrieved_keys = {item["key"] for item in retrieval["items"]}
    invalid_used_keys = sorted(
        set(final["used_memory_keys"]) - retrieved_keys,
    )
    if invalid_used_keys:
        return stopped(
            "invalid_answer:memory_keys_out_of_context",
            phase="apply",
            invalid_used_memory_keys=invalid_used_keys,
            retrieved_keys=sorted(retrieved_keys),
        )

    if len(final["answer"]) > BUDGET.max_answer_chars:
        return stopped("invalid_answer:too_long", phase="apply")

    applied_memory = _pick_applied_memory(retrieval["items"], final["used_memory_keys"])
    if final["used_memory_keys"] and not _has_declared_memory_application(
        answer=final["answer"],
        applied_memory=applied_memory,
    ):
        return stopped(
            "invalid_answer:memory_declared_but_not_applied",
            phase="apply",
            used_memory_keys=final["used_memory_keys"],
            applied_memory=applied_memory,
        )

    trace.append(
        {
            "step": 3,
            "phase": "apply",
            "used_memory_keys": final["used_memory_keys"],
            "applied_memory_count": len(applied_memory),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 3,
            "action": "compose_memory_augmented_answer",
            "used_memory_keys": final["used_memory_keys"],
            "answer": _shorten(final["answer"]),
        }
    )

    return {
        "run_id": run_id,
        "status": "ok",
        "stop_reason": "success",
        "outcome": "memory_applied" if final["used_memory_keys"] else "context_only",
        "answer": final["answer"],
        "used_memory_keys": final["used_memory_keys"],
        "applied_memory": applied_memory,
        "memory_state": store.dump_user_records(user_id=user_id),
        "trace": trace,
        "history": history,
    }


def main() -> None:
    result = run_memory_augmented(
        user_id=USER_ID,
        session_1_message=SESSION_1_USER_MESSAGE,
        session_2_goal=SESSION_2_GOAL,
    )
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()
What matters most here (in plain words)
- Session 1 and Session 2 are simulated here within a single process run through a shared memory store.
- ALLOWED_MEMORY_KEYS_POLICY and ALLOWED_MEMORY_KEYS_EXECUTION may deliberately differ.
- ENABLE_PREFERENCE_BIAS=True is enabled only for the incident-update flow, where preference fields are almost always needed.
- Every result payload carries a run_id for log correlation.
- The final used_memory_keys check plus memory_declared_but_not_applied makes the apply step audit-friendly.
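Because the conciseness check is a pure format heuristic, it helps to run it on a concrete answer. The sketch below copies the body of _is_concise from main.py into a standalone function (renamed is_concise here) and applies it to the answer string from this document's sample output. One thing to be aware of: the sentence counter matches every run of ., !, or ?, so the decimal point in 3.4% is counted as a sentence terminator.

```python
import re


def is_concise(text: str) -> bool:
    # Same limits as _is_concise in main.py: at most 80 words and 3 terminators.
    words = re.findall(r"[a-zA-Z0-9_]+", text)
    sentence_count = len(re.findall(r"[.!?]+", text))
    return len(words) <= 80 and sentence_count <= 3


answer = (
    "Incident update: On 2026-03-04, the US payment gateway is in a P1 degraded state. "
    "Failed payment rate is 3.4% with 5 chargeback alerts, ETA 45 minutes. "
    "Next actions: monitor gateway performance and send customer updates via email."
)

print(len(re.findall(r"[a-zA-Z0-9_]+", answer)))  # → 40
print(len(re.findall(r"[.!?]+", answer)))         # → 4
print(is_concise(answer))                         # → False
```

For this answer the audit as a whole still passes, because _has_declared_memory_application only needs evidence for at least one verifiable key, and both the Incident update: prefix and the word email are present. If response_style were the only key used, the same answer would be rejected, so a stricter sentence splitter may be worth considering before relying on this check alone.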
Example output
{
"run_id": "f06d5282-bda4-44df-8bf9-38f53cdb2fb9",
"status": "ok",
"stop_reason": "success",
"outcome": "memory_applied",
"answer": "Incident update: On 2026-03-04, the US payment gateway is in a P1 degraded state. Failed payment rate is 3.4% with 5 chargeback alerts, ETA 45 minutes. Next actions: monitor gateway performance and send customer updates via email.",
"used_memory_keys": [
"language",
"update_channel",
"response_style"
],
"applied_memory": [
{
"key": "language",
"value": "english",
"scope": "user",
"confidence": 0.95,
"score": 0.685
},
{
"key": "update_channel",
"value": "email",
"scope": "user",
"confidence": 0.95,
"score": 0.685
},
{
"key": "response_style",
"value": "concise",
"scope": "user",
"confidence": 0.9,
"score": 0.67
}
],
"memory_state": [
{
"key": "language",
"value": "english",
"scope": "user",
"source": "session_1",
"confidence": 0.95,
"ttl_left_days": 180.0
},
{
"key": "response_style",
"value": "concise",
"scope": "user",
"source": "session_1",
"confidence": 0.9,
"ttl_left_days": 180.0
},
{
"key": "update_channel",
"value": "email",
"scope": "user",
"source": "session_1",
"confidence": 0.95,
"ttl_left_days": 180.0
}
],
"trace": [
{
"step": 1,
"phase": "capture_store",
"candidates": 4,
"written": 3,
"refreshed": 0,
"blocked": 1,
"ok": true
},
{
"step": 2,
"phase": "retrieve",
"query": "payment incident update and next actions",
"requested_scopes": [
"user"
],
"include_preference_keys": true,
"memory_hits": 3,
"ok": true
},
{
"step": 3,
"phase": "apply",
"used_memory_keys": [
"language",
"update_channel",
"response_style"
],
"applied_memory_count": 3,
"ok": true
}
],
"history": [
{
"step": 1,
"session": "session_1",
"message": "For future incident updates, write in English, keep replies concise, use email as the primary channel, and remember that I am enterprise tier.",
"written_keys": [
"language",
"response_style",
"update_channel"
],
"refreshed_keys": [],
"blocked": [
{
"key": "declared_tier",
"reason": "key_denied_execution"
}
]
},
{
"step": 2,
"session": "session_2",
"intent": {
"kind": "retrieve_memory",
"query": "payment incident update and next actions",
"top_k": 4
},
"resolved_scopes": [
"user"
],
"include_preference_keys": true,
"retrieved_keys": [
"language",
"update_channel",
"response_style"
]
},
{
"step": 3,
"action": "compose_memory_augmented_answer",
"used_memory_keys": [
"language",
"update_channel",
"response_style"
],
"answer": "Incident update: On 2026-03-04, the US payment gateway is in a P1 degraded state..."
}
]
}
Typical stop_reason values
- success: the run completed correctly; see outcome (memory_applied or context_only)
- invalid_memory_candidates:*: memory capture failed contract validation
- invalid_memory_candidates:value_too_long: the value exceeds the max_value_chars limit
- memory_key_not_allowed_policy:<key>: the LLM proposed a key outside the policy allowlist
- memory_scope_not_allowed_policy:<scope>: the LLM proposed a scope outside the policy allowlist
- invalid_retrieval_intent:*: the retrieval intent failed policy validation
- scope_denied:<scope>: the retrieval scope is not permitted by the execution allowlist
- llm_timeout: the LLM did not respond within OPENAI_TIMEOUT_SECONDS
- llm_invalid_json / llm_invalid_schema: the apply step returned invalid JSON or an invalid shape
- llm_empty: the final answer is empty
- invalid_answer:memory_keys_out_of_context: the model referenced a memory key that was not in the retrieval result
- invalid_answer:too_long: the final answer exceeds the max_answer_chars limit
- invalid_answer:memory_declared_but_not_applied: the model declared memory usage, but the answer text does not reflect it
- max_seconds: the overall time budget of the run was exceeded
What is NOT shown
- persistent storage (Postgres/Redis/vector DB) instead of the in-memory implementation
- encryption/PII redaction before memory writes
- semantic retrieval via embeddings (instead of simple token overlap)
- multi-tenant quotas and soft/hard retention policy
- retry/backoff for LLM calls
- per-key consent and a user-visible memory UI
What to try next
- Set TRUST_DECLARED_TIER_FROM_CHAT=True and check how blocked changes in history.
- Set WORKSPACE_MEMORY_RUNTIME_ENABLED=True and add a retrieval intent with scope workspace.
- Add a policy rule for the memory key timezone and check personalization in the answer.
- Replace the in-memory store with external storage and add deduplication by key + normalized_value.