Data-Analysis Agent in Python: A Complete Example

A runnable, production-style example of a Data-Analysis agent in Python with a fixed workflow, a policy boundary, metric validation, and a transparent trace/history.
On this page
  1. Pattern Essence (brief)
  2. What This Example Demonstrates
  3. Architecture
  4. Project Structure
  5. How to Run
  6. Task
  7. Code
  8. context.py — request envelope + sample dataset
  9. tools.py — deterministic analysis steps
  10. agent.py — plan proposal + final answer
  11. gateway.py — plan/policy/quality boundaries
  12. main.py — orchestrate analysis workflow
  13. Example Output
  14. Typical stop_reason Values
  15. What Is NOT Shown Here
  16. What to Try Next

Pattern Essence (brief)

A Data-Analysis agent means the agent does not "invent" metrics; it follows a fixed workflow:

  • ingest
  • profile
  • transform
  • analyze
  • validate
  • finalize (host orchestration step)

The result therefore comes from a controlled pipeline, not from free-form textual reasoning.
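
The fixed-order constraint can be sketched as a simple plan check (illustrative names; the full version in this example lives in gateway.py as validate_plan):

```python
# Sketch of the fixed-workflow constraint (illustrative names): a proposed plan
# is accepted only if it matches the expected step sequence exactly.
EXPECTED = ["ingest", "profile", "transform", "analyze", "validate"]

def plan_is_valid(proposed: list[str]) -> bool:
    # No skipping, no reordering, no extra steps.
    return proposed == EXPECTED

print(plan_is_valid(["ingest", "profile", "transform", "analyze", "validate"]))  # True
print(plan_is_valid(["ingest", "transform", "analyze", "validate"]))  # False: profile skipped
```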


What This Example Demonstrates

  • The agent proposes an analysis plan, but the runtime verifies the plan contract and the step order
  • The policy allowlist and the execution allowlist for the data source are handled separately
  • The workflow is strictly fixed: profile/validate cannot be skipped
  • The profile check detects duplicates, missing values, and invalid row values; if the quality gate fails, the pipeline stops before transform
  • The transform step applies deterministic deduplication (latest_by_event_ts), field normalization, and the cleaning rules from policy_hints
  • Cleaning rules and the dedupe strategy come from policy_hints (policy-configurable analysis)
  • The analyze step computes the metrics (gross/net revenue, conversion, failure rate, refund rate, latency)
  • The validate step checks quality invariants and aggregate consistency
  • trace/history provide auditability from the plan through to finalization

Architecture

  1. agent.py builds the plan of analysis steps.
  2. gateway.py validates the plan and the policy decision for ingest.
  3. tools.py executes deterministic steps: ingest/profile/transform/analyze/validate.
  4. main.py orchestrates the workflow, runs finalize as a host step, collects the aggregate and trace/history, and returns the final brief.
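
This division of labor can be sketched minimally (illustrative names; the real orchestration in main.py is considerably richer): tool steps run in fixed order, and finalize runs host-side rather than as an agent tool.

```python
# Minimal orchestration sketch (illustrative names): deterministic tool steps run
# in a fixed order, then the host performs finalize itself -- it is not an agent tool.
def run_workflow(steps, finalize):
    results = {}
    for name, fn in steps:  # fixed order, enforced upstream by plan validation
        results[name] = fn(results)
    return finalize(results)  # host-side finalize step

brief = run_workflow(
    [("ingest", lambda r: [120.0, 90.0]), ("analyze", lambda r: sum(r["ingest"]))],
    lambda r: f"net revenue: {r['analyze']:.2f}",
)
print(brief)  # net revenue: 210.00
```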

Project Structure

TEXT
agent-patterns/
└── data-analysis-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── agent.py
        ├── context.py
        ├── README.md
        └── requirements.txt

How to Run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/data-analysis-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Variant using export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py

Variant using .env (optional):
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is a shell variant (macOS/Linux). On Windows, it is simpler to set the variables with set, or to use python-dotenv if preferred.
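
For the python-dotenv route mentioned above, a minimal sketch (assumes python-dotenv is installed; the import guard keeps the script working without it):

```python
import os

# Optional .env loading via python-dotenv; if the package is absent we fall back
# to whatever is already set in the process environment.
try:
    from dotenv import load_dotenv
    load_dotenv()  # reads .env from the current directory, if present
except ImportError:
    pass

api_key = os.environ.get("OPENAI_API_KEY")  # None if not configured anywhere
```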


Task

Production case:

"Prepare a customer-safe analytical brief on the last 7 days for US: revenue, conversion, failure rate, data-quality checks."


Code

context.py — request envelope + sample dataset

PYTHON
from __future__ import annotations

from typing import Any


def build_request(*, report_date: str, region: str, source: str) -> dict[str, Any]:
    rows = [
        {
            "order_id": "o-1001",
            "event_ts": "2026-03-01T10:00:00Z",
            "status": "paid",
            "amount": 120.0,
            "channel": "paid_search",
            "latency_ms": 180,
        },
        {
            "order_id": "o-1002",
            "event_ts": "2026-03-01T10:05:00Z",
            "status": "paid",
            "amount": 90.0,
            "channel": "email",
            "latency_ms": 160,
        },
        {
            "order_id": "o-1003",
            "event_ts": "2026-03-01T10:10:00Z",
            "status": "failed",
            "amount": 0.0,
            "channel": "paid_search",
            "latency_ms": 240,
        },
        {
            "order_id": "o-1004",
            "event_ts": "2026-03-01T10:15:00Z",
            "status": "refunded",
            "amount": 40.0,
            "channel": "email",
            "latency_ms": 210,
        },
        {
            "order_id": "o-1005",
            "event_ts": "2026-03-01T10:20:00Z",
            "status": "paid",
            "amount": 140.0,
            "channel": "paid_search",
            "latency_ms": 170,
        },
        {
            "order_id": "o-1006",
            "event_ts": "2026-03-01T10:25:00Z",
            "status": "paid",
            "amount": 110.0,
            "channel": None,
            "latency_ms": 190,
        },
        {
            "order_id": "o-1007",
            "event_ts": "2026-03-01T10:30:00Z",
            "status": "failed",
            "amount": 0.0,
            "channel": "organic",
            "latency_ms": 260,
        },
        {
            "order_id": "o-1008",
            "event_ts": "2026-03-01T10:35:00Z",
            "status": "paid",
            "amount": 80.0,
            "channel": "paid_social",
            "latency_ms": 200,
        },
        {
            "order_id": "o-1009",
            "event_ts": "2026-03-01T10:40:00Z",
            "status": "refunded",
            "amount": 30.0,
            "channel": "paid_social",
            "latency_ms": 230,
        },
        {
            "order_id": "o-1010",
            "event_ts": "2026-03-01T10:45:00Z",
            "status": "paid",
            "amount": 70.0,
            "channel": "paid_search",
            "latency_ms": 175,
        },
        {
            "order_id": "o-1006",
            "event_ts": "2026-03-01T10:50:00Z",
            "status": "paid",
            "amount": 110.0,
            "channel": "email",
            "latency_ms": 188,
        },
        {
            "order_id": "o-1011",
            "event_ts": "2026-03-01T10:55:00Z",
            "status": "paid",
            "amount": 60.0,
            "channel": None,
            "latency_ms": 195,
        },
    ]

    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
            "source": source,
            "rows": rows,
        },
        "policy_hints": {
            "allowed_sources": ["warehouse_sales_daily", "warehouse_refunds_daily"],
            "allowed_regions": ["US", "CA"],
            "max_rows": 5000,
            "max_missing_channel_pct": 0.25,
            "analysis_rules": {
                "dedupe_key": "order_id",
                "dedupe_ts_key": "event_ts",
                "dedupe_strategy": "latest_by_event_ts",
                "fill_missing_channel": "unknown",
                "normalize_status": "lower_strip",
            },
        },
    }
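
The sample dataset above deliberately carries quality defects: order_id o-1006 appears twice and two rows have a missing channel. A standalone sketch (a 3-row excerpt, not the full dataset) of the kind of scan the profile step performs:

```python
from collections import Counter

# Excerpt of the sample rows, reduced to the fields relevant for this scan.
rows = [
    {"order_id": "o-1006", "channel": None},
    {"order_id": "o-1006", "channel": "email"},
    {"order_id": "o-1011", "channel": None},
]
duplicate_ids = [k for k, n in Counter(r["order_id"] for r in rows).items() if n > 1]
missing_channel = sum(1 for r in rows if not r["channel"])
print(duplicate_ids)    # ['o-1006']
print(missing_channel)  # 2
```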

tools.py — deterministic analysis steps

PYTHON
from __future__ import annotations

import math
from datetime import datetime
from collections import defaultdict
from typing import Any

ALLOWED_STATUSES = {"paid", "failed", "refunded"}


def _safe_float(value: Any) -> tuple[bool, float]:
    try:
        v = float(value)
    except (TypeError, ValueError):
        return False, 0.0
    if not math.isfinite(v):
        return False, 0.0
    return True, v


def _is_valid_event_ts(value: Any) -> bool:
    if not isinstance(value, str):
        return False
    ts = value.strip()
    if len(ts) != 20 or not ts.endswith("Z"):
        return False
    try:
        datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        return False
    return True


def read_sales_snapshot(*, source: str, region: str, rows: list[dict[str, Any]]) -> dict[str, Any]:
    return {
        "status": "ok",
        "data": {
            "source": source,
            "region": region.upper(),
            "rows": rows,
            "snapshot_at": "2026-03-07T10:00:00Z",
        },
    }


def profile_sales_rows(*, rows: list[dict[str, Any]]) -> dict[str, Any]:
    required_fields = {"order_id", "event_ts", "status", "amount", "channel", "latency_ms"}
    schema_ok = True
    missing_fields_count = 0

    missing_channel_count = 0
    duplicate_rows = 0
    by_order_id: dict[str, int] = {}

    invalid_order_id_count = 0
    invalid_event_ts_count = 0
    invalid_status_count = 0
    invalid_amount_count = 0
    invalid_latency_count = 0

    for row in rows:
        keys = set(row.keys())
        if not required_fields.issubset(keys):
            schema_ok = False
            missing_fields_count += 1

        order_id = row.get("order_id")
        order_id_norm = str(order_id).strip() if isinstance(order_id, str) else ""
        if not order_id_norm:
            invalid_order_id_count += 1
        else:
            by_order_id[order_id_norm] = by_order_id.get(order_id_norm, 0) + 1
            if by_order_id[order_id_norm] > 1:
                duplicate_rows += 1

        event_ts = row.get("event_ts")
        if not _is_valid_event_ts(event_ts):
            invalid_event_ts_count += 1

        status = str(row.get("status", "")).strip().lower()
        if status not in ALLOWED_STATUSES:
            invalid_status_count += 1

        ok_amount, amount = _safe_float(row.get("amount"))
        if not ok_amount or amount < 0.0:
            invalid_amount_count += 1

        ok_latency, latency = _safe_float(row.get("latency_ms"))
        if not ok_latency or latency < 0.0:
            invalid_latency_count += 1

        channel = row.get("channel")
        if not isinstance(channel, str) or not channel.strip():
            missing_channel_count += 1

    total = len(rows)
    missing_channel_pct = (missing_channel_count / total) if total else 0.0

    row_validity_ok = (
        invalid_order_id_count == 0
        and invalid_event_ts_count == 0
        and invalid_status_count == 0
        and invalid_amount_count == 0
        and invalid_latency_count == 0
    )

    return {
        "status": "ok",
        "data": {
            "row_count_raw": total,
            "duplicate_rows": duplicate_rows,
            "missing_channel_count": missing_channel_count,
            "missing_channel_pct": missing_channel_pct,
            "schema_ok": schema_ok,
            "missing_fields_count": missing_fields_count,
            "row_validity_ok": row_validity_ok,
            "invalid_order_id_count": invalid_order_id_count,
            "invalid_event_ts_count": invalid_event_ts_count,
            "invalid_status_count": invalid_status_count,
            "invalid_amount_count": invalid_amount_count,
            "invalid_latency_count": invalid_latency_count,
        },
    }


def transform_sales_rows(
    *,
    rows: list[dict[str, Any]],
    dedupe_key: str,
    dedupe_ts_key: str,
    fill_missing_channel: str,
    normalize_status: str,
) -> dict[str, Any]:
    latest_by_key: dict[str, dict[str, Any]] = {}
    ts_by_key: dict[str, str] = {}
    skipped_rows_missing_key = 0
    skipped_rows_invalid_ts = 0

    for row in rows:
        key = str(row.get(dedupe_key, "")).strip()
        if not key:
            skipped_rows_missing_key += 1
            continue

        event_ts_raw = row.get(dedupe_ts_key)
        if not _is_valid_event_ts(event_ts_raw):
            skipped_rows_invalid_ts += 1
            continue
        event_ts = str(event_ts_raw).strip()
        previous_ts = ts_by_key.get(key, "")
        if key not in latest_by_key or event_ts >= previous_ts:
            latest_by_key[key] = dict(row)
            ts_by_key[key] = event_ts

    cleaned_rows = [latest_by_key[k] for k in sorted(latest_by_key.keys())]

    filled_missing_channel = 0
    for row in cleaned_rows:
        channel = row.get("channel")
        if not isinstance(channel, str) or not channel.strip():
            row["channel"] = fill_missing_channel
            filled_missing_channel += 1

        if normalize_status == "lower_strip":
            row["status"] = str(row.get("status", "")).strip().lower()

        row[dedupe_key] = str(row.get(dedupe_key, "")).strip()
        row[dedupe_ts_key] = str(row.get(dedupe_ts_key, "")).strip()
        ok_amount, amount = _safe_float(row.get("amount"))
        ok_latency, latency = _safe_float(row.get("latency_ms"))
        row["amount"] = amount if ok_amount else 0.0
        row["latency_ms"] = latency if ok_latency else 0.0

    return {
        "status": "ok",
        "data": {
            "rows": cleaned_rows,
            "row_count_clean": len(cleaned_rows),
            "deduped_rows": max(0, len(rows) - len(cleaned_rows)),
            "filled_missing_channel": filled_missing_channel,
            "skipped_rows_missing_key": skipped_rows_missing_key,
            "skipped_rows_invalid_ts": skipped_rows_invalid_ts,
            "dedupe_strategy": "latest_by_event_ts",
            "dedupe_key": dedupe_key,
            "dedupe_ts_key": dedupe_ts_key,
        },
    }


def analyze_sales_rows(*, rows: list[dict[str, Any]]) -> dict[str, Any]:
    total = len(rows)
    paid_count = 0
    failed_count = 0
    refunded_count = 0

    gross_revenue = 0.0
    refund_amount_total = 0.0
    latency_values: list[float] = []
    channel_net_revenue: dict[str, float] = defaultdict(float)

    for row in rows:
        status = str(row.get("status", "")).lower()
        amount = float(row.get("amount", 0.0))
        channel = str(row.get("channel", "unknown")).strip().lower() or "unknown"
        latency = float(row.get("latency_ms", 0.0))
        latency_values.append(latency)

        if status == "paid":
            paid_count += 1
            gross_revenue += amount
            channel_net_revenue[channel] += amount
        elif status == "failed":
            failed_count += 1
        elif status == "refunded":
            refunded_count += 1
            refund_amount_total += amount
            channel_net_revenue[channel] -= amount

    net_revenue = gross_revenue - refund_amount_total
    conversion_rate = (paid_count / total) if total else 0.0
    failed_payment_rate = (failed_count / total) if total else 0.0
    refund_rate = (refunded_count / total) if total else 0.0

    avg_latency_ms = (sum(latency_values) / len(latency_values)) if latency_values else 0.0
    sorted_latency = sorted(latency_values)
    if sorted_latency:
        p95_idx = int((len(sorted_latency) - 1) * 0.95)
        p95_latency_ms = sorted_latency[p95_idx]
    else:
        p95_latency_ms = 0.0

    top_channel = "unknown"
    top_channel_value = float("-inf")
    for channel, value in channel_net_revenue.items():
        if value > top_channel_value:
            top_channel = channel
            top_channel_value = value

    eta_minutes = 45 if failed_payment_rate >= 0.15 else 20

    return {
        "status": "ok",
        "data": {
            "sample_size": total,
            "paid_count": paid_count,
            "failed_count": failed_count,
            "refunded_count": refunded_count,
            "gross_revenue": gross_revenue,
            "refund_amount_total": refund_amount_total,
            "net_revenue": net_revenue,
            "conversion_rate": conversion_rate,
            "failed_payment_rate": failed_payment_rate,
            "refund_rate": refund_rate,
            "avg_latency_ms": avg_latency_ms,
            "p95_latency_ms": p95_latency_ms,
            "top_channel": top_channel,
            "eta_minutes": eta_minutes,
        },
    }


def validate_analysis(
    *,
    metrics: dict[str, Any],
    profile: dict[str, Any],
    max_missing_channel_pct: float,
) -> dict[str, Any]:
    passed_checks: list[str] = []
    failed_checks: list[str] = []

    conversion_rate = float(metrics.get("conversion_rate", 0.0))
    failed_payment_rate = float(metrics.get("failed_payment_rate", 0.0))
    refund_rate = float(metrics.get("refund_rate", 0.0))

    gross_revenue = float(metrics.get("gross_revenue", 0.0))
    refund_amount_total = float(metrics.get("refund_amount_total", 0.0))
    net_revenue = float(metrics.get("net_revenue", 0.0))

    sample_size = int(metrics.get("sample_size", 0))
    p95_latency_ms = float(metrics.get("p95_latency_ms", 0.0))
    missing_channel_pct = float(profile.get("missing_channel_pct", 0.0))
    row_validity_ok = bool(profile.get("row_validity_ok"))

    if 0.0 <= conversion_rate <= 1.0:
        passed_checks.append("conversion_between_0_1")
    else:
        failed_checks.append("conversion_out_of_range")

    if 0.0 <= refund_rate <= 1.0:
        passed_checks.append("refund_rate_between_0_1")
    else:
        failed_checks.append("refund_rate_out_of_range")

    if gross_revenue >= 0.0:
        passed_checks.append("gross_revenue_non_negative")
    else:
        failed_checks.append("gross_revenue_negative")

    if refund_amount_total >= 0.0:
        passed_checks.append("refund_amount_non_negative")
    else:
        failed_checks.append("refund_amount_negative")

    if net_revenue >= 0.0:
        passed_checks.append("net_revenue_non_negative")
    else:
        failed_checks.append("net_revenue_negative")

    if failed_payment_rate <= 0.35:
        passed_checks.append("failed_rate_reasonable")
    else:
        failed_checks.append("failed_rate_too_high")

    if sample_size >= 10:
        passed_checks.append("sample_size_min_10")
    else:
        failed_checks.append("sample_size_too_low")

    if p95_latency_ms <= 400.0:
        passed_checks.append("p95_latency_under_400")
    else:
        failed_checks.append("p95_latency_too_high")

    if missing_channel_pct <= max_missing_channel_pct:
        passed_checks.append("missing_channel_within_policy")
    else:
        failed_checks.append("missing_channel_policy_violation")

    if gross_revenue >= net_revenue:
        passed_checks.append("gross_ge_net")
    else:
        failed_checks.append("gross_less_than_net")

    if abs((gross_revenue - refund_amount_total) - net_revenue) <= 0.01:
        passed_checks.append("gross_minus_refunds_equals_net")
    else:
        failed_checks.append("gross_refund_net_mismatch")

    if row_validity_ok:
        passed_checks.append("profile_row_values_valid")
    else:
        failed_checks.append("profile_row_values_invalid")

    return {
        "status": "ok",
        "data": {
            "ok": len(failed_checks) == 0,
            "passed_checks": passed_checks,
            "failed_checks": failed_checks,
        },
    }
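
The dedupe rule is the subtle part of tools.py. A standalone sketch of latest_by_event_ts as implemented in transform_sales_rows (plain string comparison is chronologically correct for this fixed-width "...Z" timestamp format):

```python
# For duplicate keys, keep the row with the greatest event_ts; the fixed-width
# ISO-8601 UTC format makes string comparison order-preserving in time.
rows = [
    {"order_id": "o-1006", "event_ts": "2026-03-01T10:25:00Z", "channel": None},
    {"order_id": "o-1006", "event_ts": "2026-03-01T10:50:00Z", "channel": "email"},
]
latest: dict[str, dict] = {}
for row in rows:
    key = row["order_id"]
    if key not in latest or row["event_ts"] >= latest[key]["event_ts"]:
        latest[key] = row
print(latest["o-1006"]["channel"])  # email -- the later duplicate wins
```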

agent.py — plan proposal + final answer

PYTHON
from __future__ import annotations

from typing import Any


def propose_analysis_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    del goal
    req = request["request"]
    return {
        "steps": [
            {
                "id": "s1",
                "action": "ingest_sales",
                "args": {
                    "source": req["source"],
                    "region": req["region"],
                },
            },
            {"id": "s2", "action": "profile_sales", "args": {}},
            {"id": "s3", "action": "transform_sales", "args": {}},
            {"id": "s4", "action": "analyze_sales", "args": {}},
            {"id": "s5", "action": "validate_analysis", "args": {}},
        ]
    }


def compose_final_answer(
    *,
    request: dict[str, Any],
    aggregate: dict[str, Any],
    quality: dict[str, Any],
) -> str:
    req = request["request"]
    m = aggregate["metrics"]
    checks = quality.get("passed_checks", [])

    return (
        f"Data analysis brief ({req['region']}, {req['report_date']}): source {req['source']} shows net revenue "
        f"{m['net_revenue']:.2f} (gross {m['gross_revenue']:.2f}, refunds {m['refund_amount_total']:.2f} subtracted), "
        f"conversion {m['conversion_rate_pct']:.2f}%, failed payment rate {m['failed_payment_rate_pct']:.2f}%, "
        f"refund rate {m['refund_rate_pct']:.2f}%, top channel {m['top_channel']}, "
        f"and p95 latency {m['p95_latency_ms']:.2f} ms. Quality checks passed: {len(checks)}."
    )
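
Note that compose_final_answer reads percentage-scaled keys (conversion_rate_pct, etc.) while analyze_sales_rows emits 0-1 ratios, so the host's aggregation step is expected to do the scaling. A sketch of that conversion (hypothetical helper, assuming this naming convention):

```python
# Convert the 0-1 ratios from analyze_sales_rows into the *_pct keys the
# final-answer template expects (illustrative helper; mirrors the key naming above).
def to_pct_keys(metrics: dict[str, float]) -> dict[str, float]:
    return {f"{k}_pct": v * 100.0 for k, v in metrics.items()}

pct = to_pct_keys({"conversion_rate": 0.7, "failed_payment_rate": 0.2, "refund_rate": 0.1})
print(f"{pct['conversion_rate_pct']:.2f}%")  # 70.00%
```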

gateway.py — plan/policy/quality boundaries

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str, *, details: dict[str, Any] | None = None):
        super().__init__(reason)
        self.reason = reason
        self.details = details or {}


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 25
    max_steps: int = 8
    max_rows: int = 5000


@dataclass(frozen=True)
class Decision:
    kind: str
    reason: str


EXPECTED_ACTION_SEQUENCE = [
    "ingest_sales",
    "profile_sales",
    "transform_sales",
    "analyze_sales",
    "validate_analysis",
]


def validate_plan(raw_steps: Any, *, max_steps: int) -> list[dict[str, Any]]:
    if not isinstance(raw_steps, list) or not raw_steps:
        raise StopRun("invalid_plan:steps")
    if len(raw_steps) > max_steps:
        raise StopRun("invalid_plan:too_many_steps")

    steps: list[dict[str, Any]] = []
    actions: list[str] = []
    for raw in raw_steps:
        if not isinstance(raw, dict):
            raise StopRun("invalid_step:not_object")
        step_id = raw.get("id")
        action = raw.get("action")
        args = raw.get("args")
        if not isinstance(step_id, str) or not step_id.strip():
            raise StopRun("invalid_step:id")
        if not isinstance(action, str) or not action.strip():
            raise StopRun("invalid_step:action")
        if not isinstance(args, dict):
            raise StopRun("invalid_step:args")

        normalized = {
            "id": step_id.strip(),
            "action": action.strip(),
            "args": dict(args),
        }
        steps.append(normalized)
        actions.append(normalized["action"])

    if actions != EXPECTED_ACTION_SEQUENCE:
        raise StopRun(
            "invalid_plan:step_sequence",
            details={"expected": EXPECTED_ACTION_SEQUENCE, "received": actions},
        )

    return steps


class DataAnalysisGateway:
    def __init__(
        self,
        *,
        allowed_sources_policy: set[str],
        allowed_sources_execution: set[str],
        allowed_regions_policy: set[str],
        budget: Budget,
    ):
        self.allowed_sources_policy = set(allowed_sources_policy)
        self.allowed_sources_execution = set(allowed_sources_execution)
        self.allowed_regions_policy = {x.upper() for x in allowed_regions_policy}
        self.budget = budget

    def evaluate_ingest(self, *, source: str, region: str) -> Decision:
        if source not in self.allowed_sources_policy:
            return Decision(kind="deny", reason="source_denied_policy")
        if region.upper() not in self.allowed_regions_policy:
            return Decision(kind="deny", reason="region_denied_policy")
        if source not in self.allowed_sources_execution:
            return Decision(kind="deny", reason="source_denied_execution")
        return Decision(kind="allow", reason="policy_pass")

    def ensure_rows_budget(self, *, rows: list[dict[str, Any]]) -> None:
        if len(rows) > self.budget.max_rows:
            raise StopRun("dataset_too_large")

    def validate_profile(self, *, profile: dict[str, Any], max_missing_channel_pct: float) -> None:
        if not bool(profile.get("schema_ok")):
            raise StopRun("invalid_profile:schema")
        if not bool(profile.get("row_validity_ok")):
            raise StopRun("invalid_profile:row_values")

        missing_pct = float(profile.get("missing_channel_pct", 0.0))
        if missing_pct > max_missing_channel_pct:
            raise StopRun("invalid_profile:missing_channel_too_high")

    def validate_metrics(self, *, metrics: dict[str, Any]) -> None:
        sample_size = int(metrics.get("sample_size", 0))
        if sample_size <= 0:
            raise StopRun("invalid_metrics:sample_size")

        conversion = float(metrics.get("conversion_rate", 0.0))
        if not (0.0 <= conversion <= 1.0):
            raise StopRun("invalid_metrics:conversion_rate")

        failed_rate = float(metrics.get("failed_payment_rate", 0.0))
        if not (0.0 <= failed_rate <= 1.0):
            raise StopRun("invalid_metrics:failed_payment_rate")

        refund_rate = float(metrics.get("refund_rate", 0.0))
        if not (0.0 <= refund_rate <= 1.0):
            raise StopRun("invalid_metrics:refund_rate")

        gross_revenue = float(metrics.get("gross_revenue", 0.0))
        refund_amount_total = float(metrics.get("refund_amount_total", 0.0))
        net_revenue = float(metrics.get("net_revenue", 0.0))

        if net_revenue < 0:
            raise StopRun("invalid_metrics:net_revenue")
        if gross_revenue < net_revenue:
            raise StopRun("invalid_metrics:gross_less_than_net")
        if abs((gross_revenue - refund_amount_total) - net_revenue) > 0.01:
            raise StopRun("invalid_metrics:revenue_consistency")
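
The split between the two source allowlists in evaluate_ingest is worth isolating: the policy layer says what is permitted in principle, the execution layer says what this runtime can actually read. A standalone sketch:

```python
# Two-layer allowlist sketch: a source must clear the policy layer *and* the
# execution layer; the deny reason tells you which layer blocked it.
POLICY = {"warehouse_sales_daily", "warehouse_refunds_daily"}
EXECUTION = {"warehouse_sales_daily"}

def evaluate(source: str) -> str:
    if source not in POLICY:
        return "deny:source_denied_policy"
    if source not in EXECUTION:
        return "deny:source_denied_execution"
    return "allow"

print(evaluate("warehouse_refunds_daily"))  # deny:source_denied_execution
print(evaluate("warehouse_sales_daily"))    # allow
```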

main.py — orchestrate analysis workflow

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from agent import compose_final_answer, propose_analysis_plan
from context import build_request
from gateway import Budget, DataAnalysisGateway, StopRun, validate_plan
from tools import (
    analyze_sales_rows,
    profile_sales_rows,
    read_sales_snapshot,
    transform_sales_rows,
    validate_analysis,
)

GOAL = (
    "Prepare a validated weekly data-analysis brief for US payments with revenue, "
    "conversion, failed payment rate, and quality checks."
)
REQUEST = build_request(
    report_date="2026-03-07",
    region="US",
    source="warehouse_sales_daily",
)

DEFAULT_BUDGET = Budget(
    max_seconds=25,
    max_steps=8,
    max_rows=5000,
)

DEFAULT_ALLOWED_SOURCES_POLICY = {"warehouse_sales_daily", "warehouse_refunds_daily"}
ALLOWED_SOURCES_EXECUTION = {"warehouse_sales_daily"}
DEFAULT_ALLOWED_REGIONS_POLICY = {"US", "CA"}


DEFAULT_ANALYSIS_RULES = {
    "dedupe_key": "order_id",
    "dedupe_ts_key": "event_ts",
    "dedupe_strategy": "latest_by_event_ts",
    "fill_missing_channel": "unknown",
    "normalize_status": "lower_strip",
}


def _unwrap_tool_data(raw: Any, *, tool_name: str) -> dict[str, Any]:
    if not isinstance(raw, dict) or raw.get("status") != "ok" or not isinstance(raw.get("data"), dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    return dict(raw["data"])


def run_data_analysis_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    artifact_ids = {
        "snapshot_id": f"snap_{run_id[:8]}",
        "profile_id": f"profile_{run_id[:8]}",
        "transform_id": f"transform_{run_id[:8]}",
        "metrics_id": f"metrics_{run_id[:8]}",
        "quality_id": f"quality_{run_id[:8]}",
    }

    def elapsed_ms() -> int:
        return max(1, int((time.monotonic() - started) * 1000))

    hints_raw = request.get("policy_hints")
    hints: dict[str, Any] = hints_raw if isinstance(hints_raw, dict) else {}

    allowed_sources_raw = hints.get("allowed_sources")
    if isinstance(allowed_sources_raw, list):
        allowed_sources_policy = {
            str(item).strip()
            for item in allowed_sources_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_sources_policy = set(DEFAULT_ALLOWED_SOURCES_POLICY)
    if not allowed_sources_policy:
        allowed_sources_policy = set(DEFAULT_ALLOWED_SOURCES_POLICY)

    allowed_regions_raw = hints.get("allowed_regions")
    if isinstance(allowed_regions_raw, list):
        allowed_regions_policy = {
            str(item).strip().upper()
            for item in allowed_regions_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_regions_policy = set(DEFAULT_ALLOWED_REGIONS_POLICY)
    if not allowed_regions_policy:
        allowed_regions_policy = set(DEFAULT_ALLOWED_REGIONS_POLICY)

    max_rows_raw = hints.get("max_rows", DEFAULT_BUDGET.max_rows)
    max_missing_channel_pct_raw = hints.get("max_missing_channel_pct", 0.25)

    analysis_rules_raw = hints.get("analysis_rules")
    analysis_rules = dict(DEFAULT_ANALYSIS_RULES)
    if isinstance(analysis_rules_raw, dict):
        for key, value in analysis_rules_raw.items():
            if isinstance(key, str):
                analysis_rules[key] = value

    dedupe_key = str(analysis_rules.get("dedupe_key", DEFAULT_ANALYSIS_RULES["dedupe_key"])).strip() or "order_id"
    dedupe_ts_key = str(analysis_rules.get("dedupe_ts_key", DEFAULT_ANALYSIS_RULES["dedupe_ts_key"])).strip() or "event_ts"
    dedupe_strategy = str(
        analysis_rules.get("dedupe_strategy", DEFAULT_ANALYSIS_RULES["dedupe_strategy"])
    ).strip() or "latest_by_event_ts"
    fill_missing_channel = str(
        analysis_rules.get("fill_missing_channel", DEFAULT_ANALYSIS_RULES["fill_missing_channel"])
    ).strip() or "unknown"
    normalize_status = str(
        analysis_rules.get("normalize_status", DEFAULT_ANALYSIS_RULES["normalize_status"])
    ).strip() or "lower_strip"

    if dedupe_strategy != "latest_by_event_ts":
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:dedupe_strategy",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        max_rows = int(max_rows_raw)
    except (TypeError, ValueError):
        max_rows = DEFAULT_BUDGET.max_rows
    if max_rows <= 0:
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:max_rows",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        max_missing_channel_pct = float(max_missing_channel_pct_raw)
    except (TypeError, ValueError):
        max_missing_channel_pct = 0.25
    if not (0.0 <= max_missing_channel_pct <= 1.0):
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:max_missing_channel_pct",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    budget = Budget(
        max_seconds=DEFAULT_BUDGET.max_seconds,
        max_steps=DEFAULT_BUDGET.max_steps,
        max_rows=max(10, min(200000, max_rows)),
    )

    gateway = DataAnalysisGateway(
        allowed_sources_policy=allowed_sources_policy,
        allowed_sources_execution=ALLOWED_SOURCES_EXECUTION,
        allowed_regions_policy=allowed_regions_policy,
        budget=budget,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    phase = "plan"
    try:
        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase=phase)

        raw_plan = propose_analysis_plan(goal=goal, request=request)
        steps = validate_plan(raw_plan.get("steps"), max_steps=budget.max_steps)

        trace.append(
            {
                "step": 1,
                "phase": "plan",
                "steps": len(steps),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 1,
                "action": "propose_analysis_plan",
                "step_ids": [item["id"] for item in steps],
            }
        )

        phase = "policy_check"
        ingest_args = steps[0]["args"]
        source = str(ingest_args.get("source", "")).strip()
        region = str(ingest_args.get("region", "")).strip().upper()
        decision = gateway.evaluate_ingest(source=source, region=region)

        trace.append(
            {
                "step": 2,
                "phase": "policy_check",
                "source": source,
                "region": region,
                "decision": decision.kind,
                "reason": decision.reason,
                "allowed_sources_policy": sorted(allowed_sources_policy),
                "allowed_sources_execution": sorted(ALLOWED_SOURCES_EXECUTION),
                "elapsed_ms": elapsed_ms(),
                "ok": decision.kind == "allow",
            }
        )
        history.append(
            {
                "step": 2,
                "action": "policy_check",
                "decision": {"kind": decision.kind, "reason": decision.reason},
            }
        )
        if decision.kind != "allow":
            return stopped(f"policy_block:{decision.reason}", phase=phase)

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="ingest_profile")

        phase = "ingest_profile"
        req = request["request"]
        snapshot = _unwrap_tool_data(
            read_sales_snapshot(
                source=source,
                region=region,
                rows=list(req["rows"]),
            ),
            tool_name="read_sales_snapshot",
        )
        rows_raw = list(snapshot["rows"])
        gateway.ensure_rows_budget(rows=rows_raw)

        profile = _unwrap_tool_data(
            profile_sales_rows(rows=rows_raw),
            tool_name="profile_sales_rows",
        )
        gateway.validate_profile(profile=profile, max_missing_channel_pct=max_missing_channel_pct)

        trace.append(
            {
                "step": 3,
                "phase": "ingest_profile",
                "row_count_raw": profile["row_count_raw"],
                "duplicate_rows": profile["duplicate_rows"],
                "missing_channel_pct": round(float(profile["missing_channel_pct"]) * 100, 2),
                "artifact_refs": {
                    "snapshot_id": artifact_ids["snapshot_id"],
                    "profile_id": artifact_ids["profile_id"],
                },
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "ingest_profile",
                "source": source,
                "row_count_raw": profile["row_count_raw"],
                "artifact_refs": {
                    "snapshot_id": artifact_ids["snapshot_id"],
                    "profile_id": artifact_ids["profile_id"],
                },
            }
        )

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="transform_analyze")

        phase = "transform_analyze"
        transformed = _unwrap_tool_data(
            transform_sales_rows(
                rows=rows_raw,
                dedupe_key=dedupe_key,
                dedupe_ts_key=dedupe_ts_key,
                fill_missing_channel=fill_missing_channel,
                normalize_status=normalize_status,
            ),
            tool_name="transform_sales_rows",
        )
        rows_clean = list(transformed["rows"])

        metrics = _unwrap_tool_data(
            analyze_sales_rows(rows=rows_clean),
            tool_name="analyze_sales_rows",
        )
        gateway.validate_metrics(metrics=metrics)

        trace.append(
            {
                "step": 4,
                "phase": "transform_analyze",
                "row_count_clean": transformed["row_count_clean"],
                "deduped_rows": transformed["deduped_rows"],
                "filled_missing_channel": transformed["filled_missing_channel"],
                "skipped_rows_missing_key": transformed.get("skipped_rows_missing_key", 0),
                "skipped_rows_invalid_ts": transformed.get("skipped_rows_invalid_ts", 0),
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "conversion_rate_pct": round(float(metrics["conversion_rate"]) * 100, 2),
                "failed_payment_rate_pct": round(float(metrics["failed_payment_rate"]) * 100, 2),
                "artifact_refs": {
                    "transform_id": artifact_ids["transform_id"],
                    "metrics_id": artifact_ids["metrics_id"],
                },
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 4,
                "action": "transform_analyze",
                "row_count_clean": transformed["row_count_clean"],
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "applied_rules": {
                    "dedupe_strategy": dedupe_strategy,
                    "dedupe_key": dedupe_key,
                    "dedupe_ts_key": dedupe_ts_key,
                    "fill_missing_channel": fill_missing_channel,
                    "normalize_status": normalize_status,
                },
                "artifact_refs": {
                    "transform_id": artifact_ids["transform_id"],
                    "metrics_id": artifact_ids["metrics_id"],
                },
            }
        )

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="validate")

        phase = "validate"
        quality = _unwrap_tool_data(
            validate_analysis(
                metrics=metrics,
                profile=profile,
                max_missing_channel_pct=max_missing_channel_pct,
            ),
            tool_name="validate_analysis",
        )
        if not bool(quality.get("ok")):
            failed = quality.get("failed_checks") or []
            first = str(failed[0]) if failed else "unknown"
            return stopped(f"validation_failed:{first}", phase=phase, quality=quality)

        trace.append(
            {
                "step": 5,
                "phase": "validate",
                "passed_checks": len(quality.get("passed_checks", [])),
                "failed_checks": len(quality.get("failed_checks", [])),
                "artifact_refs": {"quality_id": artifact_ids["quality_id"]},
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 5,
                "action": "validate_analysis",
                "passed_checks": len(quality.get("passed_checks", [])),
                "artifact_refs": {"quality_id": artifact_ids["quality_id"]},
            }
        )

        aggregate = {
            "report_date": req["report_date"],
            "region": req["region"],
            "source": source,
            "metrics": {
                "sample_size": int(metrics["sample_size"]),
                "gross_revenue": round(float(metrics["gross_revenue"]), 2),
                "refund_amount_total": round(float(metrics["refund_amount_total"]), 2),
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "conversion_rate": round(float(metrics["conversion_rate"]), 6),
                "conversion_rate_pct": round(float(metrics["conversion_rate"]) * 100, 2),
                "failed_payment_rate": round(float(metrics["failed_payment_rate"]), 6),
                "failed_payment_rate_pct": round(float(metrics["failed_payment_rate"]) * 100, 2),
                "refund_rate": round(float(metrics["refund_rate"]), 6),
                "refund_rate_pct": round(float(metrics["refund_rate"]) * 100, 2),
                "top_channel": str(metrics["top_channel"]),
                "avg_latency_ms": round(float(metrics["avg_latency_ms"]), 2),
                "p95_latency_ms": round(float(metrics["p95_latency_ms"]), 2),
                "eta_minutes": int(metrics["eta_minutes"]),
            },
            "data_quality": {
                "row_count_raw": int(profile["row_count_raw"]),
                "row_count_clean": int(transformed["row_count_clean"]),
                "deduped_rows": int(transformed["deduped_rows"]),
                "missing_channel_pct_raw": round(float(profile["missing_channel_pct"]) * 100, 2),
                "row_validity_ok": bool(profile["row_validity_ok"]),
                "invalid_order_id_count": int(profile["invalid_order_id_count"]),
                "invalid_event_ts_count": int(profile["invalid_event_ts_count"]),
                "invalid_status_count": int(profile["invalid_status_count"]),
                "invalid_amount_count": int(profile["invalid_amount_count"]),
                "invalid_latency_count": int(profile["invalid_latency_count"]),
                "skipped_rows_missing_key": int(transformed.get("skipped_rows_missing_key", 0)),
                "skipped_rows_invalid_ts": int(transformed.get("skipped_rows_invalid_ts", 0)),
            },
        }

        answer = compose_final_answer(
            request=request,
            aggregate=aggregate,
            quality=quality,
        )

        trace.append(
            {
                "step": 6,
                "phase": "finalize",
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 6,
                "action": "finalize",
            }
        )

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "validated_analysis",
            "answer": answer,
            "aggregate": aggregate,
            "quality": quality,
            "trace": trace,
            "history": history,
        }
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase=phase,
            **({"details": exc.details} if isinstance(exc.details, dict) and exc.details else {}),
        )


def main() -> None:
    result = run_data_analysis_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What Matters Most Here (in plain words)

  • the workflow is enforced and fixed, so the agent cannot "skip" profile/validate
  • the policy boundary decides separately whether the source may be used in this run
  • profiling + validation provide a technical quality gate, not "faith in the number"; on invalid_profile:*, the run stops before transform
  • the dedup is deterministic (latest_by_event_ts) over a strict UTC event_ts (YYYY-MM-DDTHH:MM:SSZ), and the cleaning rules come explicitly from policy_hints
  • deduped_rows in this demo shows the total number of rows removed after transform (dedup plus possible exclusion of invalid rows); in the current sample there are no exclusions
  • filled_missing_channel is counted after dedup: if a duplicate is replaced by a more recent row with a valid channel, no fill is needed (hence 1 here, not 2)
  • finalize is not a tool step here, but an orchestration step in main.py
  • the final brief is built solely from validated metrics
  • trace/history with elapsed_ms and artifact_refs make the pipeline auditable
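
The deterministic dedup described above can be sketched in isolation. This is a hypothetical standalone helper, not the actual `transform_sales_rows` from tools.py (which also normalizes fields and fills missing channels); it keeps, per key, the row with the most recent strict-UTC timestamp:

```python
from datetime import datetime, timezone

def latest_by_event_ts(rows, key="order_id", ts_key="event_ts"):
    """Keep, for each key, the row with the most recent strict-UTC event_ts."""
    latest = {}
    for row in rows:
        try:
            # Strict UTC format: YYYY-MM-DDTHH:MM:SSZ
            ts = datetime.strptime(row[ts_key], "%Y-%m-%dT%H:%M:%SZ").replace(
                tzinfo=timezone.utc
            )
        except (KeyError, ValueError):
            continue  # invalid timestamps are excluded (cf. skipped_rows_invalid_ts)
        k = row.get(key)
        if k is None:
            continue  # missing keys are excluded (cf. skipped_rows_missing_key)
        if k not in latest or ts > latest[k][0]:
            latest[k] = (ts, row)
    return [row for _, row in latest.values()]

rows = [
    {"order_id": "o1", "event_ts": "2026-03-07T10:00:00Z", "status": "paid"},
    {"order_id": "o1", "event_ts": "2026-03-07T12:00:00Z", "status": "refunded"},
    {"order_id": "o2", "event_ts": "2026-03-07T11:00:00Z", "status": "paid"},
]
clean = latest_by_event_ts(rows)
```

Because the winner is chosen by timestamp comparison rather than input order, reprocessing the same snapshot always yields the same clean set, which is what makes the downstream metrics reproducible.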

Example Output

JSON
{
  "run_id": "3045a7d0-1431-455b-9d26-16ae9fbc653a",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "validated_analysis",
  "answer": "Data analysis brief (US, 2026-03-07): source warehouse_sales_daily shows net revenue 600.00 (gross 670.00, refunds 70.00 subtracted), conversion 63.64%, failed payment rate 18.18%, refund rate 18.18%, top channel paid_search, and p95 latency 240.00 ms. Quality checks passed: 12.",
  "aggregate": {
    "report_date": "2026-03-07",
    "region": "US",
    "source": "warehouse_sales_daily",
    "metrics": {
      "sample_size": 11,
      "gross_revenue": 670.0,
      "refund_amount_total": 70.0,
      "net_revenue": 600.0,
      "conversion_rate": 0.636364,
      "conversion_rate_pct": 63.64,
      "failed_payment_rate": 0.181818,
      "failed_payment_rate_pct": 18.18,
      "refund_rate": 0.181818,
      "refund_rate_pct": 18.18,
      "top_channel": "paid_search",
      "avg_latency_ms": 200.73,
      "p95_latency_ms": 240.0,
      "eta_minutes": 45
    },
    "data_quality": {
      "row_count_raw": 12,
      "row_count_clean": 11,
      "deduped_rows": 1,
      "missing_channel_pct_raw": 16.67,
      "row_validity_ok": true,
      "invalid_order_id_count": 0,
      "invalid_event_ts_count": 0,
      "invalid_status_count": 0,
      "invalid_amount_count": 0,
      "invalid_latency_count": 0,
      "skipped_rows_missing_key": 0,
      "skipped_rows_invalid_ts": 0
    }
  },
  "quality": {
    "ok": true,
    "passed_checks": [
      "conversion_between_0_1",
      "refund_rate_between_0_1",
      "gross_revenue_non_negative",
      "refund_amount_non_negative",
      "net_revenue_non_negative",
      "failed_rate_reasonable",
      "sample_size_min_10",
      "p95_latency_under_400",
      "missing_channel_within_policy",
      "gross_ge_net",
      "gross_minus_refunds_equals_net",
      "profile_row_values_valid"
    ],
    "failed_checks": []
  },
  "trace": [
    {
      "step": 1,
      "phase": "plan",
      "steps": 5,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "policy_check",
      "source": "warehouse_sales_daily",
      "region": "US",
      "decision": "allow",
      "reason": "policy_pass",
      "allowed_sources_policy": [
        "warehouse_refunds_daily",
        "warehouse_sales_daily"
      ],
      "allowed_sources_execution": [
        "warehouse_sales_daily"
      ],
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 3,
      "phase": "ingest_profile",
      "row_count_raw": 12,
      "duplicate_rows": 1,
      "missing_channel_pct": 16.67,
      "artifact_refs": {
        "snapshot_id": "snap_3045a7d0",
        "profile_id": "profile_3045a7d0"
      },
      "elapsed_ms": 2,
      "ok": true
    },
    {
      "step": 4,
      "phase": "transform_analyze",
      "row_count_clean": 11,
      "deduped_rows": 1,
      "filled_missing_channel": 1,
      "skipped_rows_missing_key": 0,
      "skipped_rows_invalid_ts": 0,
      "net_revenue": 600.0,
      "conversion_rate_pct": 63.64,
      "failed_payment_rate_pct": 18.18,
      "artifact_refs": {
        "transform_id": "transform_3045a7d0",
        "metrics_id": "metrics_3045a7d0"
      },
      "elapsed_ms": 2,
      "ok": true
    },
    {
      "step": 5,
      "phase": "validate",
      "passed_checks": 12,
      "failed_checks": 0,
      "artifact_refs": {
        "quality_id": "quality_3045a7d0"
      },
      "elapsed_ms": 2,
      "ok": true
    },
    {
      "step": 6,
      "phase": "finalize",
      "elapsed_ms": 2,
      "ok": true
    }
  ],
  "history": [{...}]
}
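
A caller consuming this envelope should gate on the run status before trusting the numbers. A minimal sketch (field names follow the example output above; the helper itself is hypothetical):

```python
def extract_validated_metrics(result: dict) -> dict:
    """Return metrics only from a successful, validated run; raise otherwise."""
    if result.get("status") != "ok" or result.get("stop_reason") != "success":
        raise RuntimeError(
            f"run stopped: {result.get('stop_reason')} "
            f"(phase={result.get('phase', 'n/a')})"
        )
    if not result.get("quality", {}).get("ok"):
        raise RuntimeError("quality gate did not pass")
    return result["aggregate"]["metrics"]

ok_run = {
    "status": "ok",
    "stop_reason": "success",
    "quality": {"ok": True},
    "aggregate": {"metrics": {"net_revenue": 600.0}},
}
metrics = extract_validated_metrics(ok_run)
```

This mirrors the pattern's core rule: downstream consumers read metrics only through the validated envelope, never from intermediate artifacts.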

Typical stop_reason Values

  • success - run completed correctly
  • max_seconds - total time budget exhausted
  • invalid_plan:* - invalid analysis plan
  • invalid_plan:step_sequence - mandatory workflow order violated (with expected/received details)
  • invalid_step:* - invalid step contract
  • policy_block:source_denied_policy - source rejected by the policy allowlist
  • policy_block:source_denied_execution - source rejected by the execution allowlist
  • policy_block:region_denied_policy - region rejected by policy
  • dataset_too_large - row limit exceeded
  • invalid_profile:schema - profiling detected a schema mismatch
  • invalid_profile:row_values - profiling detected invalid row values
  • invalid_profile:missing_channel_too_high - missing-channel share above the policy threshold
  • invalid_metrics:* - metrics failed the basic invariants
  • validation_failed:* - quality validation failed
  • tool_invalid_output:* - a step returned an invalid contract
  • invalid_request:dedupe_strategy - unsupported dedupe strategy in analysis_rules
  • invalid_request:max_rows - invalid max_rows value
  • invalid_request:max_missing_channel_pct - invalid max_missing_channel_pct threshold
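
Because the stop reasons follow stable prefixes, a host can route them mechanically. The buckets below are illustrative, not part of the example's contract:

```python
RETRYABLE_PREFIXES = ("max_seconds",)  # transient: retry with a larger budget
CONFIG_PREFIXES = (
    "invalid_request:", "invalid_plan:", "invalid_step:", "policy_block:",
)  # fix the request or policy before retrying
DATA_PREFIXES = (
    "invalid_profile:", "invalid_metrics:", "validation_failed:",
    "dataset_too_large", "tool_invalid_output:",
)  # data or tool problem: surface to humans

def classify_stop_reason(stop_reason: str) -> str:
    """Map a stop_reason string to a coarse handling bucket."""
    if stop_reason == "success":
        return "success"
    if stop_reason.startswith(RETRYABLE_PREFIXES):
        return "retry"
    if stop_reason.startswith(CONFIG_PREFIXES):
        return "config_error"
    if stop_reason.startswith(DATA_PREFIXES):
        return "data_quality_alert"
    return "unknown"
```

Retrying a `policy_block:*` or `invalid_request:*` run without changing the input is pointless, which is why those land in a separate bucket from transient budget exhaustion.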

What Is NOT Shown Here

  • connecting to a real DWH/BI instead of an in-memory snapshot
  • dataset versioning (snapshot lineage)
  • SQL pushdown and cost-aware query planning
  • multi-source joins with schema conflicts
  • anomaly detection beyond simple invariants
  • persisting artifacts (tables/charts) to storage

What to Try Next

  1. Set source="warehouse_refunds_daily" and observe policy_block:source_denied_execution.
  2. Set max_missing_channel_pct=0.1 and observe invalid_profile:missing_channel_too_high.
  3. Add a step outside EXPECTED_ACTION_SEQUENCE to the plan and observe invalid_plan:step_sequence.
  4. Raise rows beyond max_rows and observe dataset_too_large.
  5. Return conversion_rate=1.2 or gross < net from analyze and observe invalid_metrics:*.
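
Item 5 exercises the metric invariants. A standalone sketch of the kind of checks the validate step enforces, inferred from the quality check names in the example output (the helper name is hypothetical, not the real `validate_analysis`):

```python
def check_metric_invariants(metrics: dict) -> list[str]:
    """Return the names of failed invariants; an empty list means all pass."""
    failed = []
    if not (0.0 <= metrics["conversion_rate"] <= 1.0):
        failed.append("conversion_between_0_1")
    if metrics["gross_revenue"] < metrics["net_revenue"]:
        failed.append("gross_ge_net")
    # Aggregate consistency: net must equal gross minus refunds (float tolerance)
    if abs(
        metrics["gross_revenue"]
        - metrics["refund_amount_total"]
        - metrics["net_revenue"]
    ) > 1e-6:
        failed.append("gross_minus_refunds_equals_net")
    return failed

good = {"conversion_rate": 0.64, "gross_revenue": 670.0,
        "refund_amount_total": 70.0, "net_revenue": 600.0}
bad = {"conversion_rate": 1.2, "gross_revenue": 500.0,
       "refund_amount_total": 70.0, "net_revenue": 600.0}
```

Feeding `bad` into such a gate would trip several checks at once, which is exactly the behavior item 5 asks you to provoke with invalid_metrics:*.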

Author

Nick, an engineer building infrastructure for production AI agents.

Focus: agent patterns, failure modes, runtime control, and system reliability.

🔗 GitHub: https://github.com/mykolademyanov


Editorial Note

This documentation is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The content draws on real failures, post-mortems, and operational incidents in deployed AI agent systems.