Memory-Augmented Agent — Python (vollstandige Implementierung mit LLM)

Ausfuhrbares Memory-Augmented-Agent-Beispiel in Python im Production-Stil mit capture/store/retrieve/apply-Flow, Policy-vs-Execution-Allowlist, TTL-Memory-Lifecycle und expliziten Stop-Reasons.
Auf dieser Seite
  1. Kern des Musters (Kurz)
  2. Was dieses Beispiel zeigt
  3. Architektur
  4. Projektstruktur
  5. Ausfuehren
  6. Aufgabe
  7. Loesung
  8. Code
  9. memory_store.py — TTL-Memory und Retrieval-Scoring
  10. gateway.py — Policy/Execution Boundary fuer Memory
  11. llm.py — extract, retrieve-intent, apply
  12. main.py — Session1 Capture/Store -> Session2 Retrieve/Apply
  13. Beispielausgabe
  14. Typische stop_reason-Werte
  15. Was NICHT gezeigt wird
  16. Was du als Naechstes ausprobieren kannst

Kern des Musters (Kurz)

Memory-Augmented Agent ist ein Muster, bei dem der Agent wichtige Fakten uber Sitzungen hinweg speichert und in spaeteren Antworten verwendet.

Das LLM schlaegt vor, was gespeichert und was abgerufen wird, und die Memory-Policy/Execution-Layer kontrolliert, was tatsaechlich geschrieben/gelesen werden darf.


Was dieses Beispiel zeigt

  • zwei Phasen (Session 1/2) in einem Run; Persistenz zwischen Prozessen wird nicht gezeigt
  • Memory-Extraktion uber LLM in einem JSON-Vertrag (items[])
  • Policy Boundary fuer Memory-Write/Retrieve-Vertraege
  • Execution Boundary (Runtime-Allowlist) fuer Memory-Keys und Scopes
  • Runtime-Trust-Gating: Execution kann sensitive Keys blockieren, auch wenn Policy sie erlaubt
  • in dieser Demo arbeitet die Policy-Allowlist als Hard-Fail, um Drift und Vertragsfehler schnell zu erkennen
  • TTL-Memory-Lifecycle und begrenzter In-Memory-Store
  • finale Antwort mit Pruefung von used_memory_keys gegen tatsaechlich abgerufene Memory
  • explizite stop_reason, trace, history fuer Production-Monitoring

Architektur

  1. LLM extrahiert Memory-Kandidaten aus der Nutzernachricht (items).
  2. Policy Boundary validiert Shape, Keys, ttl_days und confidence.
  3. Execution Boundary entscheidet, welche Eintraege tatsaechlich geschrieben werden (Runtime-Allowlist).
  4. In der naechsten Sitzung plant LLM den Retrieval-Intent (kind/query/top_k/scopes).
  5. Gateway fuehrt Retrieval nur in Runtime-erlaubten Scopes aus.
  6. LLM bildet die Antwort auf Basis von incident_context + memory_items.
  7. Das System prueft, dass used_memory_keys nur auf tatsaechlich abgerufene Memory verweisen.

Schluesselvertrag: LLM kann Eintraege und Retrieval-Intent vorschlagen, aber die Policy/Execution-Schicht definiert, was als valide gilt und was tatsaechlich ausgefuehrt wird.

Policy-Allowlist definiert, was das Modell anfragen darf, Execution-Allowlist definiert, was Runtime jetzt tatsaechlich erlaubt.


Projektstruktur

TEXT
examples/
└── agent-patterns/
    └── memory-augmented-agent/
        └── python/
            ├── main.py           # Session1 capture/store -> Session2 retrieve/apply
            ├── llm.py            # extraction + retrieval planning + final response
            ├── gateway.py        # policy/execution boundary for memory operations
            ├── memory_store.py   # in-memory store with TTL and relevance scoring
            ├── requirements.txt
            └── README.md

Ausfuehren

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/memory-augmented-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ ist erforderlich.

Variante ueber export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Variante ueber .env (optional)
BASH
cat > .env <<'ENVFILE'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
ENVFILE

set -a
source .env
set +a

python main.py

Das ist die Shell-Variante (macOS/Linux). Unter Windows ist es einfacher, set-Variablen zu verwenden oder optional python-dotenv, um .env automatisch zu laden.


Aufgabe

Stell dir einen Operations-Fall fuer einen Incident Assistant vor:

Session 1: Der Nutzer setzt stabile Preferences (Sprache, Antwortstil, Update-Kanal).
Session 2: bittet um ein kurzes Update zu einem Zahlungsincident.

In Session 2 enthaelt das Goal bewusst sowohl update als auch next actions, um die Anwendung von Preference-Feldern in einem strukturierten Update zu zeigen.

Der Agent muss:

  • nur nuetzliche Memory-Fakten speichern
  • keine Keys speichern, die Runtime verbietet
  • relevante Memory in der naechsten Sitzung abrufen
  • sie in der finalen Antwort anwenden

