Data-Analysis Agent in Python: A Complete Example

A ready-to-run, production-style example of a Data-Analysis agent in Python with a fixed workflow, a policy boundary, metric validation, and transparent trace/history.
On this page
  1. The pattern in a nutshell
  2. What this example demonstrates
  3. Architecture
  4. Project structure
  5. How to run
  6. The task
  7. Code
  8. context.py — request envelope + sample dataset
  9. tools.py — deterministic analysis steps
  10. agent.py — plan proposal + final answer
  11. gateway.py — plan/policy/quality boundaries
  12. main.py — orchestrate analysis workflow
  13. Example output
  14. Typical stop_reason values
  15. What is NOT shown here
  16. What to try next

The pattern in a nutshell

A Data-Analysis Agent means the agent does not "invent" metrics; it walks through a fixed workflow:

  • ingest
  • profile
  • transform
  • analyze
  • validate
  • finalize (host orchestration step)

In other words, the result comes from a controlled pipeline, not from free-form textual reasoning.
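The fixed-order idea can be sketched in a few lines (step names here are illustrative, not the real tool names used later):

```python
# Minimal sketch: the runtime rejects any plan whose step order deviates
# from the fixed analysis workflow -- no skipping, reordering, or extras.
EXPECTED = ["ingest", "profile", "transform", "analyze", "validate"]

def check_sequence(actions: list[str]) -> bool:
    # The proposed plan must match the expected sequence exactly.
    return actions == EXPECTED

print(check_sequence(["ingest", "profile", "transform", "analyze", "validate"]))  # True
print(check_sequence(["ingest", "transform", "analyze", "validate"]))             # False
```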


What this example demonstrates

  • The agent proposes an analysis plan, but the runtime verifies the plan contract and step order
  • The policy allowlist and the execution allowlist for the data source are handled separately
  • The workflow is strictly fixed: skipping profile/validate is impossible
  • The profile check detects duplicates, missing values, and invalid row values; if the quality gate fails, the pipeline stops before transform
  • The transform step performs deterministic dedup (latest_by_event_ts), field normalization, and cleaning rules
  • The cleaning rules and dedupe strategy come from policy_hints (policy-configurable analysis)
  • The analyze step computes the metrics (gross/net revenue, conversion, failure rate, refund rate, latency)
  • The validate step checks quality invariants and aggregate consistency
  • trace/history provide an audit trail from plan to finalization

Архітектура

  1. agent.py builds the plan of analysis steps.
  2. gateway.py validates the plan and the policy decision for ingest.
  3. tools.py executes the deterministic steps: ingest/profile/transform/analyze/validate.
  4. main.py orchestrates the workflow, runs finalize as a host step, assembles the aggregate and trace/history, and returns the final brief.
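The flow above can be compressed into a hedged sketch (function bodies are illustrative stand-ins, not the real module APIs): each stage consumes the previous stage's output, and finalize runs on the host side.

```python
# Illustrative data flow only -- the real steps live in tools.py / main.py.
def run_pipeline(rows: list[dict]) -> dict:
    profile = {"row_count_raw": len(rows)}                        # profile
    clean = [r for r in rows if r.get("amount", 0.0) >= 0.0]      # transform
    metrics = {"gross_revenue": sum(r["amount"] for r in clean)}  # analyze
    ok = metrics["gross_revenue"] >= 0.0                          # validate
    return {"profile": profile, "metrics": metrics, "ok": ok}     # finalize (host)

print(run_pipeline([{"amount": 120.0}, {"amount": 90.0}])["metrics"]["gross_revenue"])  # 210.0
```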

Project structure

TEXT
agent-patterns/
└── data-analysis-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── agent.py
        ├── context.py
        ├── README.md
        └── requirements.txt

How to run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/data-analysis-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Requires Python 3.11+.

Option: via export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Option: via .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is the shell variant (macOS/Linux). On Windows it is simpler to set the variables with set or, if preferred, use python-dotenv.
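Regardless of how the variables get exported, they can be read portably from Python itself. The defaults below are assumptions mirroring the optional exports above, not guaranteed to match what main.py actually does:

```python
import os

# Assumed settings lookup; variable names match the export example above.
model = os.environ.get("OPENAI_MODEL", "gpt-4.1-mini")
timeout_s = int(os.environ.get("OPENAI_TIMEOUT_SECONDS", "60"))
print(model, timeout_s)
```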


The task

A production-style case:

"Prepare a customer-safe analytics brief for the last 7 days for the US: revenue, conversion, failure rate, data-quality checks."


Code

context.py — request envelope + sample dataset

PYTHON
from __future__ import annotations

from typing import Any


def build_request(*, report_date: str, region: str, source: str) -> dict[str, Any]:
    rows = [
        {
            "order_id": "o-1001",
            "event_ts": "2026-03-01T10:00:00Z",
            "status": "paid",
            "amount": 120.0,
            "channel": "paid_search",
            "latency_ms": 180,
        },
        {
            "order_id": "o-1002",
            "event_ts": "2026-03-01T10:05:00Z",
            "status": "paid",
            "amount": 90.0,
            "channel": "email",
            "latency_ms": 160,
        },
        {
            "order_id": "o-1003",
            "event_ts": "2026-03-01T10:10:00Z",
            "status": "failed",
            "amount": 0.0,
            "channel": "paid_search",
            "latency_ms": 240,
        },
        {
            "order_id": "o-1004",
            "event_ts": "2026-03-01T10:15:00Z",
            "status": "refunded",
            "amount": 40.0,
            "channel": "email",
            "latency_ms": 210,
        },
        {
            "order_id": "o-1005",
            "event_ts": "2026-03-01T10:20:00Z",
            "status": "paid",
            "amount": 140.0,
            "channel": "paid_search",
            "latency_ms": 170,
        },
        {
            "order_id": "o-1006",
            "event_ts": "2026-03-01T10:25:00Z",
            "status": "paid",
            "amount": 110.0,
            "channel": None,
            "latency_ms": 190,
        },
        {
            "order_id": "o-1007",
            "event_ts": "2026-03-01T10:30:00Z",
            "status": "failed",
            "amount": 0.0,
            "channel": "organic",
            "latency_ms": 260,
        },
        {
            "order_id": "o-1008",
            "event_ts": "2026-03-01T10:35:00Z",
            "status": "paid",
            "amount": 80.0,
            "channel": "paid_social",
            "latency_ms": 200,
        },
        {
            "order_id": "o-1009",
            "event_ts": "2026-03-01T10:40:00Z",
            "status": "refunded",
            "amount": 30.0,
            "channel": "paid_social",
            "latency_ms": 230,
        },
        {
            "order_id": "o-1010",
            "event_ts": "2026-03-01T10:45:00Z",
            "status": "paid",
            "amount": 70.0,
            "channel": "paid_search",
            "latency_ms": 175,
        },
        {
            "order_id": "o-1006",
            "event_ts": "2026-03-01T10:50:00Z",
            "status": "paid",
            "amount": 110.0,
            "channel": "email",
            "latency_ms": 188,
        },
        {
            "order_id": "o-1011",
            "event_ts": "2026-03-01T10:55:00Z",
            "status": "paid",
            "amount": 60.0,
            "channel": None,
            "latency_ms": 195,
        },
    ]

    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
            "source": source,
            "rows": rows,
        },
        "policy_hints": {
            "allowed_sources": ["warehouse_sales_daily", "warehouse_refunds_daily"],
            "allowed_regions": ["US", "CA"],
            "max_rows": 5000,
            "max_missing_channel_pct": 0.25,
            "analysis_rules": {
                "dedupe_key": "order_id",
                "dedupe_ts_key": "event_ts",
                "dedupe_strategy": "latest_by_event_ts",
                "fill_missing_channel": "unknown",
                "normalize_status": "lower_strip",
            },
        },
    }
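Note that the sample dataset deliberately seeds quality issues: order o-1006 appears twice (a dedupe target for transform) and two rows have channel=None (a fill target). The duplicate-detection idea can be sketched in isolation:

```python
from collections import Counter

def duplicate_ids(ids: list[str]) -> list[str]:
    # IDs occurring more than once -- exactly what profile_sales_rows must flag.
    return sorted(k for k, n in Counter(ids).items() if n > 1)

print(duplicate_ids(["o-1001", "o-1006", "o-1002", "o-1006"]))  # ['o-1006']
```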

tools.py — deterministic analysis steps

PYTHON
from __future__ import annotations

import math
from datetime import datetime
from collections import defaultdict
from typing import Any

ALLOWED_STATUSES = {"paid", "failed", "refunded"}


def _safe_float(value: Any) -> tuple[bool, float]:
    try:
        v = float(value)
    except (TypeError, ValueError):
        return False, 0.0
    if not math.isfinite(v):
        return False, 0.0
    return True, v


def _is_valid_event_ts(value: Any) -> bool:
    if not isinstance(value, str):
        return False
    ts = value.strip()
    if len(ts) != 20 or not ts.endswith("Z"):
        return False
    try:
        datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        return False
    return True


def read_sales_snapshot(*, source: str, region: str, rows: list[dict[str, Any]]) -> dict[str, Any]:
    return {
        "status": "ok",
        "data": {
            "source": source,
            "region": region.upper(),
            "rows": rows,
            "snapshot_at": "2026-03-07T10:00:00Z",
        },
    }


def profile_sales_rows(*, rows: list[dict[str, Any]]) -> dict[str, Any]:
    required_fields = {"order_id", "event_ts", "status", "amount", "channel", "latency_ms"}
    schema_ok = True
    missing_fields_count = 0

    missing_channel_count = 0
    duplicate_rows = 0
    by_order_id: dict[str, int] = {}

    invalid_order_id_count = 0
    invalid_event_ts_count = 0
    invalid_status_count = 0
    invalid_amount_count = 0
    invalid_latency_count = 0

    for row in rows:
        keys = set(row.keys())
        if not required_fields.issubset(keys):
            schema_ok = False
            missing_fields_count += 1

        order_id = row.get("order_id")
        order_id_norm = str(order_id).strip() if isinstance(order_id, str) else ""
        if not order_id_norm:
            invalid_order_id_count += 1
        else:
            by_order_id[order_id_norm] = by_order_id.get(order_id_norm, 0) + 1
            if by_order_id[order_id_norm] > 1:
                duplicate_rows += 1

        event_ts = row.get("event_ts")
        if not _is_valid_event_ts(event_ts):
            invalid_event_ts_count += 1

        status = str(row.get("status", "")).strip().lower()
        if status not in ALLOWED_STATUSES:
            invalid_status_count += 1

        ok_amount, amount = _safe_float(row.get("amount"))
        if not ok_amount or amount < 0.0:
            invalid_amount_count += 1

        ok_latency, latency = _safe_float(row.get("latency_ms"))
        if not ok_latency or latency < 0.0:
            invalid_latency_count += 1

        channel = row.get("channel")
        if not isinstance(channel, str) or not channel.strip():
            missing_channel_count += 1

    total = len(rows)
    missing_channel_pct = (missing_channel_count / total) if total else 0.0

    row_validity_ok = (
        invalid_order_id_count == 0
        and invalid_event_ts_count == 0
        and invalid_status_count == 0
        and invalid_amount_count == 0
        and invalid_latency_count == 0
    )

    return {
        "status": "ok",
        "data": {
            "row_count_raw": total,
            "duplicate_rows": duplicate_rows,
            "missing_channel_count": missing_channel_count,
            "missing_channel_pct": missing_channel_pct,
            "schema_ok": schema_ok,
            "missing_fields_count": missing_fields_count,
            "row_validity_ok": row_validity_ok,
            "invalid_order_id_count": invalid_order_id_count,
            "invalid_event_ts_count": invalid_event_ts_count,
            "invalid_status_count": invalid_status_count,
            "invalid_amount_count": invalid_amount_count,
            "invalid_latency_count": invalid_latency_count,
        },
    }


def transform_sales_rows(
    *,
    rows: list[dict[str, Any]],
    dedupe_key: str,
    dedupe_ts_key: str,
    fill_missing_channel: str,
    normalize_status: str,
) -> dict[str, Any]:
    latest_by_key: dict[str, dict[str, Any]] = {}
    ts_by_key: dict[str, str] = {}
    skipped_rows_missing_key = 0
    skipped_rows_invalid_ts = 0

    for row in rows:
        key = str(row.get(dedupe_key, "")).strip()
        if not key:
            skipped_rows_missing_key += 1
            continue

        event_ts_raw = row.get(dedupe_ts_key)
        if not _is_valid_event_ts(event_ts_raw):
            skipped_rows_invalid_ts += 1
            continue
        event_ts = str(event_ts_raw).strip()
        previous_ts = ts_by_key.get(key, "")
        if key not in latest_by_key or event_ts >= previous_ts:
            latest_by_key[key] = dict(row)
            ts_by_key[key] = event_ts

    cleaned_rows = [latest_by_key[k] for k in sorted(latest_by_key.keys())]

    filled_missing_channel = 0
    for row in cleaned_rows:
        channel = row.get("channel")
        if not isinstance(channel, str) or not channel.strip():
            row["channel"] = fill_missing_channel
            filled_missing_channel += 1

        if normalize_status == "lower_strip":
            row["status"] = str(row.get("status", "")).strip().lower()

        row[dedupe_key] = str(row.get(dedupe_key, "")).strip()
        row[dedupe_ts_key] = str(row.get(dedupe_ts_key, "")).strip()
        ok_amount, amount = _safe_float(row.get("amount"))
        ok_latency, latency = _safe_float(row.get("latency_ms"))
        row["amount"] = amount if ok_amount else 0.0
        row["latency_ms"] = latency if ok_latency else 0.0

    return {
        "status": "ok",
        "data": {
            "rows": cleaned_rows,
            "row_count_clean": len(cleaned_rows),
            "deduped_rows": max(0, len(rows) - len(cleaned_rows)),
            "filled_missing_channel": filled_missing_channel,
            "skipped_rows_missing_key": skipped_rows_missing_key,
            "skipped_rows_invalid_ts": skipped_rows_invalid_ts,
            "dedupe_strategy": "latest_by_event_ts",
            "dedupe_key": dedupe_key,
            "dedupe_ts_key": dedupe_ts_key,
        },
    }


def analyze_sales_rows(*, rows: list[dict[str, Any]]) -> dict[str, Any]:
    total = len(rows)
    paid_count = 0
    failed_count = 0
    refunded_count = 0

    gross_revenue = 0.0
    refund_amount_total = 0.0
    latency_values: list[float] = []
    channel_net_revenue: dict[str, float] = defaultdict(float)

    for row in rows:
        status = str(row.get("status", "")).lower()
        amount = float(row.get("amount", 0.0))
        channel = str(row.get("channel", "unknown")).strip().lower() or "unknown"
        latency = float(row.get("latency_ms", 0.0))
        latency_values.append(latency)

        if status == "paid":
            paid_count += 1
            gross_revenue += amount
            channel_net_revenue[channel] += amount
        elif status == "failed":
            failed_count += 1
        elif status == "refunded":
            refunded_count += 1
            refund_amount_total += amount
            channel_net_revenue[channel] -= amount

    net_revenue = gross_revenue - refund_amount_total
    conversion_rate = (paid_count / total) if total else 0.0
    failed_payment_rate = (failed_count / total) if total else 0.0
    refund_rate = (refunded_count / total) if total else 0.0

    avg_latency_ms = (sum(latency_values) / len(latency_values)) if latency_values else 0.0
    sorted_latency = sorted(latency_values)
    if sorted_latency:
        p95_idx = int((len(sorted_latency) - 1) * 0.95)
        p95_latency_ms = sorted_latency[p95_idx]
    else:
        p95_latency_ms = 0.0

    top_channel = "unknown"
    top_channel_value = float("-inf")
    for channel, value in channel_net_revenue.items():
        if value > top_channel_value:
            top_channel = channel
            top_channel_value = value

    eta_minutes = 45 if failed_payment_rate >= 0.15 else 20

    return {
        "status": "ok",
        "data": {
            "sample_size": total,
            "paid_count": paid_count,
            "failed_count": failed_count,
            "refunded_count": refunded_count,
            "gross_revenue": gross_revenue,
            "refund_amount_total": refund_amount_total,
            "net_revenue": net_revenue,
            "conversion_rate": conversion_rate,
            "failed_payment_rate": failed_payment_rate,
            "refund_rate": refund_rate,
            "avg_latency_ms": avg_latency_ms,
            "p95_latency_ms": p95_latency_ms,
            "top_channel": top_channel,
            "eta_minutes": eta_minutes,
        },
    }


def validate_analysis(
    *,
    metrics: dict[str, Any],
    profile: dict[str, Any],
    max_missing_channel_pct: float,
) -> dict[str, Any]:
    passed_checks: list[str] = []
    failed_checks: list[str] = []

    conversion_rate = float(metrics.get("conversion_rate", 0.0))
    failed_payment_rate = float(metrics.get("failed_payment_rate", 0.0))
    refund_rate = float(metrics.get("refund_rate", 0.0))

    gross_revenue = float(metrics.get("gross_revenue", 0.0))
    refund_amount_total = float(metrics.get("refund_amount_total", 0.0))
    net_revenue = float(metrics.get("net_revenue", 0.0))

    sample_size = int(metrics.get("sample_size", 0))
    p95_latency_ms = float(metrics.get("p95_latency_ms", 0.0))
    missing_channel_pct = float(profile.get("missing_channel_pct", 0.0))
    row_validity_ok = bool(profile.get("row_validity_ok"))

    if 0.0 <= conversion_rate <= 1.0:
        passed_checks.append("conversion_between_0_1")
    else:
        failed_checks.append("conversion_out_of_range")

    if 0.0 <= refund_rate <= 1.0:
        passed_checks.append("refund_rate_between_0_1")
    else:
        failed_checks.append("refund_rate_out_of_range")

    if gross_revenue >= 0.0:
        passed_checks.append("gross_revenue_non_negative")
    else:
        failed_checks.append("gross_revenue_negative")

    if refund_amount_total >= 0.0:
        passed_checks.append("refund_amount_non_negative")
    else:
        failed_checks.append("refund_amount_negative")

    if net_revenue >= 0.0:
        passed_checks.append("net_revenue_non_negative")
    else:
        failed_checks.append("net_revenue_negative")

    if failed_payment_rate <= 0.35:
        passed_checks.append("failed_rate_reasonable")
    else:
        failed_checks.append("failed_rate_too_high")

    if sample_size >= 10:
        passed_checks.append("sample_size_min_10")
    else:
        failed_checks.append("sample_size_too_low")

    if p95_latency_ms <= 400.0:
        passed_checks.append("p95_latency_under_400")
    else:
        failed_checks.append("p95_latency_too_high")

    if missing_channel_pct <= max_missing_channel_pct:
        passed_checks.append("missing_channel_within_policy")
    else:
        failed_checks.append("missing_channel_policy_violation")

    if gross_revenue >= net_revenue:
        passed_checks.append("gross_ge_net")
    else:
        failed_checks.append("gross_less_than_net")

    if abs((gross_revenue - refund_amount_total) - net_revenue) <= 0.01:
        passed_checks.append("gross_minus_refunds_equals_net")
    else:
        failed_checks.append("gross_refund_net_mismatch")

    if row_validity_ok:
        passed_checks.append("profile_row_values_valid")
    else:
        failed_checks.append("profile_row_values_invalid")

    return {
        "status": "ok",
        "data": {
            "ok": len(failed_checks) == 0,
            "passed_checks": passed_checks,
            "failed_checks": failed_checks,
        },
    }
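A standalone sketch of the latest_by_event_ts strategy used by transform_sales_rows: the timestamps are fixed-width ISO-8601 UTC strings, so lexicographic comparison orders them chronologically and plain string comparison is enough to pick the newest row per key.

```python
# Two rows for the same order; the later one should win.
rows = [
    {"order_id": "o-1006", "event_ts": "2026-03-01T10:25:00Z", "channel": None},
    {"order_id": "o-1006", "event_ts": "2026-03-01T10:50:00Z", "channel": "email"},
]

latest: dict[str, dict] = {}
for row in rows:
    key = row["order_id"]
    # ">=" means a later duplicate with an equal timestamp also replaces the earlier one.
    if key not in latest or row["event_ts"] >= latest[key]["event_ts"]:
        latest[key] = row

print(latest["o-1006"]["channel"])  # email
```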

agent.py — plan proposal + final answer

PYTHON
from __future__ import annotations

from typing import Any


def propose_analysis_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    del goal
    req = request["request"]
    return {
        "steps": [
            {
                "id": "s1",
                "action": "ingest_sales",
                "args": {
                    "source": req["source"],
                    "region": req["region"],
                },
            },
            {"id": "s2", "action": "profile_sales", "args": {}},
            {"id": "s3", "action": "transform_sales", "args": {}},
            {"id": "s4", "action": "analyze_sales", "args": {}},
            {"id": "s5", "action": "validate_analysis", "args": {}},
        ]
    }


def compose_final_answer(
    *,
    request: dict[str, Any],
    aggregate: dict[str, Any],
    quality: dict[str, Any],
) -> str:
    req = request["request"]
    m = aggregate["metrics"]
    checks = quality.get("passed_checks", [])

    return (
        f"Data analysis brief ({req['region']}, {req['report_date']}): source {req['source']} shows net revenue "
        f"{m['net_revenue']:.2f} (gross {m['gross_revenue']:.2f}, refunds {m['refund_amount_total']:.2f} subtracted), "
        f"conversion {m['conversion_rate_pct']:.2f}%, failed payment rate {m['failed_payment_rate_pct']:.2f}%, "
        f"refund rate {m['refund_rate_pct']:.2f}%, top channel {m['top_channel']}, "
        f"and p95 latency {m['p95_latency_ms']:.2f} ms. Quality checks passed: {len(checks)}."
    )
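Note that compose_final_answer expects percent-scaled keys (conversion_rate_pct and friends), while analyze_sales_rows returns fractions, so the host aggregate built in main.py presumably performs the conversion. A hedged sketch of that assumed step (key names taken from the f-string above, conversion logic is an assumption):

```python
# Assumed host-side conversion: fractional rates become *_pct fields.
metrics = {"conversion_rate": 0.6364, "refund_rate": 0.1818}
aggregate_metrics = {
    f"{name}_pct": round(value * 100, 2)
    for name, value in metrics.items()
}
print(aggregate_metrics)  # {'conversion_rate_pct': 63.64, 'refund_rate_pct': 18.18}
```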

gateway.py — plan/policy/quality boundaries

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str, *, details: dict[str, Any] | None = None):
        super().__init__(reason)
        self.reason = reason
        self.details = details or {}


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 25
    max_steps: int = 8
    max_rows: int = 5000


@dataclass(frozen=True)
class Decision:
    kind: str
    reason: str


EXPECTED_ACTION_SEQUENCE = [
    "ingest_sales",
    "profile_sales",
    "transform_sales",
    "analyze_sales",
    "validate_analysis",
]


def validate_plan(raw_steps: Any, *, max_steps: int) -> list[dict[str, Any]]:
    if not isinstance(raw_steps, list) or not raw_steps:
        raise StopRun("invalid_plan:steps")
    if len(raw_steps) > max_steps:
        raise StopRun("invalid_plan:too_many_steps")

    steps: list[dict[str, Any]] = []
    actions: list[str] = []
    for raw in raw_steps:
        if not isinstance(raw, dict):
            raise StopRun("invalid_step:not_object")
        step_id = raw.get("id")
        action = raw.get("action")
        args = raw.get("args")
        if not isinstance(step_id, str) or not step_id.strip():
            raise StopRun("invalid_step:id")
        if not isinstance(action, str) or not action.strip():
            raise StopRun("invalid_step:action")
        if not isinstance(args, dict):
            raise StopRun("invalid_step:args")

        normalized = {
            "id": step_id.strip(),
            "action": action.strip(),
            "args": dict(args),
        }
        steps.append(normalized)
        actions.append(normalized["action"])

    if actions != EXPECTED_ACTION_SEQUENCE:
        raise StopRun(
            "invalid_plan:step_sequence",
            details={"expected": EXPECTED_ACTION_SEQUENCE, "received": actions},
        )

    return steps


class DataAnalysisGateway:
    def __init__(
        self,
        *,
        allowed_sources_policy: set[str],
        allowed_sources_execution: set[str],
        allowed_regions_policy: set[str],
        budget: Budget,
    ):
        self.allowed_sources_policy = set(allowed_sources_policy)
        self.allowed_sources_execution = set(allowed_sources_execution)
        self.allowed_regions_policy = {x.upper() for x in allowed_regions_policy}
        self.budget = budget

    def evaluate_ingest(self, *, source: str, region: str) -> Decision:
        if source not in self.allowed_sources_policy:
            return Decision(kind="deny", reason="source_denied_policy")
        if region.upper() not in self.allowed_regions_policy:
            return Decision(kind="deny", reason="region_denied_policy")
        if source not in self.allowed_sources_execution:
            return Decision(kind="deny", reason="source_denied_execution")
        return Decision(kind="allow", reason="policy_pass")

    def ensure_rows_budget(self, *, rows: list[dict[str, Any]]) -> None:
        if len(rows) > self.budget.max_rows:
            raise StopRun("dataset_too_large")

    def validate_profile(self, *, profile: dict[str, Any], max_missing_channel_pct: float) -> None:
        if not bool(profile.get("schema_ok")):
            raise StopRun("invalid_profile:schema")
        if not bool(profile.get("row_validity_ok")):
            raise StopRun("invalid_profile:row_values")

        missing_pct = float(profile.get("missing_channel_pct", 0.0))
        if missing_pct > max_missing_channel_pct:
            raise StopRun("invalid_profile:missing_channel_too_high")

    def validate_metrics(self, *, metrics: dict[str, Any]) -> None:
        sample_size = int(metrics.get("sample_size", 0))
        if sample_size <= 0:
            raise StopRun("invalid_metrics:sample_size")

        conversion = float(metrics.get("conversion_rate", 0.0))
        if not (0.0 <= conversion <= 1.0):
            raise StopRun("invalid_metrics:conversion_rate")

        failed_rate = float(metrics.get("failed_payment_rate", 0.0))
        if not (0.0 <= failed_rate <= 1.0):
            raise StopRun("invalid_metrics:failed_payment_rate")

        refund_rate = float(metrics.get("refund_rate", 0.0))
        if not (0.0 <= refund_rate <= 1.0):
            raise StopRun("invalid_metrics:refund_rate")

        gross_revenue = float(metrics.get("gross_revenue", 0.0))
        refund_amount_total = float(metrics.get("refund_amount_total", 0.0))
        net_revenue = float(metrics.get("net_revenue", 0.0))

        if net_revenue < 0:
            raise StopRun("invalid_metrics:net_revenue")
        if gross_revenue < net_revenue:
            raise StopRun("invalid_metrics:gross_less_than_net")
        if abs((gross_revenue - refund_amount_total) - net_revenue) > 0.01:
            raise StopRun("invalid_metrics:revenue_consistency")
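The layered check in evaluate_ingest can be exercised in isolation. The sketch below mirrors its decision order (policy allowlist first, then the narrower execution allowlist), which is why a policy-allowed but execution-blocked source yields source_denied_execution rather than a generic denial:

```python
def evaluate(source: str, policy: set[str], execution: set[str]) -> str:
    # Policy allowlist is checked before the execution allowlist,
    # so the deny reason tells you which boundary rejected the source.
    if source not in policy:
        return "source_denied_policy"
    if source not in execution:
        return "source_denied_execution"
    return "allow"

policy = {"warehouse_sales_daily", "warehouse_refunds_daily"}
execution = {"warehouse_sales_daily"}
print(evaluate("warehouse_refunds_daily", policy, execution))  # source_denied_execution
print(evaluate("warehouse_sales_daily", policy, execution))    # allow
```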

main.py — orchestrate analysis workflow

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from agent import compose_final_answer, propose_analysis_plan
from context import build_request
from gateway import Budget, DataAnalysisGateway, StopRun, validate_plan
from tools import (
    analyze_sales_rows,
    profile_sales_rows,
    read_sales_snapshot,
    transform_sales_rows,
    validate_analysis,
)

GOAL = (
    "Prepare a validated weekly data-analysis brief for US payments with revenue, "
    "conversion, failed payment rate, and quality checks."
)
REQUEST = build_request(
    report_date="2026-03-07",
    region="US",
    source="warehouse_sales_daily",
)

DEFAULT_BUDGET = Budget(
    max_seconds=25,
    max_steps=8,
    max_rows=5000,
)

DEFAULT_ALLOWED_SOURCES_POLICY = {"warehouse_sales_daily", "warehouse_refunds_daily"}
ALLOWED_SOURCES_EXECUTION = {"warehouse_sales_daily"}
DEFAULT_ALLOWED_REGIONS_POLICY = {"US", "CA"}


DEFAULT_ANALYSIS_RULES = {
    "dedupe_key": "order_id",
    "dedupe_ts_key": "event_ts",
    "dedupe_strategy": "latest_by_event_ts",
    "fill_missing_channel": "unknown",
    "normalize_status": "lower_strip",
}


def _unwrap_tool_data(raw: Any, *, tool_name: str) -> dict[str, Any]:
    if not isinstance(raw, dict) or raw.get("status") != "ok" or not isinstance(raw.get("data"), dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    return dict(raw["data"])


def run_data_analysis_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    artifact_ids = {
        "snapshot_id": f"snap_{run_id[:8]}",
        "profile_id": f"profile_{run_id[:8]}",
        "transform_id": f"transform_{run_id[:8]}",
        "metrics_id": f"metrics_{run_id[:8]}",
        "quality_id": f"quality_{run_id[:8]}",
    }

    def elapsed_ms() -> int:
        return max(1, int((time.monotonic() - started) * 1000))

    hints_raw = request.get("policy_hints")
    hints: dict[str, Any] = hints_raw if isinstance(hints_raw, dict) else {}

    allowed_sources_raw = hints.get("allowed_sources")
    if isinstance(allowed_sources_raw, list):
        allowed_sources_policy = {
            str(item).strip()
            for item in allowed_sources_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_sources_policy = set(DEFAULT_ALLOWED_SOURCES_POLICY)
    if not allowed_sources_policy:
        allowed_sources_policy = set(DEFAULT_ALLOWED_SOURCES_POLICY)

    allowed_regions_raw = hints.get("allowed_regions")
    if isinstance(allowed_regions_raw, list):
        allowed_regions_policy = {
            str(item).strip().upper()
            for item in allowed_regions_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_regions_policy = set(DEFAULT_ALLOWED_REGIONS_POLICY)
    if not allowed_regions_policy:
        allowed_regions_policy = set(DEFAULT_ALLOWED_REGIONS_POLICY)

    max_rows_raw = hints.get("max_rows", DEFAULT_BUDGET.max_rows)
    max_missing_channel_pct_raw = hints.get("max_missing_channel_pct", 0.25)

    analysis_rules_raw = hints.get("analysis_rules")
    analysis_rules = dict(DEFAULT_ANALYSIS_RULES)
    if isinstance(analysis_rules_raw, dict):
        for key, value in analysis_rules_raw.items():
            if isinstance(key, str):
                analysis_rules[key] = value

    dedupe_key = str(analysis_rules.get("dedupe_key", DEFAULT_ANALYSIS_RULES["dedupe_key"])).strip() or "order_id"
    dedupe_ts_key = str(analysis_rules.get("dedupe_ts_key", DEFAULT_ANALYSIS_RULES["dedupe_ts_key"])).strip() or "event_ts"
    dedupe_strategy = str(
        analysis_rules.get("dedupe_strategy", DEFAULT_ANALYSIS_RULES["dedupe_strategy"])
    ).strip() or "latest_by_event_ts"
    fill_missing_channel = str(
        analysis_rules.get("fill_missing_channel", DEFAULT_ANALYSIS_RULES["fill_missing_channel"])
    ).strip() or "unknown"
    normalize_status = str(
        analysis_rules.get("normalize_status", DEFAULT_ANALYSIS_RULES["normalize_status"])
    ).strip() or "lower_strip"

    if dedupe_strategy != "latest_by_event_ts":
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:dedupe_strategy",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        max_rows = int(max_rows_raw)
    except (TypeError, ValueError):
        max_rows = DEFAULT_BUDGET.max_rows
    if max_rows <= 0:
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:max_rows",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        max_missing_channel_pct = float(max_missing_channel_pct_raw)
    except (TypeError, ValueError):
        max_missing_channel_pct = 0.25
    if not (0.0 <= max_missing_channel_pct <= 1.0):
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:max_missing_channel_pct",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    budget = Budget(
        max_seconds=DEFAULT_BUDGET.max_seconds,
        max_steps=DEFAULT_BUDGET.max_steps,
        max_rows=max(10, min(200000, max_rows)),
    )

    gateway = DataAnalysisGateway(
        allowed_sources_policy=allowed_sources_policy,
        allowed_sources_execution=ALLOWED_SOURCES_EXECUTION,
        allowed_regions_policy=allowed_regions_policy,
        budget=budget,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    phase = "plan"
    try:
        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase=phase)

        raw_plan = propose_analysis_plan(goal=goal, request=request)
        steps = validate_plan(raw_plan.get("steps"), max_steps=budget.max_steps)

        trace.append(
            {
                "step": 1,
                "phase": "plan",
                "steps": len(steps),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 1,
                "action": "propose_analysis_plan",
                "step_ids": [item["id"] for item in steps],
            }
        )

        phase = "policy_check"
        ingest_args = steps[0]["args"]
        source = str(ingest_args.get("source", "")).strip()
        region = str(ingest_args.get("region", "")).strip().upper()
        decision = gateway.evaluate_ingest(source=source, region=region)

        trace.append(
            {
                "step": 2,
                "phase": "policy_check",
                "source": source,
                "region": region,
                "decision": decision.kind,
                "reason": decision.reason,
                "allowed_sources_policy": sorted(allowed_sources_policy),
                "allowed_sources_execution": sorted(ALLOWED_SOURCES_EXECUTION),
                "elapsed_ms": elapsed_ms(),
                "ok": decision.kind == "allow",
            }
        )
        history.append(
            {
                "step": 2,
                "action": "policy_check",
                "decision": {"kind": decision.kind, "reason": decision.reason},
            }
        )
        if decision.kind != "allow":
            return stopped(f"policy_block:{decision.reason}", phase=phase)

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="ingest_profile")

        # ingest_profile: read the snapshot, then run the profile quality-gate before any transform
        phase = "ingest_profile"
        req = request["request"]
        snapshot = _unwrap_tool_data(
            read_sales_snapshot(
                source=source,
                region=region,
                rows=list(req["rows"]),
            ),
            tool_name="read_sales_snapshot",
        )
        rows_raw = list(snapshot["rows"])
        gateway.ensure_rows_budget(rows=rows_raw)

        profile = _unwrap_tool_data(
            profile_sales_rows(rows=rows_raw),
            tool_name="profile_sales_rows",
        )
        gateway.validate_profile(profile=profile, max_missing_channel_pct=max_missing_channel_pct)

        trace.append(
            {
                "step": 3,
                "phase": "ingest_profile",
                "row_count_raw": profile["row_count_raw"],
                "duplicate_rows": profile["duplicate_rows"],
                "missing_channel_pct": round(float(profile["missing_channel_pct"]) * 100, 2),
                "artifact_refs": {
                    "snapshot_id": artifact_ids["snapshot_id"],
                    "profile_id": artifact_ids["profile_id"],
                },
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "ingest_profile",
                "source": source,
                "row_count_raw": profile["row_count_raw"],
                "artifact_refs": {
                    "snapshot_id": artifact_ids["snapshot_id"],
                    "profile_id": artifact_ids["profile_id"],
                },
            }
        )

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="transform_analyze")

        # transform_analyze: deterministic cleaning (dedupe, fills, normalization), then metrics
        phase = "transform_analyze"
        transformed = _unwrap_tool_data(
            transform_sales_rows(
                rows=rows_raw,
                dedupe_key=dedupe_key,
                dedupe_ts_key=dedupe_ts_key,
                fill_missing_channel=fill_missing_channel,
                normalize_status=normalize_status,
            ),
            tool_name="transform_sales_rows",
        )
        rows_clean = list(transformed["rows"])

        metrics = _unwrap_tool_data(
            analyze_sales_rows(rows=rows_clean),
            tool_name="analyze_sales_rows",
        )
        gateway.validate_metrics(metrics=metrics)

        trace.append(
            {
                "step": 4,
                "phase": "transform_analyze",
                "row_count_clean": transformed["row_count_clean"],
                "deduped_rows": transformed["deduped_rows"],
                "filled_missing_channel": transformed["filled_missing_channel"],
                "skipped_rows_missing_key": transformed.get("skipped_rows_missing_key", 0),
                "skipped_rows_invalid_ts": transformed.get("skipped_rows_invalid_ts", 0),
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "conversion_rate_pct": round(float(metrics["conversion_rate"]) * 100, 2),
                "failed_payment_rate_pct": round(float(metrics["failed_payment_rate"]) * 100, 2),
                "artifact_refs": {
                    "transform_id": artifact_ids["transform_id"],
                    "metrics_id": artifact_ids["metrics_id"],
                },
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 4,
                "action": "transform_analyze",
                "row_count_clean": transformed["row_count_clean"],
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "applied_rules": {
                    "dedupe_strategy": dedupe_strategy,
                    "dedupe_key": dedupe_key,
                    "dedupe_ts_key": dedupe_ts_key,
                    "fill_missing_channel": fill_missing_channel,
                    "normalize_status": normalize_status,
                },
                "artifact_refs": {
                    "transform_id": artifact_ids["transform_id"],
                    "metrics_id": artifact_ids["metrics_id"],
                },
            }
        )

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="validate")

        # validate: check metric invariants and aggregate consistency before finalize
        phase = "validate"
        quality = _unwrap_tool_data(
            validate_analysis(
                metrics=metrics,
                profile=profile,
                max_missing_channel_pct=max_missing_channel_pct,
            ),
            tool_name="validate_analysis",
        )
        if not bool(quality.get("ok")):
            failed = quality.get("failed_checks") or []
            first = str(failed[0]) if failed else "unknown"
            return stopped(f"validation_failed:{first}", phase=phase, quality=quality)

        trace.append(
            {
                "step": 5,
                "phase": "validate",
                "passed_checks": len(quality.get("passed_checks", [])),
                "failed_checks": len(quality.get("failed_checks", [])),
                "artifact_refs": {"quality_id": artifact_ids["quality_id"]},
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 5,
                "action": "validate_analysis",
                "passed_checks": len(quality.get("passed_checks", [])),
                "artifact_refs": {"quality_id": artifact_ids["quality_id"]},
            }
        )

        aggregate = {
            "report_date": req["report_date"],
            "region": req["region"],
            "source": source,
            "metrics": {
                "sample_size": int(metrics["sample_size"]),
                "gross_revenue": round(float(metrics["gross_revenue"]), 2),
                "refund_amount_total": round(float(metrics["refund_amount_total"]), 2),
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "conversion_rate": round(float(metrics["conversion_rate"]), 6),
                "conversion_rate_pct": round(float(metrics["conversion_rate"]) * 100, 2),
                "failed_payment_rate": round(float(metrics["failed_payment_rate"]), 6),
                "failed_payment_rate_pct": round(float(metrics["failed_payment_rate"]) * 100, 2),
                "refund_rate": round(float(metrics["refund_rate"]), 6),
                "refund_rate_pct": round(float(metrics["refund_rate"]) * 100, 2),
                "top_channel": str(metrics["top_channel"]),
                "avg_latency_ms": round(float(metrics["avg_latency_ms"]), 2),
                "p95_latency_ms": round(float(metrics["p95_latency_ms"]), 2),
                "eta_minutes": int(metrics["eta_minutes"]),
            },
            "data_quality": {
                "row_count_raw": int(profile["row_count_raw"]),
                "row_count_clean": int(transformed["row_count_clean"]),
                "deduped_rows": int(transformed["deduped_rows"]),
                "missing_channel_pct_raw": round(float(profile["missing_channel_pct"]) * 100, 2),
                "row_validity_ok": bool(profile["row_validity_ok"]),
                "invalid_order_id_count": int(profile["invalid_order_id_count"]),
                "invalid_event_ts_count": int(profile["invalid_event_ts_count"]),
                "invalid_status_count": int(profile["invalid_status_count"]),
                "invalid_amount_count": int(profile["invalid_amount_count"]),
                "invalid_latency_count": int(profile["invalid_latency_count"]),
                "skipped_rows_missing_key": int(transformed.get("skipped_rows_missing_key", 0)),
                "skipped_rows_invalid_ts": int(transformed.get("skipped_rows_invalid_ts", 0)),
            },
        }

        answer = compose_final_answer(
            request=request,
            aggregate=aggregate,
            quality=quality,
        )

        trace.append(
            {
                "step": 6,
                "phase": "finalize",
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 6,
                "action": "finalize",
            }
        )

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "validated_analysis",
            "answer": answer,
            "aggregate": aggregate,
            "quality": quality,
            "trace": trace,
            "history": history,
        }
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase=phase,
            **({"details": exc.details} if isinstance(exc.details, dict) and exc.details else {}),
        )


def main() -> None:
    result = run_data_analysis_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What matters most here (in plain terms)

  • the workflow is enforced as fixed, so the agent cannot "skip over" profile/validate
  • the policy boundary decides separately whether a source may be used in this run
  • profiling + validation form a technical quality gate rather than "faith in a number"; on invalid_profile:* the run stops before transform
  • dedupe is deterministic (latest_by_event_ts) over strict UTC event_ts (YYYY-MM-DDTHH:MM:SSZ), and the cleaning rules come explicitly from policy_hints
  • deduped_rows in this demo counts all rows removed by transform (dedupe plus any skipped invalid rows); the current sample skips none
  • filled_missing_channel is counted after dedupe: if a duplicate is replaced by a newer row with a valid channel, no fill is needed (which is why it is 1 here, not 2)
  • finalize is not a tool step but an orchestration step in main.py
  • the final brief is built only from validated metrics
  • trace/history with elapsed_ms and artifact_refs make the pipeline auditable
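The latest_by_event_ts dedupe described above can be sketched as a standalone function. This is a simplified sketch only: the real transform_sales_rows in tools.py also applies channel fills, status normalization, and the skip counters shown in the trace.

```python
from datetime import datetime, timezone
from typing import Any


def dedupe_latest_by_event_ts(
    rows: list[dict[str, Any]],
    key: str = "order_id",
    ts_key: str = "event_ts",
) -> list[dict[str, Any]]:
    """Keep one row per key: the one with the latest strict-UTC event_ts.

    On a timestamp tie, the later row in input order wins, so the result
    is deterministic for a fixed input ordering.
    """
    def parse_ts(value: str) -> datetime:
        # Strict UTC format: YYYY-MM-DDTHH:MM:SSZ
        return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)

    latest: dict[str, dict[str, Any]] = {}
    for row in rows:
        row_key = str(row.get(key, "")).strip()
        if not row_key:
            continue  # the real tool counts this as skipped_rows_missing_key
        current = latest.get(row_key)
        if current is None or parse_ts(str(row[ts_key])) >= parse_ts(str(current[ts_key])):
            latest[row_key] = row
    return list(latest.values())


# If the duplicate with the missing channel loses to a newer row with a valid
# channel, no fill is needed afterwards -- the filled_missing_channel=1 case above.
rows = [
    {"order_id": "o1", "event_ts": "2026-03-07T10:00:00Z", "channel": None},
    {"order_id": "o1", "event_ts": "2026-03-07T12:00:00Z", "channel": "paid_search"},
]
clean = dedupe_latest_by_event_ts(rows)
```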

Example output

JSON
{
  "run_id": "3045a7d0-1431-455b-9d26-16ae9fbc653a",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "validated_analysis",
  "answer": "Data analysis brief (US, 2026-03-07): source warehouse_sales_daily shows net revenue 600.00 (gross 670.00, refunds 70.00 subtracted), conversion 63.64%, failed payment rate 18.18%, refund rate 18.18%, top channel paid_search, and p95 latency 240.00 ms. Quality checks passed: 12.",
  "aggregate": {
    "report_date": "2026-03-07",
    "region": "US",
    "source": "warehouse_sales_daily",
    "metrics": {
      "sample_size": 11,
      "gross_revenue": 670.0,
      "refund_amount_total": 70.0,
      "net_revenue": 600.0,
      "conversion_rate": 0.636364,
      "conversion_rate_pct": 63.64,
      "failed_payment_rate": 0.181818,
      "failed_payment_rate_pct": 18.18,
      "refund_rate": 0.181818,
      "refund_rate_pct": 18.18,
      "top_channel": "paid_search",
      "avg_latency_ms": 200.73,
      "p95_latency_ms": 240.0,
      "eta_minutes": 45
    },
    "data_quality": {
      "row_count_raw": 12,
      "row_count_clean": 11,
      "deduped_rows": 1,
      "missing_channel_pct_raw": 16.67,
      "row_validity_ok": true,
      "invalid_order_id_count": 0,
      "invalid_event_ts_count": 0,
      "invalid_status_count": 0,
      "invalid_amount_count": 0,
      "invalid_latency_count": 0,
      "skipped_rows_missing_key": 0,
      "skipped_rows_invalid_ts": 0
    }
  },
  "quality": {
    "ok": true,
    "passed_checks": [
      "conversion_between_0_1",
      "refund_rate_between_0_1",
      "gross_revenue_non_negative",
      "refund_amount_non_negative",
      "net_revenue_non_negative",
      "failed_rate_reasonable",
      "sample_size_min_10",
      "p95_latency_under_400",
      "missing_channel_within_policy",
      "gross_ge_net",
      "gross_minus_refunds_equals_net",
      "profile_row_values_valid"
    ],
    "failed_checks": []
  },
  "trace": [
    {
      "step": 1,
      "phase": "plan",
      "steps": 5,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "policy_check",
      "source": "warehouse_sales_daily",
      "region": "US",
      "decision": "allow",
      "reason": "policy_pass",
      "allowed_sources_policy": [
        "warehouse_refunds_daily",
        "warehouse_sales_daily"
      ],
      "allowed_sources_execution": [
        "warehouse_sales_daily"
      ],
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 3,
      "phase": "ingest_profile",
      "row_count_raw": 12,
      "duplicate_rows": 1,
      "missing_channel_pct": 16.67,
      "artifact_refs": {
        "snapshot_id": "snap_3045a7d0",
        "profile_id": "profile_3045a7d0"
      },
      "elapsed_ms": 2,
      "ok": true
    },
    {
      "step": 4,
      "phase": "transform_analyze",
      "row_count_clean": 11,
      "deduped_rows": 1,
      "filled_missing_channel": 1,
      "skipped_rows_missing_key": 0,
      "skipped_rows_invalid_ts": 0,
      "net_revenue": 600.0,
      "conversion_rate_pct": 63.64,
      "failed_payment_rate_pct": 18.18,
      "artifact_refs": {
        "transform_id": "transform_3045a7d0",
        "metrics_id": "metrics_3045a7d0"
      },
      "elapsed_ms": 2,
      "ok": true
    },
    {
      "step": 5,
      "phase": "validate",
      "passed_checks": 12,
      "failed_checks": 0,
      "artifact_refs": {
        "quality_id": "quality_3045a7d0"
      },
      "elapsed_ms": 2,
      "ok": true
    },
    {
      "step": 6,
      "phase": "finalize",
      "elapsed_ms": 2,
      "ok": true
    }
  ],
  "history": [{...}]
}

Typical stop_reason values

  • success — the run completed correctly
  • max_seconds — the overall time budget was exhausted
  • invalid_plan:* — invalid analysis plan
  • invalid_plan:step_sequence — the mandatory workflow order was violated (with expected/received in details)
  • invalid_step:* — invalid step contract
  • policy_block:source_denied_policy — source not allowed by the policy allowlist
  • policy_block:source_denied_execution — source not allowed by the execution allowlist
  • policy_block:region_denied_policy — region not allowed by policy
  • dataset_too_large — row limit exceeded
  • invalid_profile:schema — the profile detected a schema mismatch
  • invalid_profile:row_values — the profile detected invalid row values
  • invalid_profile:missing_channel_too_high — the share of missing channel values exceeded the policy threshold
  • invalid_metrics:* — metrics failed basic invariants
  • validation_failed:* — quality validation failed
  • tool_invalid_output:* — a step returned an invalid contract
  • invalid_request:dedupe_strategy — unsupported dedupe strategy in analysis_rules
  • invalid_request:max_rows — invalid max_rows value
  • invalid_request:max_missing_channel_pct — invalid max_missing_channel_pct threshold
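Because every stop_reason follows a prefix:detail convention, a caller can route outcomes with simple prefix matching. A hypothetical dispatcher sketch — the handling categories here are illustrative and not part of this example's code:

```python
def classify_stop_reason(stop_reason: str) -> str:
    """Map a stop_reason string to a coarse handling category.

    stop_reason values follow a prefix:detail convention, e.g.
    "policy_block:source_denied_policy", so callers branch on the
    prefix and log the detail.
    """
    prefix = stop_reason.partition(":")[0]
    if prefix == "success":
        return "ok"
    if prefix == "max_seconds":
        return "retry"  # possibly worth retrying with a larger time budget
    if prefix == "policy_block":
        return "escalate_to_policy_owner"
    if prefix in {"invalid_request", "invalid_plan", "invalid_step"}:
        return "fix_request_or_plan"
    if prefix in {"invalid_profile", "invalid_metrics", "validation_failed",
                  "dataset_too_large", "tool_invalid_output"}:
        return "investigate_data"
    return "unknown"
```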

What is NOT shown here

  • connecting to a real DWH/BI instead of the in-memory snapshot
  • dataset versioning (snapshot lineage)
  • SQL pushdown and cost-aware query planning
  • multi-source joins with schema conflicts
  • anomaly detection beyond simple invariants
  • persisting artifacts (tables/charts) to storage

What to try next

  1. Set source="warehouse_refunds_daily" and observe policy_block:source_denied_execution.
  2. Set max_missing_channel_pct=0.1 and observe invalid_profile:missing_channel_too_high.
  3. Add a step outside EXPECTED_ACTION_SEQUENCE to the plan and observe invalid_plan:step_sequence.
  4. Increase rows beyond max_rows and observe dataset_too_large.
  5. Return conversion_rate=1.2 or gross < net from the analyze step and observe invalid_metrics:*.
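For these experiments it is safer to mutate a deep copy of the request envelope rather than the shared REQUEST. A small hypothetical helper; the field path in the usage comment assumes the envelope shape from context.py and is illustrative:

```python
import copy
from typing import Any


def with_override(request: dict[str, Any], path: list[str], value: Any) -> dict[str, Any]:
    """Return a deep copy of a request envelope with one nested field replaced.

    The original dict is left untouched, so successive experiments
    do not contaminate each other.
    """
    patched = copy.deepcopy(request)
    node = patched
    for key in path[:-1]:
        node = node[key]
    node[path[-1]] = value
    return patched


# Hypothetical usage against this article's modules (names assumed):
# bad_source = with_override(REQUEST, ["request", "source"], "warehouse_refunds_daily")
# result = run_data_analysis_agent(goal=GOAL, request=bad_source)
# result["stop_reason"] should then be "policy_block:source_denied_execution"
```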
⏱️ 19 min read · Updated March 6, 2026 · Difficulty: ★★☆

Author

Mykola is an engineer building infrastructure for production AI agents.

Focus: agent patterns, failure modes, runtime control, and system reliability.

🔗 GitHub: https://github.com/mykolademyanov


Editorial note

This documentation was prepared with AI assistance, with human editorial responsibility for accuracy, clarity, and production relevance.

The content is based on real failures, postmortems, and operational incidents from deployed AI-agent systems.