Data-Analysis Agent in Python: Complete Example

A production-style, runnable example of a data-analysis agent in Python with a fixed workflow, a policy boundary, metric validation, and a transparent trace/history.
On this page
  1. Core of the Pattern (Brief)
  2. What This Example Shows
  3. Architecture
  4. Project Structure
  5. Running the Example
  6. Task
  7. Code
  8. context.py — request envelope + sample dataset
  9. tools.py — deterministic analysis steps
  10. agent.py — plan proposal + final answer
  11. gateway.py — plan/policy/quality boundaries
  12. main.py — orchestrate analysis workflow
  13. Example Output
  14. Typical stop_reason Values
  15. What Is NOT Shown Here
  16. What You Can Try Next

Core of the Pattern (Brief)

A data-analysis agent here means: the agent does not "invent" metrics, it follows a fixed workflow:

  • ingest
  • profile
  • transform
  • analyze
  • validate
  • finalize (host orchestration step)

The result therefore depends on a controlled pipeline, not on free-form text reasoning.
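
A minimal sketch of that enforced order, calling the example's own step functions in a straight line. It assumes the snippet is run inside agent-patterns/data-analysis-agent/python so context.py and tools.py are importable, and it deliberately leaves out the gateway, budget, and policy checks that main.py adds:

PYTHON
# Sketch only: the fixed pipeline as straight-line host code, without the
# gateway/budget/policy checks from main.py. Run next to context.py and tools.py.
from context import build_request
from tools import (
    analyze_sales_rows,
    profile_sales_rows,
    read_sales_snapshot,
    transform_sales_rows,
    validate_analysis,
)

request = build_request(report_date="2026-03-07", region="US", source="warehouse_sales_daily")
req = request["request"]

snapshot = read_sales_snapshot(source=req["source"], region=req["region"], rows=req["rows"])["data"]  # ingest
profile = profile_sales_rows(rows=snapshot["rows"])["data"]  # profile
clean = transform_sales_rows(  # transform
    rows=snapshot["rows"],
    dedupe_key="order_id",
    dedupe_ts_key="event_ts",
    fill_missing_channel="unknown",
    normalize_status="lower_strip",
)["data"]
metrics = analyze_sales_rows(rows=clean["rows"])["data"]  # analyze
quality = validate_analysis(metrics=metrics, profile=profile, max_missing_channel_pct=0.25)["data"]  # validate
print(quality["ok"], metrics["net_revenue"])  # finalize happens as a host step in main.py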


What This Example Shows

  • the agent proposes an analysis plan, but the runtime validates the plan contract and the step order
  • the policy allowlist and the execution allowlist for the data source are handled separately
  • the workflow is strictly fixed: profile/validate cannot be skipped
  • the profile check detects duplicates, missing values, and invalid row values; if the quality gate fails, the pipeline stops before transform
  • the transform step applies deterministic dedupe (latest_by_event_ts), field normalization, and cleaning rules from policy_hints
  • cleaning rules and dedupe strategy come from policy_hints (policy-configurable analysis)
  • the analyze step computes metrics (gross/net revenue, conversion, failure rate, refund rate, latency)
  • the validate step checks quality invariants and consistency of the aggregates
  • trace/history provide auditability from plan to finalization

Architecture

  1. agent.py creates the plan of analysis steps.
  2. gateway.py validates the plan and the policy decision for ingest.
  3. tools.py executes the deterministic steps: ingest/profile/transform/analyze/validate.
  4. main.py orchestrates the workflow, runs finalize as a host step, collects the aggregate and trace/history, and returns the final brief.

Project Structure

TEXT
agent-patterns/
└── data-analysis-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── agent.py
        ├── context.py
        ├── README.md
        └── requirements.txt

Running the Example

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/data-analysis-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Variant using export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py

Variant using .env (optional):

BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows it is simpler to use set variables or, if preferred, python-dotenv.
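
If you prefer python-dotenv, a minimal sketch could look like this; it assumes python-dotenv is installed separately (it is not necessarily part of this example's requirements.txt):

PYTHON
# Minimal sketch: load .env via python-dotenv instead of `source .env`
# (assumption: `pip install python-dotenv` was run; not required by this example).
import os

from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory, if present

if not os.getenv("OPENAI_API_KEY"):
    raise SystemExit("OPENAI_API_KEY is not set")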


Task

Production case:

"Create a customer-safe analysis brief for the last 7 days in the US: revenue, conversion, failure rate, data-quality checks."


Code

context.py — request envelope + sample dataset

PYTHON
from __future__ import annotations

from typing import Any


def build_request(*, report_date: str, region: str, source: str) -> dict[str, Any]:
    rows = [
        {
            "order_id": "o-1001",
            "event_ts": "2026-03-01T10:00:00Z",
            "status": "paid",
            "amount": 120.0,
            "channel": "paid_search",
            "latency_ms": 180,
        },
        {
            "order_id": "o-1002",
            "event_ts": "2026-03-01T10:05:00Z",
            "status": "paid",
            "amount": 90.0,
            "channel": "email",
            "latency_ms": 160,
        },
        {
            "order_id": "o-1003",
            "event_ts": "2026-03-01T10:10:00Z",
            "status": "failed",
            "amount": 0.0,
            "channel": "paid_search",
            "latency_ms": 240,
        },
        {
            "order_id": "o-1004",
            "event_ts": "2026-03-01T10:15:00Z",
            "status": "refunded",
            "amount": 40.0,
            "channel": "email",
            "latency_ms": 210,
        },
        {
            "order_id": "o-1005",
            "event_ts": "2026-03-01T10:20:00Z",
            "status": "paid",
            "amount": 140.0,
            "channel": "paid_search",
            "latency_ms": 170,
        },
        {
            "order_id": "o-1006",
            "event_ts": "2026-03-01T10:25:00Z",
            "status": "paid",
            "amount": 110.0,
            "channel": None,
            "latency_ms": 190,
        },
        {
            "order_id": "o-1007",
            "event_ts": "2026-03-01T10:30:00Z",
            "status": "failed",
            "amount": 0.0,
            "channel": "organic",
            "latency_ms": 260,
        },
        {
            "order_id": "o-1008",
            "event_ts": "2026-03-01T10:35:00Z",
            "status": "paid",
            "amount": 80.0,
            "channel": "paid_social",
            "latency_ms": 200,
        },
        {
            "order_id": "o-1009",
            "event_ts": "2026-03-01T10:40:00Z",
            "status": "refunded",
            "amount": 30.0,
            "channel": "paid_social",
            "latency_ms": 230,
        },
        {
            "order_id": "o-1010",
            "event_ts": "2026-03-01T10:45:00Z",
            "status": "paid",
            "amount": 70.0,
            "channel": "paid_search",
            "latency_ms": 175,
        },
        {
            "order_id": "o-1006",
            "event_ts": "2026-03-01T10:50:00Z",
            "status": "paid",
            "amount": 110.0,
            "channel": "email",
            "latency_ms": 188,
        },
        {
            "order_id": "o-1011",
            "event_ts": "2026-03-01T10:55:00Z",
            "status": "paid",
            "amount": 60.0,
            "channel": None,
            "latency_ms": 195,
        },
    ]

    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
            "source": source,
            "rows": rows,
        },
        "policy_hints": {
            "allowed_sources": ["warehouse_sales_daily", "warehouse_refunds_daily"],
            "allowed_regions": ["US", "CA"],
            "max_rows": 5000,
            "max_missing_channel_pct": 0.25,
            "analysis_rules": {
                "dedupe_key": "order_id",
                "dedupe_ts_key": "event_ts",
                "dedupe_strategy": "latest_by_event_ts",
                "fill_missing_channel": "unknown",
                "normalize_status": "lower_strip",
            },
        },
    }

tools.py — deterministic analysis steps

PYTHON
from __future__ import annotations

import math
from datetime import datetime
from collections import defaultdict
from typing import Any

ALLOWED_STATUSES = {"paid", "failed", "refunded"}


def _safe_float(value: Any) -> tuple[bool, float]:
    try:
        v = float(value)
    except (TypeError, ValueError):
        return False, 0.0
    if not math.isfinite(v):
        return False, 0.0
    return True, v


def _is_valid_event_ts(value: Any) -> bool:
    if not isinstance(value, str):
        return False
    ts = value.strip()
    if len(ts) != 20 or not ts.endswith("Z"):
        return False
    try:
        datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        return False
    return True


def read_sales_snapshot(*, source: str, region: str, rows: list[dict[str, Any]]) -> dict[str, Any]:
    return {
        "status": "ok",
        "data": {
            "source": source,
            "region": region.upper(),
            "rows": rows,
            "snapshot_at": "2026-03-07T10:00:00Z",
        },
    }


def profile_sales_rows(*, rows: list[dict[str, Any]]) -> dict[str, Any]:
    required_fields = {"order_id", "event_ts", "status", "amount", "channel", "latency_ms"}
    schema_ok = True
    missing_fields_count = 0

    missing_channel_count = 0
    duplicate_rows = 0
    by_order_id: dict[str, int] = {}

    invalid_order_id_count = 0
    invalid_event_ts_count = 0
    invalid_status_count = 0
    invalid_amount_count = 0
    invalid_latency_count = 0

    for row in rows:
        keys = set(row.keys())
        if not required_fields.issubset(keys):
            schema_ok = False
            missing_fields_count += 1

        order_id = row.get("order_id")
        order_id_norm = str(order_id).strip() if isinstance(order_id, str) else ""
        if not order_id_norm:
            invalid_order_id_count += 1
        else:
            by_order_id[order_id_norm] = by_order_id.get(order_id_norm, 0) + 1
            if by_order_id[order_id_norm] > 1:
                duplicate_rows += 1

        event_ts = row.get("event_ts")
        if not _is_valid_event_ts(event_ts):
            invalid_event_ts_count += 1

        status = str(row.get("status", "")).strip().lower()
        if status not in ALLOWED_STATUSES:
            invalid_status_count += 1

        ok_amount, amount = _safe_float(row.get("amount"))
        if not ok_amount or amount < 0.0:
            invalid_amount_count += 1

        ok_latency, latency = _safe_float(row.get("latency_ms"))
        if not ok_latency or latency < 0.0:
            invalid_latency_count += 1

        channel = row.get("channel")
        if not isinstance(channel, str) or not channel.strip():
            missing_channel_count += 1

    total = len(rows)
    missing_channel_pct = (missing_channel_count / total) if total else 0.0

    row_validity_ok = (
        invalid_order_id_count == 0
        and invalid_event_ts_count == 0
        and invalid_status_count == 0
        and invalid_amount_count == 0
        and invalid_latency_count == 0
    )

    return {
        "status": "ok",
        "data": {
            "row_count_raw": total,
            "duplicate_rows": duplicate_rows,
            "missing_channel_count": missing_channel_count,
            "missing_channel_pct": missing_channel_pct,
            "schema_ok": schema_ok,
            "missing_fields_count": missing_fields_count,
            "row_validity_ok": row_validity_ok,
            "invalid_order_id_count": invalid_order_id_count,
            "invalid_event_ts_count": invalid_event_ts_count,
            "invalid_status_count": invalid_status_count,
            "invalid_amount_count": invalid_amount_count,
            "invalid_latency_count": invalid_latency_count,
        },
    }


def transform_sales_rows(
    *,
    rows: list[dict[str, Any]],
    dedupe_key: str,
    dedupe_ts_key: str,
    fill_missing_channel: str,
    normalize_status: str,
) -> dict[str, Any]:
    latest_by_key: dict[str, dict[str, Any]] = {}
    ts_by_key: dict[str, str] = {}
    skipped_rows_missing_key = 0
    skipped_rows_invalid_ts = 0

    for row in rows:
        key = str(row.get(dedupe_key, "")).strip()
        if not key:
            skipped_rows_missing_key += 1
            continue

        event_ts_raw = row.get(dedupe_ts_key)
        if not _is_valid_event_ts(event_ts_raw):
            skipped_rows_invalid_ts += 1
            continue
        event_ts = str(event_ts_raw).strip()
        previous_ts = ts_by_key.get(key, "")
        if key not in latest_by_key or event_ts >= previous_ts:
            latest_by_key[key] = dict(row)
            ts_by_key[key] = event_ts

    cleaned_rows = [latest_by_key[k] for k in sorted(latest_by_key.keys())]

    filled_missing_channel = 0
    for row in cleaned_rows:
        channel = row.get("channel")
        if not isinstance(channel, str) or not channel.strip():
            row["channel"] = fill_missing_channel
            filled_missing_channel += 1

        if normalize_status == "lower_strip":
            row["status"] = str(row.get("status", "")).strip().lower()

        row[dedupe_key] = str(row.get(dedupe_key, "")).strip()
        row[dedupe_ts_key] = str(row.get(dedupe_ts_key, "")).strip()
        ok_amount, amount = _safe_float(row.get("amount"))
        ok_latency, latency = _safe_float(row.get("latency_ms"))
        row["amount"] = amount if ok_amount else 0.0
        row["latency_ms"] = latency if ok_latency else 0.0

    return {
        "status": "ok",
        "data": {
            "rows": cleaned_rows,
            "row_count_clean": len(cleaned_rows),
            "deduped_rows": max(0, len(rows) - len(cleaned_rows)),
            "filled_missing_channel": filled_missing_channel,
            "skipped_rows_missing_key": skipped_rows_missing_key,
            "skipped_rows_invalid_ts": skipped_rows_invalid_ts,
            "dedupe_strategy": "latest_by_event_ts",
            "dedupe_key": dedupe_key,
            "dedupe_ts_key": dedupe_ts_key,
        },
    }


def analyze_sales_rows(*, rows: list[dict[str, Any]]) -> dict[str, Any]:
    total = len(rows)
    paid_count = 0
    failed_count = 0
    refunded_count = 0

    gross_revenue = 0.0
    refund_amount_total = 0.0
    latency_values: list[float] = []
    channel_net_revenue: dict[str, float] = defaultdict(float)

    for row in rows:
        status = str(row.get("status", "")).lower()
        amount = float(row.get("amount", 0.0))
        channel = str(row.get("channel", "unknown")).strip().lower() or "unknown"
        latency = float(row.get("latency_ms", 0.0))
        latency_values.append(latency)

        if status == "paid":
            paid_count += 1
            gross_revenue += amount
            channel_net_revenue[channel] += amount
        elif status == "failed":
            failed_count += 1
        elif status == "refunded":
            refunded_count += 1
            refund_amount_total += amount
            channel_net_revenue[channel] -= amount

    net_revenue = gross_revenue - refund_amount_total
    conversion_rate = (paid_count / total) if total else 0.0
    failed_payment_rate = (failed_count / total) if total else 0.0
    refund_rate = (refunded_count / total) if total else 0.0

    avg_latency_ms = (sum(latency_values) / len(latency_values)) if latency_values else 0.0
    sorted_latency = sorted(latency_values)
    if sorted_latency:
        p95_idx = int((len(sorted_latency) - 1) * 0.95)
        p95_latency_ms = sorted_latency[p95_idx]
    else:
        p95_latency_ms = 0.0

    top_channel = "unknown"
    top_channel_value = float("-inf")
    for channel, value in channel_net_revenue.items():
        if value > top_channel_value:
            top_channel = channel
            top_channel_value = value

    eta_minutes = 45 if failed_payment_rate >= 0.15 else 20

    return {
        "status": "ok",
        "data": {
            "sample_size": total,
            "paid_count": paid_count,
            "failed_count": failed_count,
            "refunded_count": refunded_count,
            "gross_revenue": gross_revenue,
            "refund_amount_total": refund_amount_total,
            "net_revenue": net_revenue,
            "conversion_rate": conversion_rate,
            "failed_payment_rate": failed_payment_rate,
            "refund_rate": refund_rate,
            "avg_latency_ms": avg_latency_ms,
            "p95_latency_ms": p95_latency_ms,
            "top_channel": top_channel,
            "eta_minutes": eta_minutes,
        },
    }


def validate_analysis(
    *,
    metrics: dict[str, Any],
    profile: dict[str, Any],
    max_missing_channel_pct: float,
) -> dict[str, Any]:
    passed_checks: list[str] = []
    failed_checks: list[str] = []

    conversion_rate = float(metrics.get("conversion_rate", 0.0))
    failed_payment_rate = float(metrics.get("failed_payment_rate", 0.0))
    refund_rate = float(metrics.get("refund_rate", 0.0))

    gross_revenue = float(metrics.get("gross_revenue", 0.0))
    refund_amount_total = float(metrics.get("refund_amount_total", 0.0))
    net_revenue = float(metrics.get("net_revenue", 0.0))

    sample_size = int(metrics.get("sample_size", 0))
    p95_latency_ms = float(metrics.get("p95_latency_ms", 0.0))
    missing_channel_pct = float(profile.get("missing_channel_pct", 0.0))
    row_validity_ok = bool(profile.get("row_validity_ok"))

    if 0.0 <= conversion_rate <= 1.0:
        passed_checks.append("conversion_between_0_1")
    else:
        failed_checks.append("conversion_out_of_range")

    if 0.0 <= refund_rate <= 1.0:
        passed_checks.append("refund_rate_between_0_1")
    else:
        failed_checks.append("refund_rate_out_of_range")

    if gross_revenue >= 0.0:
        passed_checks.append("gross_revenue_non_negative")
    else:
        failed_checks.append("gross_revenue_negative")

    if refund_amount_total >= 0.0:
        passed_checks.append("refund_amount_non_negative")
    else:
        failed_checks.append("refund_amount_negative")

    if net_revenue >= 0.0:
        passed_checks.append("net_revenue_non_negative")
    else:
        failed_checks.append("net_revenue_negative")

    if failed_payment_rate <= 0.35:
        passed_checks.append("failed_rate_reasonable")
    else:
        failed_checks.append("failed_rate_too_high")

    if sample_size >= 10:
        passed_checks.append("sample_size_min_10")
    else:
        failed_checks.append("sample_size_too_low")

    if p95_latency_ms <= 400.0:
        passed_checks.append("p95_latency_under_400")
    else:
        failed_checks.append("p95_latency_too_high")

    if missing_channel_pct <= max_missing_channel_pct:
        passed_checks.append("missing_channel_within_policy")
    else:
        failed_checks.append("missing_channel_policy_violation")

    if gross_revenue >= net_revenue:
        passed_checks.append("gross_ge_net")
    else:
        failed_checks.append("gross_less_than_net")

    if abs((gross_revenue - refund_amount_total) - net_revenue) <= 0.01:
        passed_checks.append("gross_minus_refunds_equals_net")
    else:
        failed_checks.append("gross_refund_net_mismatch")

    if row_validity_ok:
        passed_checks.append("profile_row_values_valid")
    else:
        failed_checks.append("profile_row_values_invalid")

    return {
        "status": "ok",
        "data": {
            "ok": len(failed_checks) == 0,
            "passed_checks": passed_checks,
            "failed_checks": failed_checks,
        },
    }
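
An illustrative call against the profiling quality gate with one deliberately broken row (hypothetical snippet; run it next to tools.py):

PYTHON
# Illustrative: one valid row and one deliberately broken row for profile_sales_rows.
from tools import profile_sales_rows

rows = [
    {"order_id": "o-1", "event_ts": "2026-03-01T10:00:00Z", "status": "paid",
     "amount": 50.0, "channel": "email", "latency_ms": 120},
    {"order_id": "o-2", "event_ts": "03/01/2026 10:05", "status": "PAID??",
     "amount": -5.0, "channel": None, "latency_ms": 130},
]

profile = profile_sales_rows(rows=rows)["data"]
print(profile["row_validity_ok"])         # False
print(profile["invalid_event_ts_count"])  # 1 (not strict UTC YYYY-MM-DDTHH:MM:SSZ)
print(profile["invalid_status_count"])    # 1 ("paid??" is not an allowed status)
print(profile["invalid_amount_count"])    # 1 (negative amount)
print(profile["missing_channel_count"])   # 1 (channel is None)

With such a profile, gateway.validate_profile would stop the run with invalid_profile:row_values before the transform step.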

agent.py — plan proposal + final answer

PYTHON
from __future__ import annotations

from typing import Any


def propose_analysis_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    del goal
    req = request["request"]
    return {
        "steps": [
            {
                "id": "s1",
                "action": "ingest_sales",
                "args": {
                    "source": req["source"],
                    "region": req["region"],
                },
            },
            {"id": "s2", "action": "profile_sales", "args": {}},
            {"id": "s3", "action": "transform_sales", "args": {}},
            {"id": "s4", "action": "analyze_sales", "args": {}},
            {"id": "s5", "action": "validate_analysis", "args": {}},
        ]
    }


def compose_final_answer(
    *,
    request: dict[str, Any],
    aggregate: dict[str, Any],
    quality: dict[str, Any],
) -> str:
    req = request["request"]
    m = aggregate["metrics"]
    checks = quality.get("passed_checks", [])

    return (
        f"Data analysis brief ({req['region']}, {req['report_date']}): source {req['source']} shows net revenue "
        f"{m['net_revenue']:.2f} (gross {m['gross_revenue']:.2f}, refunds {m['refund_amount_total']:.2f} subtracted), "
        f"conversion {m['conversion_rate_pct']:.2f}%, failed payment rate {m['failed_payment_rate_pct']:.2f}%, "
        f"refund rate {m['refund_rate_pct']:.2f}%, top channel {m['top_channel']}, "
        f"and p95 latency {m['p95_latency_ms']:.2f} ms. Quality checks passed: {len(checks)}."
    )
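
A quick look at what the proposed plan contains (illustrative snippet; run next to agent.py and context.py):

PYTHON
# Illustrative: print the proposed step sequence for the sample request.
from agent import propose_analysis_plan
from context import build_request

request = build_request(report_date="2026-03-07", region="US", source="warehouse_sales_daily")
plan = propose_analysis_plan(goal="weekly brief", request=request)
print([step["action"] for step in plan["steps"]])
# ['ingest_sales', 'profile_sales', 'transform_sales', 'analyze_sales', 'validate_analysis']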

gateway.py — plan/policy/quality boundaries

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str, *, details: dict[str, Any] | None = None):
        super().__init__(reason)
        self.reason = reason
        self.details = details or {}


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 25
    max_steps: int = 8
    max_rows: int = 5000


@dataclass(frozen=True)
class Decision:
    kind: str
    reason: str


EXPECTED_ACTION_SEQUENCE = [
    "ingest_sales",
    "profile_sales",
    "transform_sales",
    "analyze_sales",
    "validate_analysis",
]


def validate_plan(raw_steps: Any, *, max_steps: int) -> list[dict[str, Any]]:
    if not isinstance(raw_steps, list) or not raw_steps:
        raise StopRun("invalid_plan:steps")
    if len(raw_steps) > max_steps:
        raise StopRun("invalid_plan:too_many_steps")

    steps: list[dict[str, Any]] = []
    actions: list[str] = []
    for raw in raw_steps:
        if not isinstance(raw, dict):
            raise StopRun("invalid_step:not_object")
        step_id = raw.get("id")
        action = raw.get("action")
        args = raw.get("args")
        if not isinstance(step_id, str) or not step_id.strip():
            raise StopRun("invalid_step:id")
        if not isinstance(action, str) or not action.strip():
            raise StopRun("invalid_step:action")
        if not isinstance(args, dict):
            raise StopRun("invalid_step:args")

        normalized = {
            "id": step_id.strip(),
            "action": action.strip(),
            "args": dict(args),
        }
        steps.append(normalized)
        actions.append(normalized["action"])

    if actions != EXPECTED_ACTION_SEQUENCE:
        raise StopRun(
            "invalid_plan:step_sequence",
            details={"expected": EXPECTED_ACTION_SEQUENCE, "received": actions},
        )

    return steps


class DataAnalysisGateway:
    def __init__(
        self,
        *,
        allowed_sources_policy: set[str],
        allowed_sources_execution: set[str],
        allowed_regions_policy: set[str],
        budget: Budget,
    ):
        self.allowed_sources_policy = set(allowed_sources_policy)
        self.allowed_sources_execution = set(allowed_sources_execution)
        self.allowed_regions_policy = {x.upper() for x in allowed_regions_policy}
        self.budget = budget

    def evaluate_ingest(self, *, source: str, region: str) -> Decision:
        if source not in self.allowed_sources_policy:
            return Decision(kind="deny", reason="source_denied_policy")
        if region.upper() not in self.allowed_regions_policy:
            return Decision(kind="deny", reason="region_denied_policy")
        if source not in self.allowed_sources_execution:
            return Decision(kind="deny", reason="source_denied_execution")
        return Decision(kind="allow", reason="policy_pass")

    def ensure_rows_budget(self, *, rows: list[dict[str, Any]]) -> None:
        if len(rows) > self.budget.max_rows:
            raise StopRun("dataset_too_large")

    def validate_profile(self, *, profile: dict[str, Any], max_missing_channel_pct: float) -> None:
        if not bool(profile.get("schema_ok")):
            raise StopRun("invalid_profile:schema")
        if not bool(profile.get("row_validity_ok")):
            raise StopRun("invalid_profile:row_values")

        missing_pct = float(profile.get("missing_channel_pct", 0.0))
        if missing_pct > max_missing_channel_pct:
            raise StopRun("invalid_profile:missing_channel_too_high")

    def validate_metrics(self, *, metrics: dict[str, Any]) -> None:
        sample_size = int(metrics.get("sample_size", 0))
        if sample_size <= 0:
            raise StopRun("invalid_metrics:sample_size")

        conversion = float(metrics.get("conversion_rate", 0.0))
        if not (0.0 <= conversion <= 1.0):
            raise StopRun("invalid_metrics:conversion_rate")

        failed_rate = float(metrics.get("failed_payment_rate", 0.0))
        if not (0.0 <= failed_rate <= 1.0):
            raise StopRun("invalid_metrics:failed_payment_rate")

        refund_rate = float(metrics.get("refund_rate", 0.0))
        if not (0.0 <= refund_rate <= 1.0):
            raise StopRun("invalid_metrics:refund_rate")

        gross_revenue = float(metrics.get("gross_revenue", 0.0))
        refund_amount_total = float(metrics.get("refund_amount_total", 0.0))
        net_revenue = float(metrics.get("net_revenue", 0.0))

        if net_revenue < 0:
            raise StopRun("invalid_metrics:net_revenue")
        if gross_revenue < net_revenue:
            raise StopRun("invalid_metrics:gross_less_than_net")
        if abs((gross_revenue - refund_amount_total) - net_revenue) > 0.01:
            raise StopRun("invalid_metrics:revenue_consistency")

main.py — orchestrate analysis workflow

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from agent import compose_final_answer, propose_analysis_plan
from context import build_request
from gateway import Budget, DataAnalysisGateway, StopRun, validate_plan
from tools import (
    analyze_sales_rows,
    profile_sales_rows,
    read_sales_snapshot,
    transform_sales_rows,
    validate_analysis,
)

GOAL = (
    "Prepare a validated weekly data-analysis brief for US payments with revenue, "
    "conversion, failed payment rate, and quality checks."
)
REQUEST = build_request(
    report_date="2026-03-07",
    region="US",
    source="warehouse_sales_daily",
)

DEFAULT_BUDGET = Budget(
    max_seconds=25,
    max_steps=8,
    max_rows=5000,
)

DEFAULT_ALLOWED_SOURCES_POLICY = {"warehouse_sales_daily", "warehouse_refunds_daily"}
ALLOWED_SOURCES_EXECUTION = {"warehouse_sales_daily"}
DEFAULT_ALLOWED_REGIONS_POLICY = {"US", "CA"}


DEFAULT_ANALYSIS_RULES = {
    "dedupe_key": "order_id",
    "dedupe_ts_key": "event_ts",
    "dedupe_strategy": "latest_by_event_ts",
    "fill_missing_channel": "unknown",
    "normalize_status": "lower_strip",
}


def _unwrap_tool_data(raw: Any, *, tool_name: str) -> dict[str, Any]:
    if not isinstance(raw, dict) or raw.get("status") != "ok" or not isinstance(raw.get("data"), dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    return dict(raw["data"])


def run_data_analysis_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    artifact_ids = {
        "snapshot_id": f"snap_{run_id[:8]}",
        "profile_id": f"profile_{run_id[:8]}",
        "transform_id": f"transform_{run_id[:8]}",
        "metrics_id": f"metrics_{run_id[:8]}",
        "quality_id": f"quality_{run_id[:8]}",
    }

    def elapsed_ms() -> int:
        return max(1, int((time.monotonic() - started) * 1000))

    hints_raw = request.get("policy_hints")
    hints: dict[str, Any] = hints_raw if isinstance(hints_raw, dict) else {}

    allowed_sources_raw = hints.get("allowed_sources")
    if isinstance(allowed_sources_raw, list):
        allowed_sources_policy = {
            str(item).strip()
            for item in allowed_sources_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_sources_policy = set(DEFAULT_ALLOWED_SOURCES_POLICY)
    if not allowed_sources_policy:
        allowed_sources_policy = set(DEFAULT_ALLOWED_SOURCES_POLICY)

    allowed_regions_raw = hints.get("allowed_regions")
    if isinstance(allowed_regions_raw, list):
        allowed_regions_policy = {
            str(item).strip().upper()
            for item in allowed_regions_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_regions_policy = set(DEFAULT_ALLOWED_REGIONS_POLICY)
    if not allowed_regions_policy:
        allowed_regions_policy = set(DEFAULT_ALLOWED_REGIONS_POLICY)

    max_rows_raw = hints.get("max_rows", DEFAULT_BUDGET.max_rows)
    max_missing_channel_pct_raw = hints.get("max_missing_channel_pct", 0.25)

    analysis_rules_raw = hints.get("analysis_rules")
    analysis_rules = dict(DEFAULT_ANALYSIS_RULES)
    if isinstance(analysis_rules_raw, dict):
        for key, value in analysis_rules_raw.items():
            if isinstance(key, str):
                analysis_rules[key] = value

    dedupe_key = str(analysis_rules.get("dedupe_key", DEFAULT_ANALYSIS_RULES["dedupe_key"])).strip() or "order_id"
    dedupe_ts_key = str(analysis_rules.get("dedupe_ts_key", DEFAULT_ANALYSIS_RULES["dedupe_ts_key"])).strip() or "event_ts"
    dedupe_strategy = str(
        analysis_rules.get("dedupe_strategy", DEFAULT_ANALYSIS_RULES["dedupe_strategy"])
    ).strip() or "latest_by_event_ts"
    fill_missing_channel = str(
        analysis_rules.get("fill_missing_channel", DEFAULT_ANALYSIS_RULES["fill_missing_channel"])
    ).strip() or "unknown"
    normalize_status = str(
        analysis_rules.get("normalize_status", DEFAULT_ANALYSIS_RULES["normalize_status"])
    ).strip() or "lower_strip"

    if dedupe_strategy != "latest_by_event_ts":
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:dedupe_strategy",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        max_rows = int(max_rows_raw)
    except (TypeError, ValueError):
        max_rows = DEFAULT_BUDGET.max_rows
    if max_rows <= 0:
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:max_rows",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        max_missing_channel_pct = float(max_missing_channel_pct_raw)
    except (TypeError, ValueError):
        max_missing_channel_pct = 0.25
    if not (0.0 <= max_missing_channel_pct <= 1.0):
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:max_missing_channel_pct",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    budget = Budget(
        max_seconds=DEFAULT_BUDGET.max_seconds,
        max_steps=DEFAULT_BUDGET.max_steps,
        max_rows=max(10, min(200000, max_rows)),
    )

    gateway = DataAnalysisGateway(
        allowed_sources_policy=allowed_sources_policy,
        allowed_sources_execution=ALLOWED_SOURCES_EXECUTION,
        allowed_regions_policy=allowed_regions_policy,
        budget=budget,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    phase = "plan"
    try:
        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase=phase)

        raw_plan = propose_analysis_plan(goal=goal, request=request)
        steps = validate_plan(raw_plan.get("steps"), max_steps=budget.max_steps)

        trace.append(
            {
                "step": 1,
                "phase": "plan",
                "steps": len(steps),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 1,
                "action": "propose_analysis_plan",
                "step_ids": [item["id"] for item in steps],
            }
        )

        phase = "policy_check"
        ingest_args = steps[0]["args"]
        source = str(ingest_args.get("source", "")).strip()
        region = str(ingest_args.get("region", "")).strip().upper()
        decision = gateway.evaluate_ingest(source=source, region=region)

        trace.append(
            {
                "step": 2,
                "phase": "policy_check",
                "source": source,
                "region": region,
                "decision": decision.kind,
                "reason": decision.reason,
                "allowed_sources_policy": sorted(allowed_sources_policy),
                "allowed_sources_execution": sorted(ALLOWED_SOURCES_EXECUTION),
                "elapsed_ms": elapsed_ms(),
                "ok": decision.kind == "allow",
            }
        )
        history.append(
            {
                "step": 2,
                "action": "policy_check",
                "decision": {"kind": decision.kind, "reason": decision.reason},
            }
        )
        if decision.kind != "allow":
            return stopped(f"policy_block:{decision.reason}", phase=phase)

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="ingest_profile")

        phase = "ingest_profile"
        req = request["request"]
        snapshot = _unwrap_tool_data(
            read_sales_snapshot(
                source=source,
                region=region,
                rows=list(req["rows"]),
            ),
            tool_name="read_sales_snapshot",
        )
        rows_raw = list(snapshot["rows"])
        gateway.ensure_rows_budget(rows=rows_raw)

        profile = _unwrap_tool_data(
            profile_sales_rows(rows=rows_raw),
            tool_name="profile_sales_rows",
        )
        gateway.validate_profile(profile=profile, max_missing_channel_pct=max_missing_channel_pct)

        trace.append(
            {
                "step": 3,
                "phase": "ingest_profile",
                "row_count_raw": profile["row_count_raw"],
                "duplicate_rows": profile["duplicate_rows"],
                "missing_channel_pct": round(float(profile["missing_channel_pct"]) * 100, 2),
                "artifact_refs": {
                    "snapshot_id": artifact_ids["snapshot_id"],
                    "profile_id": artifact_ids["profile_id"],
                },
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "ingest_profile",
                "source": source,
                "row_count_raw": profile["row_count_raw"],
                "artifact_refs": {
                    "snapshot_id": artifact_ids["snapshot_id"],
                    "profile_id": artifact_ids["profile_id"],
                },
            }
        )

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="transform_analyze")

        phase = "transform_analyze"
        transformed = _unwrap_tool_data(
            transform_sales_rows(
                rows=rows_raw,
                dedupe_key=dedupe_key,
                dedupe_ts_key=dedupe_ts_key,
                fill_missing_channel=fill_missing_channel,
                normalize_status=normalize_status,
            ),
            tool_name="transform_sales_rows",
        )
        rows_clean = list(transformed["rows"])

        metrics = _unwrap_tool_data(
            analyze_sales_rows(rows=rows_clean),
            tool_name="analyze_sales_rows",
        )
        gateway.validate_metrics(metrics=metrics)

        trace.append(
            {
                "step": 4,
                "phase": "transform_analyze",
                "row_count_clean": transformed["row_count_clean"],
                "deduped_rows": transformed["deduped_rows"],
                "filled_missing_channel": transformed["filled_missing_channel"],
                "skipped_rows_missing_key": transformed.get("skipped_rows_missing_key", 0),
                "skipped_rows_invalid_ts": transformed.get("skipped_rows_invalid_ts", 0),
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "conversion_rate_pct": round(float(metrics["conversion_rate"]) * 100, 2),
                "failed_payment_rate_pct": round(float(metrics["failed_payment_rate"]) * 100, 2),
                "artifact_refs": {
                    "transform_id": artifact_ids["transform_id"],
                    "metrics_id": artifact_ids["metrics_id"],
                },
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 4,
                "action": "transform_analyze",
                "row_count_clean": transformed["row_count_clean"],
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "applied_rules": {
                    "dedupe_strategy": dedupe_strategy,
                    "dedupe_key": dedupe_key,
                    "dedupe_ts_key": dedupe_ts_key,
                    "fill_missing_channel": fill_missing_channel,
                    "normalize_status": normalize_status,
                },
                "artifact_refs": {
                    "transform_id": artifact_ids["transform_id"],
                    "metrics_id": artifact_ids["metrics_id"],
                },
            }
        )

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="validate")

        phase = "validate"
        quality = _unwrap_tool_data(
            validate_analysis(
                metrics=metrics,
                profile=profile,
                max_missing_channel_pct=max_missing_channel_pct,
            ),
            tool_name="validate_analysis",
        )
        if not bool(quality.get("ok")):
            failed = quality.get("failed_checks") or []
            first = str(failed[0]) if failed else "unknown"
            return stopped(f"validation_failed:{first}", phase=phase, quality=quality)

        trace.append(
            {
                "step": 5,
                "phase": "validate",
                "passed_checks": len(quality.get("passed_checks", [])),
                "failed_checks": len(quality.get("failed_checks", [])),
                "artifact_refs": {"quality_id": artifact_ids["quality_id"]},
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 5,
                "action": "validate_analysis",
                "passed_checks": len(quality.get("passed_checks", [])),
                "artifact_refs": {"quality_id": artifact_ids["quality_id"]},
            }
        )

        aggregate = {
            "report_date": req["report_date"],
            "region": req["region"],
            "source": source,
            "metrics": {
                "sample_size": int(metrics["sample_size"]),
                "gross_revenue": round(float(metrics["gross_revenue"]), 2),
                "refund_amount_total": round(float(metrics["refund_amount_total"]), 2),
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "conversion_rate": round(float(metrics["conversion_rate"]), 6),
                "conversion_rate_pct": round(float(metrics["conversion_rate"]) * 100, 2),
                "failed_payment_rate": round(float(metrics["failed_payment_rate"]), 6),
                "failed_payment_rate_pct": round(float(metrics["failed_payment_rate"]) * 100, 2),
                "refund_rate": round(float(metrics["refund_rate"]), 6),
                "refund_rate_pct": round(float(metrics["refund_rate"]) * 100, 2),
                "top_channel": str(metrics["top_channel"]),
                "avg_latency_ms": round(float(metrics["avg_latency_ms"]), 2),
                "p95_latency_ms": round(float(metrics["p95_latency_ms"]), 2),
                "eta_minutes": int(metrics["eta_minutes"]),
            },
            "data_quality": {
                "row_count_raw": int(profile["row_count_raw"]),
                "row_count_clean": int(transformed["row_count_clean"]),
                "deduped_rows": int(transformed["deduped_rows"]),
                "missing_channel_pct_raw": round(float(profile["missing_channel_pct"]) * 100, 2),
                "row_validity_ok": bool(profile["row_validity_ok"]),
                "invalid_order_id_count": int(profile["invalid_order_id_count"]),
                "invalid_event_ts_count": int(profile["invalid_event_ts_count"]),
                "invalid_status_count": int(profile["invalid_status_count"]),
                "invalid_amount_count": int(profile["invalid_amount_count"]),
                "invalid_latency_count": int(profile["invalid_latency_count"]),
                "skipped_rows_missing_key": int(transformed.get("skipped_rows_missing_key", 0)),
                "skipped_rows_invalid_ts": int(transformed.get("skipped_rows_invalid_ts", 0)),
            },
        }

        answer = compose_final_answer(
            request=request,
            aggregate=aggregate,
            quality=quality,
        )

        trace.append(
            {
                "step": 6,
                "phase": "finalize",
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 6,
                "action": "finalize",
            }
        )

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "validated_analysis",
            "answer": answer,
            "aggregate": aggregate,
            "quality": quality,
            "trace": trace,
            "history": history,
        }
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase=phase,
            **({"details": exc.details} if isinstance(exc.details, dict) and exc.details else {}),
        )


def main() -> None:
    result = run_data_analysis_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What Matters Most Here (In Plain Terms)

  • the workflow is enforced as fixed, so the agent cannot "skip" profile/validate
  • the policy boundary decides separately whether the source may be used in this run
  • profiling + validation are a technical quality gate, not "trust the number"; on invalid_profile:* the run stops before transform
  • dedupe is deterministic (latest_by_event_ts) for strictly UTC event_ts (YYYY-MM-DDTHH:MM:SSZ), and cleaning rules come explicitly from policy_hints
  • deduped_rows in this demo shows the total rows removed after transform (dedupe plus possible dropping of invalid rows); in the current sample nothing is dropped
  • filled_missing_channel is counted after dedupe: if a duplicate is replaced by a newer row with a valid channel, no fill is needed (which is why it is 1 here instead of 2; see the sketch after this list)
  • finalize is not a tool step here but an orchestration step in main.py
  • the final brief is composed only from validated metrics
  • trace/history with elapsed_ms and artifact_refs make the pipeline auditable
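
A minimal check of the dedupe/fill interaction mentioned above (illustrative snippet; run next to tools.py):

PYTHON
# Illustrative: the later o-1006 row wins the dedupe and already has a channel,
# so no fill is needed for that order.
from tools import transform_sales_rows

rows = [
    {"order_id": "o-1006", "event_ts": "2026-03-01T10:25:00Z",
     "status": "paid", "amount": 110.0, "channel": None, "latency_ms": 190},
    {"order_id": "o-1006", "event_ts": "2026-03-01T10:50:00Z",
     "status": "paid", "amount": 110.0, "channel": "email", "latency_ms": 188},
]

data = transform_sales_rows(
    rows=rows,
    dedupe_key="order_id",
    dedupe_ts_key="event_ts",
    fill_missing_channel="unknown",
    normalize_status="lower_strip",
)["data"]
print(data["row_count_clean"], data["deduped_rows"], data["filled_missing_channel"])  # 1 1 0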

Example Output

JSON
{
  "run_id": "3045a7d0-1431-455b-9d26-16ae9fbc653a",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "validated_analysis",
  "answer": "Data analysis brief (US, 2026-03-07): source warehouse_sales_daily shows net revenue 600.00 (gross 670.00, refunds 70.00 subtracted), conversion 63.64%, failed payment rate 18.18%, refund rate 18.18%, top channel paid_search, and p95 latency 240.00 ms. Quality checks passed: 12.",
  "aggregate": {
    "report_date": "2026-03-07",
    "region": "US",
    "source": "warehouse_sales_daily",
    "metrics": {
      "sample_size": 11,
      "gross_revenue": 670.0,
      "refund_amount_total": 70.0,
      "net_revenue": 600.0,
      "conversion_rate": 0.636364,
      "conversion_rate_pct": 63.64,
      "failed_payment_rate": 0.181818,
      "failed_payment_rate_pct": 18.18,
      "refund_rate": 0.181818,
      "refund_rate_pct": 18.18,
      "top_channel": "paid_search",
      "avg_latency_ms": 200.73,
      "p95_latency_ms": 240.0,
      "eta_minutes": 45
    },
    "data_quality": {
      "row_count_raw": 12,
      "row_count_clean": 11,
      "deduped_rows": 1,
      "missing_channel_pct_raw": 16.67,
      "row_validity_ok": true,
      "invalid_order_id_count": 0,
      "invalid_event_ts_count": 0,
      "invalid_status_count": 0,
      "invalid_amount_count": 0,
      "invalid_latency_count": 0,
      "skipped_rows_missing_key": 0,
      "skipped_rows_invalid_ts": 0
    }
  },
  "quality": {
    "ok": true,
    "passed_checks": [
      "conversion_between_0_1",
      "refund_rate_between_0_1",
      "gross_revenue_non_negative",
      "refund_amount_non_negative",
      "net_revenue_non_negative",
      "failed_rate_reasonable",
      "sample_size_min_10",
      "p95_latency_under_400",
      "missing_channel_within_policy",
      "gross_ge_net",
      "gross_minus_refunds_equals_net",
      "profile_row_values_valid"
    ],
    "failed_checks": []
  },
  "trace": [
    {
      "step": 1,
      "phase": "plan",
      "steps": 5,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "policy_check",
      "source": "warehouse_sales_daily",
      "region": "US",
      "decision": "allow",
      "reason": "policy_pass",
      "allowed_sources_policy": [
        "warehouse_refunds_daily",
        "warehouse_sales_daily"
      ],
      "allowed_sources_execution": [
        "warehouse_sales_daily"
      ],
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 3,
      "phase": "ingest_profile",
      "row_count_raw": 12,
      "duplicate_rows": 1,
      "missing_channel_pct": 16.67,
      "artifact_refs": {
        "snapshot_id": "snap_3045a7d0",
        "profile_id": "profile_3045a7d0"
      },
      "elapsed_ms": 2,
      "ok": true
    },
    {
      "step": 4,
      "phase": "transform_analyze",
      "row_count_clean": 11,
      "deduped_rows": 1,
      "filled_missing_channel": 1,
      "skipped_rows_missing_key": 0,
      "skipped_rows_invalid_ts": 0,
      "net_revenue": 600.0,
      "conversion_rate_pct": 63.64,
      "failed_payment_rate_pct": 18.18,
      "artifact_refs": {
        "transform_id": "transform_3045a7d0",
        "metrics_id": "metrics_3045a7d0"
      },
      "elapsed_ms": 2,
      "ok": true
    },
    {
      "step": 5,
      "phase": "validate",
      "passed_checks": 12,
      "failed_checks": 0,
      "artifact_refs": {
        "quality_id": "quality_3045a7d0"
      },
      "elapsed_ms": 2,
      "ok": true
    },
    {
      "step": 6,
      "phase": "finalize",
      "elapsed_ms": 2,
      "ok": true
    }
  ],
  "history": [{...}]
}

Typical stop_reason Values

  • success - run completed correctly
  • max_seconds - total time budget exhausted
  • invalid_plan:* - invalid analysis plan
  • invalid_plan:step_sequence - mandatory workflow order violated (with expected/received details)
  • invalid_step:* - invalid step contract
  • policy_block:source_denied_policy - source blocked by the policy allowlist
  • policy_block:source_denied_execution - source blocked by the execution allowlist
  • policy_block:region_denied_policy - region blocked by policy
  • dataset_too_large - row limit exceeded
  • invalid_profile:schema - profile detected a schema mismatch
  • invalid_profile:row_values - profile detected invalid row values
  • invalid_profile:missing_channel_too_high - share of missing channels above the policy threshold
  • invalid_metrics:* - metrics failed basic invariants
  • validation_failed:* - quality validation failed
  • tool_invalid_output:* - a step returned an invalid contract
  • invalid_request:dedupe_strategy - unsupported dedupe strategy in analysis_rules
  • invalid_request:max_rows - invalid max_rows value
  • invalid_request:max_missing_channel_pct - invalid max_missing_channel_pct threshold
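
A minimal sketch of how a caller might branch on status and these stop reasons (illustrative; run next to main.py):

PYTHON
# Illustrative: caller-side handling of the result envelope.
from main import GOAL, REQUEST, run_data_analysis_agent

result = run_data_analysis_agent(goal=GOAL, request=REQUEST)

if result["status"] == "ok":
    print(result["answer"])
elif result["stop_reason"].startswith("policy_block:"):
    # do not retry automatically; route to a data-governance owner instead
    print("blocked by policy:", result["stop_reason"])
else:
    print("stopped:", result["stop_reason"], "in phase", result["phase"])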

What Is NOT Shown Here

  • connecting to a real DWH/BI instead of an in-memory snapshot
  • versioning of datasets (snapshot lineage)
  • SQL pushdown and cost-aware query planning
  • multi-source joins with schema conflicts
  • anomaly detection beyond simple invariants
  • persisted artifacts (tables/charts) in storage

What You Can Try Next

  1. Set source="warehouse_refunds_daily" and observe policy_block:source_denied_execution (a sketch follows this list).
  2. Set max_missing_channel_pct=0.1 and observe invalid_profile:missing_channel_too_high.
  3. Insert a step outside EXPECTED_ACTION_SEQUENCE into the plan and observe invalid_plan:step_sequence.
  4. Increase rows beyond max_rows and observe dataset_too_large.
  5. Return conversion_rate=1.2 or gross < net from the analyze step and observe invalid_metrics:*.
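
For item 1, a sketch of the modified request (illustrative; run next to main.py and context.py):

PYTHON
# Illustrative: warehouse_refunds_daily passes the policy allowlist but not the
# execution allowlist, so the gateway denies it at the execution layer.
from context import build_request
from main import GOAL, run_data_analysis_agent

request = build_request(report_date="2026-03-07", region="US", source="warehouse_refunds_daily")
result = run_data_analysis_agent(goal=GOAL, request=request)
print(result["status"], result["stop_reason"])
# expected: stopped policy_block:source_denied_execution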

Author

Nick — Engineer building infrastructure for AI agents in production.

Focus: agent patterns, failure modes, runtime control, and system reliability.

🔗 GitHub: https://github.com/mykolademyanov


Editorial Note

This documentation is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The content is based on real failures, post-mortems, and operational incidents in AI agent systems running in production.