Loesung

In diesem Beispiel:

  • LLM macht extract_memory_candidates(...) und plan_retrieval_intent(...)
  • Gateway validiert Vertraege und enforced die Execution-Allowlist
  • MemoryStore speichert Eintraege mit TTL und liefert die relevantesten Fakten zurueck
  • ENABLE_PREFERENCE_BIAS ist ein Runtime-Schalter fuer diesen Flow: kein "always include", sondern kontrollierter Bias (Preference-Keys erhalten einen kleinen Scoring-Bonus und koennen in top_k gelangen)
  • bei ENABLE_PREFERENCE_BIAS=True koennen Preference-Keys auch ohne Token-Overlap in top_k gelangen (durch kontrollierten Score-Bonus)
  • finale Antwort besteht den Allowlist-Check: used_memory_keysretrieved_keys
  • Pruefung von response_style=concise ist in dieser Demo Format-Compliance (Laenge/Satzanzahl), keine semantische Tonbewertung
  • Ergebnis enthaelt vollstaendiges trace und kompaktes history

Code

memory_store.py — TTL-Memory und Retrieval-Scoring

PYTHON
from __future__ import annotations

import re
import time
from dataclasses import dataclass
from typing import Any


DEFAULT_BOOST_KEYS = {"language", "response_style", "update_channel"}


def _tokenize(text: str) -> set[str]:
    return set(re.findall(r"[a-zA-Z0-9_]+", (text or "").lower()))


@dataclass
class MemoryRecord:
    user_id: int
    key: str
    value: str
    scope: str
    source: str
    confidence: float
    updated_at: float
    expires_at: float


class MemoryStore:
    def __init__(self, *, max_items: int):
        self.max_items = max_items
        self._records: dict[tuple[int, str, str], MemoryRecord] = {}

    def _evict_if_needed(self) -> None:
        if len(self._records) <= self.max_items:
            return
        oldest_key = min(self._records.items(), key=lambda item: item[1].updated_at)[0]
        self._records.pop(oldest_key, None)

    def upsert_items(
        self,
        *,
        user_id: int,
        items: list[dict[str, Any]],
        source: str,
    ) -> list[dict[str, Any]]:
        now = time.time()
        written: list[dict[str, Any]] = []

        for item in items:
            key = str(item["key"]).strip()
            value = str(item["value"]).strip()
            scope = str(item.get("scope", "user")).strip() or "user"
            ttl_days = float(item.get("ttl_days", 180))
            ttl_days = max(1.0, min(365.0, ttl_days))
            confidence = float(item.get("confidence", 0.8))
            confidence = max(0.0, min(1.0, confidence))
            expires_at = now + ttl_days * 86400.0

            record_key = (user_id, scope, key)
            existing = self._records.get(record_key)
            if existing and existing.value == value:
                # Stable value: refresh metadata without creating noisy rewrites.
                existing.source = source
                existing.confidence = confidence
                existing.updated_at = now
                existing.expires_at = expires_at

                written.append(
                    {
                        "key": key,
                        "value": value,
                        "scope": scope,
                        "source": source,
                        "confidence": round(confidence, 3),
                        "ttl_days": int(ttl_days),
                        "refreshed": True,
                    }
                )
                continue

            row = MemoryRecord(
                user_id=user_id,
                key=key,
                value=value,
                scope=scope,
                source=source,
                confidence=confidence,
                updated_at=now,
                expires_at=expires_at,
            )
            self._records[record_key] = row
            self._evict_if_needed()

            written.append(
                {
                    "key": key,
                    "value": value,
                    "scope": scope,
                    "source": source,
                    "confidence": round(confidence, 3),
                    "ttl_days": int(ttl_days),
                    "refreshed": False,
                }
            )

        return written

    def search(
        self,
        *,
        user_id: int,
        query: str,
        top_k: int,
        scopes: set[str],
        include_preference_keys: bool = False,
    ) -> list[dict[str, Any]]:
        now = time.time()
        query_tokens = _tokenize(query)
        if not query_tokens:
            return []

        hits: list[tuple[float, MemoryRecord]] = []
        for row in list(self._records.values()):
            if row.user_id != user_id:
                continue
            if row.scope not in scopes:
                continue
            if row.expires_at <= now:
                self._records.pop((row.user_id, row.scope, row.key), None)
                continue

            text_tokens = _tokenize(f"{row.key} {row.value}")
            overlap = len(query_tokens & text_tokens)
            if overlap == 0 and not (include_preference_keys and row.key in DEFAULT_BOOST_KEYS):
                continue

            score = overlap + (row.confidence * 0.3)

            if include_preference_keys and row.key in DEFAULT_BOOST_KEYS:
                score += 0.4

            if score <= 0:
                continue
            hits.append((score, row))

        hits.sort(key=lambda item: (item[0], item[1].updated_at), reverse=True)

        result: list[dict[str, Any]] = []
        for score, row in hits[:top_k]:
            result.append(
                {
                    "key": row.key,
                    "value": row.value,
                    "scope": row.scope,
                    "source": row.source,
                    "confidence": round(row.confidence, 3),
                    "score": round(score, 3),
                }
            )
        return result

    def dump_user_records(self, *, user_id: int) -> list[dict[str, Any]]:
        now = time.time()
        rows: list[MemoryRecord] = []

        for row in list(self._records.values()):
            if row.user_id != user_id:
                continue
            if row.expires_at <= now:
                self._records.pop((row.user_id, row.scope, row.key), None)
                continue
            rows.append(row)

        rows.sort(key=lambda item: item.updated_at, reverse=True)

        snapshot: list[dict[str, Any]] = []
        for row in rows:
            ttl_left_days = max(0.0, (row.expires_at - now) / 86400.0)
            snapshot.append(
                {
                    "key": row.key,
                    "value": row.value,
                    "scope": row.scope,
                    "source": row.source,
                    "confidence": round(row.confidence, 3),
                    "ttl_left_days": round(ttl_left_days, 1),
                }
            )
        return snapshot

Was hier am wichtigsten ist (einfach erklaert)

  • Memory ist nach user_id und scope isoliert.
  • Es gibt Lifecycle: TTL + Cleanup abgelaufener Eintraege.
  • Boost fuer Preference-Keys wird ueber Runtime-Policy (include_preference_keys) gesteuert, nicht ueber das Wording der Retrieval-Query.
  • search(...) liefert relevante Memory-Items, nicht den gesamten State.

gateway.py — Policy/Execution Boundary fuer Memory

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from memory_store import MemoryStore


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_capture_items: int = 6
    max_retrieve_top_k: int = 6
    max_query_chars: int = 240
    max_answer_chars: int = 700
    max_value_chars: int = 120
    max_seconds: int = 25


def _is_number(value: Any) -> bool:
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def validate_memory_candidates(
    raw: Any,
    *,
    allowed_keys_policy: set[str],
    allowed_scopes_policy: set[str],
    max_items: int,
    max_value_chars: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_memory_candidates:not_object")

    items = raw.get("items")
    if not isinstance(items, list):
        raise StopRun("invalid_memory_candidates:items")

    normalized: list[dict[str, Any]] = []
    for item in items:
        if not isinstance(item, dict):
            raise StopRun("invalid_memory_candidates:item")

        required_keys = {"key", "value"}
        if not required_keys.issubset(item.keys()):
            raise StopRun("invalid_memory_candidates:missing_keys")

        key = item.get("key")
        value = item.get("value")
        scope = item.get("scope", "user")
        ttl_days = item.get("ttl_days", 180)
        confidence = item.get("confidence", 0.8)

        if not isinstance(key, str) or not key.strip():
            raise StopRun("invalid_memory_candidates:key")
        key = key.strip()
        if key not in allowed_keys_policy:
            raise StopRun(f"memory_key_not_allowed_policy:{key}")

        if not isinstance(value, str) or not value.strip():
            raise StopRun("invalid_memory_candidates:value")
        value = value.strip()
        if len(value) > max_value_chars:
            raise StopRun("invalid_memory_candidates:value_too_long")

        if not isinstance(scope, str) or not scope.strip():
            raise StopRun("invalid_memory_candidates:scope")
        scope = scope.strip()
        if scope not in allowed_scopes_policy:
            raise StopRun(f"memory_scope_not_allowed_policy:{scope}")

        if not _is_number(ttl_days):
            raise StopRun("invalid_memory_candidates:ttl_days")
        ttl_days = int(float(ttl_days))
        ttl_days = max(1, min(365, ttl_days))

        if not _is_number(confidence):
            raise StopRun("invalid_memory_candidates:confidence")
        confidence = float(confidence)
        confidence = max(0.0, min(1.0, confidence))

        normalized.append(
            {
                "key": key,
                "value": value,
                "scope": scope,
                "ttl_days": ttl_days,
                "confidence": round(confidence, 3),
            }
        )

    if len(normalized) > max_items:
        raise StopRun("invalid_memory_candidates:too_many_items")

    return {"items": normalized}


def validate_retrieval_intent(
    raw: Any,
    *,
    allowed_scopes_policy: set[str],
    max_top_k: int,
) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_retrieval_intent:not_object")

    if raw.get("kind") != "retrieve_memory":
        raise StopRun("invalid_retrieval_intent:kind")

    query = raw.get("query")
    if not isinstance(query, str) or not query.strip():
        raise StopRun("invalid_retrieval_intent:query")

    top_k = raw.get("top_k", 4)
    if not isinstance(top_k, int) or not (1 <= top_k <= max_top_k):
        raise StopRun("invalid_retrieval_intent:top_k")

    scopes_raw = raw.get("scopes")
    normalized_scopes: list[str] = []
    if scopes_raw is not None:
        if not isinstance(scopes_raw, list) or not scopes_raw:
            raise StopRun("invalid_retrieval_intent:scopes")
        for scope in scopes_raw:
            if not isinstance(scope, str) or not scope.strip():
                raise StopRun("invalid_retrieval_intent:scope_item")
            normalized_scope = scope.strip()
            if normalized_scope not in allowed_scopes_policy:
                raise StopRun(f"invalid_retrieval_intent:scope_not_allowed:{normalized_scope}")
            normalized_scopes.append(normalized_scope)

    payload = {
        "kind": "retrieve_memory",
        "query": query.strip(),
        "top_k": top_k,
    }
    if normalized_scopes:
        payload["scopes"] = normalized_scopes
    return payload


class MemoryGateway:
    def __init__(
        self,
        *,
        store: MemoryStore,
        budget: Budget,
        allow_execution_keys: set[str],
        allow_execution_scopes: set[str],
    ):
        self.store = store
        self.budget = budget
        self.allow_execution_keys = set(allow_execution_keys)
        self.allow_execution_scopes = set(allow_execution_scopes)

    def write(
        self,
        *,
        user_id: int,
        items: list[dict[str, Any]],
        source: str,
    ) -> dict[str, Any]:
        if len(items) > self.budget.max_capture_items:
            raise StopRun("max_capture_items")

        writable: list[dict[str, Any]] = []
        blocked: list[dict[str, Any]] = []

        for item in items:
            key = item["key"]
            scope = item["scope"]

            if key not in self.allow_execution_keys:
                blocked.append({"key": key, "reason": "key_denied_execution"})
                continue
            if scope not in self.allow_execution_scopes:
                blocked.append(
                    {
                        "key": key,
                        "scope": scope,
                        "reason": "scope_denied_execution",
                    }
                )
                continue

            writable.append(item)

        written = []
        if writable:
            written = self.store.upsert_items(user_id=user_id, items=writable, source=source)

        return {
            "written": written,
            "blocked": blocked,
        }

    def retrieve(
        self,
        *,
        user_id: int,
        intent: dict[str, Any],
        include_preference_keys: bool = False,
    ) -> dict[str, Any]:
        query = intent["query"]
        if len(query) > self.budget.max_query_chars:
            raise StopRun("invalid_retrieval_intent:query_too_long")

        requested_scopes = set(intent.get("scopes") or self.allow_execution_scopes)
        denied = sorted(requested_scopes - self.allow_execution_scopes)
        if denied:
            raise StopRun(f"scope_denied:{denied[0]}")

        items = self.store.search(
            user_id=user_id,
            query=query,
            top_k=intent["top_k"],
            scopes=requested_scopes,
            include_preference_keys=include_preference_keys,
        )

        return {
            "query": query,
            "requested_scopes": sorted(requested_scopes),
            "include_preference_keys": include_preference_keys,
            "items": items,
        }

Was hier am wichtigsten ist (einfach erklaert)

  • Policy Boundary prueft Vertrag und erlaubte Keys/Scopes.
  • Policy ist strikt: Memory key/scope ausserhalb der Allowlist stoppt den Run.
  • Gateway enforced nur die Execution-Allowlist, die aus main.py kommt.
  • Wenn Key/Scope zur Runtime verboten sind, wird Write blockiert und ist in history.blocked sichtbar.

llm.py — extract, retrieve-intent, apply

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


class LLMInvalid(Exception):
    pass


MEMORY_CAPTURE_SYSTEM_PROMPT = """
You are a memory extraction assistant.
Return exactly one JSON object in this shape:
{
  "items": [
    {
      "key": "language",
      "value": "english",
      "scope": "user",
      "ttl_days": 180,
      "confidence": 0.9
    }
  ]
}

Rules:
- Extract only stable preferences or durable constraints useful in future sessions.
- Use only keys from available_keys.
- scope must be "user" or "workspace".
- ttl_days must be between 1 and 365.
- confidence must be between 0 and 1.
- If nothing should be stored, return {"items": []}.
- Do not output markdown or extra keys.
""".strip()

RETRIEVAL_INTENT_SYSTEM_PROMPT = """
You are a memory retrieval planner.
Return exactly one JSON object in this shape:
{
  "kind": "retrieve_memory",
  "query": "short memory query",
  "top_k": 4
}

Optional key:
- "scopes": ["user", "workspace"]

Rules:
- Keep query compact and factual.
- top_k must be between 1 and 6.
- Use only scopes from available_scopes.
- Prefer omitting "scopes" unless the goal explicitly requires a specific scope.
- Do not output markdown or extra keys.
""".strip()

ANSWER_SYSTEM_PROMPT = """
You are an incident response assistant.
Return exactly one JSON object in this shape:
{
  "answer": "final answer in English",
  "used_memory_keys": ["language", "response_style"]
}

Rules:
- Use only incident_context and memory_items.
- Keep the answer concise, actionable, and suitable for an operations update.
- used_memory_keys must reference only keys present in memory_items.
- If "update_channel" is used, explicitly mention it in answer text (for example, "via email").
- If "language" is used with value "english", start answer with "Incident update:".
- If no memory was used, return an empty array.
- Do not output markdown or extra keys.
""".strip()


def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)


def extract_memory_candidates(
    *,
    user_message: str,
    available_keys: list[str],
) -> dict[str, Any]:
    payload = {
        "user_message": user_message,
        "available_keys": available_keys,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": MEMORY_CAPTURE_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"invalid": True, "raw": text}


def plan_retrieval_intent(*, goal: str, available_scopes: list[str]) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "available_scopes": available_scopes,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": RETRIEVAL_INTENT_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"kind": "invalid", "raw": text}


def compose_memory_augmented_answer(
    *,
    goal: str,
    incident_context: dict[str, Any],
    memory_items: list[dict[str, Any]],
) -> dict[str, Any]:
    payload = {
        "goal": goal,
        "incident_context": incident_context,
        "memory_items": [
            {
                "key": item.get("key"),
                "value": item.get("value"),
                "scope": item.get("scope"),
                "confidence": item.get("confidence"),
            }
            for item in memory_items
        ],
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": ANSWER_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        raise LLMInvalid("llm_invalid_json") from exc

    if not isinstance(data, dict):
        raise LLMInvalid("llm_invalid_json")

    answer = data.get("answer")
    used_memory_keys = data.get("used_memory_keys")

    if not isinstance(answer, str):
        raise LLMInvalid("llm_invalid_schema")
    if not answer.strip():
        raise LLMEmpty("llm_empty")

    if not isinstance(used_memory_keys, list):
        raise LLMInvalid("llm_invalid_schema")

    normalized_keys: list[str] = []
    for value in used_memory_keys:
        if not isinstance(value, str):
            raise LLMInvalid("llm_invalid_schema")
        item = value.strip()
        if item and item not in normalized_keys:
            normalized_keys.append(item)

    return {
        "answer": answer.strip(),
        "used_memory_keys": normalized_keys,
    }

Was hier am wichtigsten ist (einfach erklaert)

  • Jede Phase hat einen separaten JSON-Vertrag: capture, retrieve_intent, apply.
  • LLM-Fehler sind getrennt in llm_timeout, llm_invalid_*, llm_empty.

main.py — Session1 Capture/Store -> Session2 Retrieve/Apply

PYTHON
from __future__ import annotations

import json
import re
import time
import uuid
from typing import Any

from gateway import (
    Budget,
    MemoryGateway,
    StopRun,
    validate_memory_candidates,
    validate_retrieval_intent,
)
from llm import (
    LLMEmpty,
    LLMInvalid,
    LLMTimeout,
    compose_memory_augmented_answer,
    extract_memory_candidates,
    plan_retrieval_intent,
)
from memory_store import MemoryStore

USER_ID = 42
SESSION_1_USER_MESSAGE = (
    "For future incident updates, write in English, keep replies concise, "
    "use email as the primary channel, and remember that I am enterprise tier."
)
SESSION_2_GOAL = "Draft today's payment incident update and next actions."

INCIDENT_CONTEXT = {
    "date": "2026-03-04",
    "region": "US",
    "incident_id": "inc_payments_20260304",
    "severity": "P1",
    "gateway_status": "degraded",
    "failed_payment_rate": 0.034,
    "chargeback_alerts": 5,
    "eta_minutes": 45,
}

BUDGET = Budget(
    max_capture_items=6,
    max_retrieve_top_k=6,
    max_query_chars=240,
    max_answer_chars=700,
    max_value_chars=120,
    max_seconds=25,
)

ALLOWED_MEMORY_KEYS_POLICY = {
    "language",
    "response_style",
    "update_channel",
    "declared_tier",
}
# Runtime can block high-risk keys even if policy allows them.
TRUST_DECLARED_TIER_FROM_CHAT = False
ALLOWED_MEMORY_KEYS_EXECUTION = (
    ALLOWED_MEMORY_KEYS_POLICY
    if TRUST_DECLARED_TIER_FROM_CHAT
    else {"language", "response_style", "update_channel"}
)

ALLOWED_SCOPES_POLICY = {"user", "workspace"}
WORKSPACE_MEMORY_RUNTIME_ENABLED = False
ALLOWED_SCOPES_EXECUTION = (
    ALLOWED_SCOPES_POLICY if WORKSPACE_MEMORY_RUNTIME_ENABLED else {"user"}
)
# Runtime policy: include default preferences for this incident-update flow.
ENABLE_PREFERENCE_BIAS = True



def _shorten(text: str, *, limit: int = 240) -> str:
    text = (text or "").strip()
    if len(text) <= limit:
        return text
    return text[: limit - 3].rstrip() + "..."



def _pick_applied_memory(
    memory_items: list[dict[str, Any]],
    used_keys: list[str],
) -> list[dict[str, Any]]:
    used = set(used_keys)
    out: list[dict[str, Any]] = []
    for item in memory_items:
        key = item.get("key")
        if key not in used:
            continue
        out.append(
            {
                "key": item["key"],
                "value": item["value"],
                "scope": item["scope"],
                "confidence": item["confidence"],
                "score": item["score"],
            }
        )
    return out



def _has_declared_memory_application(*, answer: str, applied_memory: list[dict[str, Any]]) -> bool:
    """
    Conservative audit check:
    - update_channel: value should appear in answer text.
    - response_style=concise: short-response format compliance.
    - language=english: answer should use a stable prefix "Incident update:".
    If no verifiable key exists, do not block.
    """
    if not applied_memory:
        return False

    normalized_answer = " ".join((answer or "").lower().split())
    evidenced_any = False
    has_verifiable_key = False

    def _is_concise(text: str) -> bool:
        words = re.findall(r"[a-zA-Z0-9_]+", text)
        sentence_count = len(re.findall(r"[.!?]+", text))
        return len(words) <= 80 and sentence_count <= 3

    for item in applied_memory:
        key = str(item.get("key") or "").strip().lower()
        value = str(item.get("value", "")).strip().lower()
        if not key or not value:
            continue

        if key == "update_channel":
            has_verifiable_key = True
            if value in normalized_answer:
                evidenced_any = True
        elif key == "response_style":
            has_verifiable_key = True
            if value == "concise" and _is_concise(answer):
                evidenced_any = True
        elif key == "language":
            if value in {"english", "en"}:
                has_verifiable_key = True
                if normalized_answer.startswith("incident update:"):
                    evidenced_any = True
            continue
        else:
            continue

    if not has_verifiable_key:
        return True
    return evidenced_any



def run_memory_augmented(
    *,
    user_id: int,
    session_1_message: str,
    session_2_goal: str,
) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    store = MemoryStore(max_items=100)
    gateway = MemoryGateway(
        store=store,
        budget=BUDGET,
        allow_execution_keys=ALLOWED_MEMORY_KEYS_EXECUTION,
        allow_execution_scopes=ALLOWED_SCOPES_EXECUTION,
    )

    try:
        raw_capture = extract_memory_candidates(
            user_message=session_1_message,
            available_keys=sorted(ALLOWED_MEMORY_KEYS_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="capture")

    try:
        capture_payload = validate_memory_candidates(
            raw_capture,
            allowed_keys_policy=ALLOWED_MEMORY_KEYS_POLICY,
            allowed_scopes_policy=ALLOWED_SCOPES_POLICY,
            max_items=BUDGET.max_capture_items,
            max_value_chars=BUDGET.max_value_chars,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="capture",
            raw_capture=raw_capture,
        )

    write_result = gateway.write(
        user_id=user_id,
        items=capture_payload["items"],
        source="session_1",
    )

    refreshed_items = [item for item in write_result["written"] if item.get("refreshed")]
    written_items = [item for item in write_result["written"] if not item.get("refreshed")]

    trace.append(
        {
            "step": 1,
            "phase": "capture_store",
            "candidates": len(capture_payload["items"]),
            "written": len(written_items),
            "refreshed": len(refreshed_items),
            "blocked": len(write_result["blocked"]),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 1,
            "session": "session_1",
            "message": session_1_message,
            "written_keys": [item["key"] for item in written_items],
            "refreshed_keys": [item["key"] for item in refreshed_items],
            "blocked": write_result["blocked"],
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="retrieve_plan")

    try:
        raw_intent = plan_retrieval_intent(
            goal=session_2_goal,
            available_scopes=sorted(ALLOWED_SCOPES_POLICY),
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="retrieve_plan")

    try:
        intent = validate_retrieval_intent(
            raw_intent,
            allowed_scopes_policy=ALLOWED_SCOPES_POLICY,
            max_top_k=BUDGET.max_retrieve_top_k,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="retrieve_plan",
            raw_intent=raw_intent,
        )

    try:
        retrieval = gateway.retrieve(
            user_id=user_id,
            intent=intent,
            include_preference_keys=ENABLE_PREFERENCE_BIAS,
        )
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase="retrieve",
            intent=intent,
        )

    trace.append(
        {
            "step": 2,
            "phase": "retrieve",
            "query": retrieval["query"],
            "requested_scopes": retrieval["requested_scopes"],
            "include_preference_keys": retrieval["include_preference_keys"],
            "memory_hits": len(retrieval["items"]),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 2,
            "session": "session_2",
            "intent": intent,
            "resolved_scopes": retrieval["requested_scopes"],
            "include_preference_keys": retrieval["include_preference_keys"],
            "retrieved_keys": [item["key"] for item in retrieval["items"]],
        }
    )

    if (time.monotonic() - started) > BUDGET.max_seconds:
        return stopped("max_seconds", phase="apply")

    try:
        final = compose_memory_augmented_answer(
            goal=session_2_goal,
            incident_context=INCIDENT_CONTEXT,
            memory_items=retrieval["items"],
        )
    except LLMTimeout:
        return stopped("llm_timeout", phase="apply")
    except LLMInvalid as exc:
        return stopped(exc.args[0], phase="apply")
    except LLMEmpty:
        return stopped("llm_empty", phase="apply")

    retrieved_keys = {item["key"] for item in retrieval["items"]}
    invalid_used_keys = sorted(
        set(final["used_memory_keys"]) - retrieved_keys,
    )
    if invalid_used_keys:
        return stopped(
            "invalid_answer:memory_keys_out_of_context",
            phase="apply",
            invalid_used_memory_keys=invalid_used_keys,
            retrieved_keys=sorted(retrieved_keys),
        )

    if len(final["answer"]) > BUDGET.max_answer_chars:
        return stopped("invalid_answer:too_long", phase="apply")

    applied_memory = _pick_applied_memory(retrieval["items"], final["used_memory_keys"])
    if final["used_memory_keys"] and not _has_declared_memory_application(
        answer=final["answer"],
        applied_memory=applied_memory,
    ):
        return stopped(
            "invalid_answer:memory_declared_but_not_applied",
            phase="apply",
            used_memory_keys=final["used_memory_keys"],
            applied_memory=applied_memory,
        )

    trace.append(
        {
            "step": 3,
            "phase": "apply",
            "used_memory_keys": final["used_memory_keys"],
            "applied_memory_count": len(applied_memory),
            "ok": True,
        }
    )

    history.append(
        {
            "step": 3,
            "action": "compose_memory_augmented_answer",
            "used_memory_keys": final["used_memory_keys"],
            "answer": _shorten(final["answer"]),
        }
    )

    return {
        "run_id": run_id,
        "status": "ok",
        "stop_reason": "success",
        "outcome": "memory_applied" if final["used_memory_keys"] else "context_only",
        "answer": final["answer"],
        "used_memory_keys": final["used_memory_keys"],
        "applied_memory": applied_memory,
        "memory_state": store.dump_user_records(user_id=user_id),
        "trace": trace,
        "history": history,
    }



def main() -> None:
    result = run_memory_augmented(
        user_id=USER_ID,
        session_1_message=SESSION_1_USER_MESSAGE,
        session_2_goal=SESSION_2_GOAL,
    )
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Was hier am wichtigsten ist (einfach erklaert)

  • Session 1 und Session 2 werden hier innerhalb eines Process-Runs ueber einen gemeinsamen Memory-Store simuliert.
  • ALLOWED_MEMORY_KEYS_POLICY und ALLOWED_MEMORY_KEYS_EXECUTION koennen bewusst unterschiedlich sein.
  • ENABLE_PREFERENCE_BIAS=True wird nur fuer den Incident-Update-Flow aktiviert, wo Preference-Felder fast immer gebraucht werden.
  • Jedes Result-Payload enthaelt run_id zur Log-Korrelation.
  • Finale Pruefung used_memory_keys + memory_declared_but_not_applied macht den Apply-Schritt audit-friendly.

Beispielausgabe

JSON
{
  "run_id": "f06d5282-bda4-44df-8bf9-38f53cdb2fb9",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "memory_applied",
  "answer": "Incident update: On 2026-03-04, the US payment gateway is in a P1 degraded state. Failed payment rate is 3.4% with 5 chargeback alerts, ETA 45 minutes. Next actions: monitor gateway performance and send customer updates via email.",
  "used_memory_keys": [
    "language",
    "update_channel",
    "response_style"
  ],
  "applied_memory": [
    {
      "key": "language",
      "value": "english",
      "scope": "user",
      "confidence": 0.95,
      "score": 0.685
    },
    {
      "key": "update_channel",
      "value": "email",
      "scope": "user",
      "confidence": 0.95,
      "score": 0.685
    },
    {
      "key": "response_style",
      "value": "concise",
      "scope": "user",
      "confidence": 0.9,
      "score": 0.67
    }
  ],
  "memory_state": [
    {
      "key": "language",
      "value": "english",
      "scope": "user",
      "source": "session_1",
      "confidence": 0.95,
      "ttl_left_days": 180.0
    },
    {
      "key": "response_style",
      "value": "concise",
      "scope": "user",
      "source": "session_1",
      "confidence": 0.9,
      "ttl_left_days": 180.0
    },
    {
      "key": "update_channel",
      "value": "email",
      "scope": "user",
      "source": "session_1",
      "confidence": 0.95,
      "ttl_left_days": 180.0
    }
  ],
  "trace": [
    {
      "step": 1,
      "phase": "capture_store",
      "candidates": 4,
      "written": 3,
      "refreshed": 0,
      "blocked": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "retrieve",
      "query": "payment incident update and next actions",
      "requested_scopes": [
        "user"
      ],
      "include_preference_keys": true,
      "memory_hits": 3,
      "ok": true
    },
    {
      "step": 3,
      "phase": "apply",
      "used_memory_keys": [
        "language",
        "update_channel",
        "response_style"
      ],
      "applied_memory_count": 3,
      "ok": true
    }
  ],
  "history": [
    {
      "step": 1,
      "session": "session_1",
      "message": "For future incident updates, write in English, keep replies concise, use email as the primary channel, and remember that I am enterprise tier.",
      "written_keys": [
        "language",
        "response_style",
        "update_channel"
      ],
      "refreshed_keys": [],
      "blocked": [
        {
          "key": "declared_tier",
          "reason": "key_denied_execution"
        }
      ]
    },
    {
      "step": 2,
      "session": "session_2",
      "intent": {
        "kind": "retrieve_memory",
        "query": "payment incident update and next actions",
        "top_k": 4
      },
      "resolved_scopes": [
        "user"
      ],
      "include_preference_keys": true,
      "retrieved_keys": [
        "language",
        "update_channel",
        "response_style"
      ]
    },
    {
      "step": 3,
      "action": "compose_memory_augmented_answer",
      "used_memory_keys": [
        "language",
        "update_channel",
        "response_style"
      ],
      "answer": "Incident update: On 2026-03-04, the US payment gateway is in a P1 degraded state..."
    }
  ]
}

Typische stop_reason-Werte

  • success — Run wurde korrekt beendet; siehe outcome (memory_applied oder context_only)
  • invalid_memory_candidates:* — Memory-Capture hat die Vertragsvalidierung nicht bestanden
  • invalid_memory_candidates:value_too_long — Value ueberschreitet max_value_chars-Limit
  • memory_key_not_allowed_policy:<key> — LLM hat einen Key ausserhalb der Policy-Allowlist vorgeschlagen
  • memory_scope_not_allowed_policy:<scope> — LLM hat einen Scope ausserhalb der Policy-Allowlist vorgeschlagen
  • invalid_retrieval_intent:* — Retrieval-Intent hat die Policy-Validierung nicht bestanden
  • scope_denied:<scope> — Retrieval-Scope ist durch die Execution-Allowlist nicht erlaubt
  • llm_timeout — LLM hat innerhalb von OPENAI_TIMEOUT_SECONDS nicht geantwortet
  • llm_invalid_json / llm_invalid_schema — Apply-Schritt lieferte ungueltiges JSON/Shape
  • llm_empty — leere finale Antwort
  • invalid_answer:memory_keys_out_of_context — Modell referenziert Memory-Key, der im Retrieval nicht vorhanden war
  • invalid_answer:too_long — finale Antwort ueberschreitet max_answer_chars-Limit
  • invalid_answer:memory_declared_but_not_applied — Modell behauptet Memory-Nutzung, aber Antworttext zeigt sie nicht
  • max_seconds — gesamtes Run-Zeitbudget ueberschritten

Was NICHT gezeigt wird

  • persistenter Speicher (Postgres/Redis/Vector DB) statt In-Memory-Implementierung
  • Verschluesselung/PII redaction vor dem Schreiben von Memory
  • semantisches Retrieval via Embeddings (statt einfachem Token-Overlap)
  • Multi-Tenant-Quotas und Soft/Hard-Retention-Policy
  • Retry/Backoff fuer LLM-Calls
  • Per-Key-Consent und user-visible Memory-UI

Was du als Naechstes ausprobieren kannst

  1. Aktiviere TRUST_DECLARED_TIER_FROM_CHAT=True und pruefe, wie sich blocked in history veraendert.
  2. Aktiviere WORKSPACE_MEMORY_RUNTIME_ENABLED=True und fuege einen Retrieval-Intent mit Scope workspace hinzu.
  3. Fuege eine Policy-Rule fuer Memory-Key timezone hinzu und pruefe die Personalisierung in der Answer.
  4. Ersetze den In-Memory-Store durch externen Speicher und fuege Deduplikation nach key + normalized_value hinzu.
⏱️ 20 Min. LesezeitAktualisiert Mär, 2026Schwierigkeit: ★★☆
Integriert: Production ControlOnceOnly
Guardrails für Tool-Calling-Agents
Shippe dieses Pattern mit Governance:
  • Budgets (Steps / Spend Caps)
  • Tool-Permissions (Allowlist / Blocklist)
  • Kill switch & Incident Stop
  • Idempotenz & Dedupe
  • Audit logs & Nachvollziehbarkeit
Integrierter Hinweis: OnceOnly ist eine Control-Layer für Production-Agent-Systeme.
Autor

Diese Dokumentation wird von Engineers kuratiert und gepflegt, die AI-Agenten in der Produktion betreiben.

Die Inhalte sind KI-gestützt, mit menschlicher redaktioneller Verantwortung für Genauigkeit, Klarheit und Produktionsrelevanz.

Patterns und Empfehlungen basieren auf Post-Mortems, Failure-Modes und operativen Incidents in produktiven Systemen, auch bei der Entwicklung und dem Betrieb von Governance-Infrastruktur für Agenten bei OnceOnly.