Data-Analysis Agent in Python: Complete Example

A runnable, production-style example of a Data-Analysis agent in Python with a fixed workflow, a policy boundary, metrics validation, and transparent trace/history.
On this page
  1. Pattern Essence (brief)
  2. What This Example Demonstrates
  3. Architecture
  4. Project Structure
  5. How to Run
  6. Task
  7. Code
  8. context.py — request envelope + sample dataset
  9. tools.py — deterministic analysis steps
  10. agent.py — plan proposal + final answer
  11. gateway.py — plan/policy/quality boundaries
  12. main.py — orchestrate analysis workflow
  13. Example Output
  14. Typical stop_reason Values
  15. What Is NOT Shown Here
  16. What to Try Next

Pattern Essence (brief)

Data-Analysis Agent means the agent does not "invent" metrics; it follows a fixed workflow:

  • ingest
  • profile
  • transform
  • analyze
  • validate
  • finalize (host orchestration step)

In other words, the result depends on a controlled pipeline, not on free-form reasoning in text.
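The fixed workflow above can be sketched as a tiny step runner that refuses to execute a plan whose steps are missing or out of order. This is a minimal illustration only; the names (`run_pipeline`, `handlers`) are invented for this sketch and are not the example's real API:

```python
# Minimal sketch of a fixed-order step runner (illustrative names only).
EXPECTED_ORDER = ["ingest", "profile", "transform", "analyze", "validate"]


def run_pipeline(steps, handlers):
    # Reject any plan that skips or reorders steps before running anything.
    if [s["action"] for s in steps] != EXPECTED_ORDER:
        raise ValueError("invalid_plan:step_sequence")
    state = {}
    for step in steps:
        state = handlers[step["action"]](state)  # each handler is deterministic
    return state
```

The point is that the order check happens before any step runs, so a plan that skips profile or validate never touches the data.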


What This Example Demonstrates

  • The agent proposes an analysis plan, but the runtime validates the plan contract and the step order
  • The policy allowlist and the execution allowlist for the data source are handled separately
  • The workflow is strictly fixed: profile/validate cannot be skipped
  • The profile check detects duplicates, missing values, and invalid row values; if the quality gate fails, the pipeline stops before transform
  • The transform step applies deterministic dedup (latest_by_event_ts), field normalization, and cleaning rules from policy_hints
  • Cleaning rules and the dedupe strategy come from policy_hints (policy-configurable analysis)
  • The analyze step computes metrics (gross/net revenue, conversion, failure rate, refund rate, latency)
  • The validate step checks quality invariants and aggregate consistency
  • trace/history provide auditability from the plan through to finalization
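The latest_by_event_ts strategy mentioned above keeps, for each order_id, the row with the most recent event_ts. A minimal standalone sketch of the idea (the full version, with key validation and skip counters, is in tools.py below):

```python
def dedupe_latest(rows, key="order_id", ts_key="event_ts"):
    # ISO-8601 UTC timestamps of fixed length compare correctly as strings,
    # so "latest" can be decided with a plain >= on the raw values.
    latest = {}
    for row in rows:
        k = row[key]
        if k not in latest or row[ts_key] >= latest[k][ts_key]:
            latest[k] = row
    return [latest[k] for k in sorted(latest)]
```

Ties on event_ts resolve to the row seen later in the input, matching the `>=` comparison used by the real transform step.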

Architecture

  1. agent.py assembles the plan of analysis steps.
  2. gateway.py validates the plan and the policy decision for ingest.
  3. tools.py runs the deterministic steps: ingest/profile/transform/analyze/validate.
  4. main.py orchestrates the workflow, runs finalize as a host step, assembles the aggregate plus trace/history, and returns the final brief.
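In miniature, the two independent source checks that sit between steps 2 and 3 look like this. This is a hypothetical free function for illustration, distinct from the real DataAnalysisGateway class shown later:

```python
# Illustrative sketch: policy allowlist and execution allowlist are
# checked separately, so a source can pass policy yet still be denied
# because the runtime has no execution path for it.
def evaluate_ingest(source, allowed_policy, allowed_execution):
    if source not in allowed_policy:
        return ("deny", "source_denied_policy")
    if source not in allowed_execution:
        return ("deny", "source_denied_execution")
    return ("allow", "policy_pass")
```

Keeping the two allowlists separate makes the deny reason auditable: "not allowed by policy" and "allowed by policy but not executable" are different failures.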

Project Structure

TEXT
agent-patterns/
└── data-analysis-agent/
    └── python/
        ├── main.py
        ├── gateway.py
        ├── tools.py
        ├── agent.py
        ├── context.py
        ├── README.md
        └── requirements.txt

How to Run

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd agent-patterns/data-analysis-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Python 3.11+ is required.

Variant with export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py

Variant with .env (optional)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

This is a shell variant (macOS/Linux). On Windows it is simpler to set variables with set or, if you prefer, to use python-dotenv.
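If you want to skip shell sourcing entirely, a minimal stdlib-only loader can stand in for python-dotenv. This is a rough sketch: `load_env_file` is a hypothetical helper, and the quoting/escaping rules of real .env parsers are simplified here:

```python
import os


def load_env_file(path=".env"):
    # Parse simple KEY=VALUE lines; comments and blank lines are ignored.
    # Existing environment variables are left untouched (setdefault).
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            os.environ.setdefault(key.strip(), value.strip().strip('"'))
```

Call `load_env_file()` at the top of main.py before reading OPENAI_API_KEY, and the same .env file works on Windows without set -a / source.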


Task

Production scenario:

"Prepare a customer-safe analytics brief for the last 7 days for US: revenue, conversion, failure rate, data-quality checks."


Code

context.py — request envelope + sample dataset

PYTHON
from __future__ import annotations

from typing import Any


def build_request(*, report_date: str, region: str, source: str) -> dict[str, Any]:
    rows = [
        {
            "order_id": "o-1001",
            "event_ts": "2026-03-01T10:00:00Z",
            "status": "paid",
            "amount": 120.0,
            "channel": "paid_search",
            "latency_ms": 180,
        },
        {
            "order_id": "o-1002",
            "event_ts": "2026-03-01T10:05:00Z",
            "status": "paid",
            "amount": 90.0,
            "channel": "email",
            "latency_ms": 160,
        },
        {
            "order_id": "o-1003",
            "event_ts": "2026-03-01T10:10:00Z",
            "status": "failed",
            "amount": 0.0,
            "channel": "paid_search",
            "latency_ms": 240,
        },
        {
            "order_id": "o-1004",
            "event_ts": "2026-03-01T10:15:00Z",
            "status": "refunded",
            "amount": 40.0,
            "channel": "email",
            "latency_ms": 210,
        },
        {
            "order_id": "o-1005",
            "event_ts": "2026-03-01T10:20:00Z",
            "status": "paid",
            "amount": 140.0,
            "channel": "paid_search",
            "latency_ms": 170,
        },
        {
            "order_id": "o-1006",
            "event_ts": "2026-03-01T10:25:00Z",
            "status": "paid",
            "amount": 110.0,
            "channel": None,
            "latency_ms": 190,
        },
        {
            "order_id": "o-1007",
            "event_ts": "2026-03-01T10:30:00Z",
            "status": "failed",
            "amount": 0.0,
            "channel": "organic",
            "latency_ms": 260,
        },
        {
            "order_id": "o-1008",
            "event_ts": "2026-03-01T10:35:00Z",
            "status": "paid",
            "amount": 80.0,
            "channel": "paid_social",
            "latency_ms": 200,
        },
        {
            "order_id": "o-1009",
            "event_ts": "2026-03-01T10:40:00Z",
            "status": "refunded",
            "amount": 30.0,
            "channel": "paid_social",
            "latency_ms": 230,
        },
        {
            "order_id": "o-1010",
            "event_ts": "2026-03-01T10:45:00Z",
            "status": "paid",
            "amount": 70.0,
            "channel": "paid_search",
            "latency_ms": 175,
        },
        {
            "order_id": "o-1006",
            "event_ts": "2026-03-01T10:50:00Z",
            "status": "paid",
            "amount": 110.0,
            "channel": "email",
            "latency_ms": 188,
        },
        {
            "order_id": "o-1011",
            "event_ts": "2026-03-01T10:55:00Z",
            "status": "paid",
            "amount": 60.0,
            "channel": None,
            "latency_ms": 195,
        },
    ]

    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
            "source": source,
            "rows": rows,
        },
        "policy_hints": {
            "allowed_sources": ["warehouse_sales_daily", "warehouse_refunds_daily"],
            "allowed_regions": ["US", "CA"],
            "max_rows": 5000,
            "max_missing_channel_pct": 0.25,
            "analysis_rules": {
                "dedupe_key": "order_id",
                "dedupe_ts_key": "event_ts",
                "dedupe_strategy": "latest_by_event_ts",
                "fill_missing_channel": "unknown",
                "normalize_status": "lower_strip",
            },
        },
    }

tools.py — deterministic analysis steps

PYTHON
from __future__ import annotations

import math
from datetime import datetime
from collections import defaultdict
from typing import Any

ALLOWED_STATUSES = {"paid", "failed", "refunded"}


def _safe_float(value: Any) -> tuple[bool, float]:
    try:
        v = float(value)
    except (TypeError, ValueError):
        return False, 0.0
    if not math.isfinite(v):
        return False, 0.0
    return True, v


def _is_valid_event_ts(value: Any) -> bool:
    if not isinstance(value, str):
        return False
    ts = value.strip()
    if len(ts) != 20 or not ts.endswith("Z"):
        return False
    try:
        datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        return False
    return True


def read_sales_snapshot(*, source: str, region: str, rows: list[dict[str, Any]]) -> dict[str, Any]:
    return {
        "status": "ok",
        "data": {
            "source": source,
            "region": region.upper(),
            "rows": rows,
            "snapshot_at": "2026-03-07T10:00:00Z",
        },
    }


def profile_sales_rows(*, rows: list[dict[str, Any]]) -> dict[str, Any]:
    required_fields = {"order_id", "event_ts", "status", "amount", "channel", "latency_ms"}
    schema_ok = True
    missing_fields_count = 0

    missing_channel_count = 0
    duplicate_rows = 0
    by_order_id: dict[str, int] = {}

    invalid_order_id_count = 0
    invalid_event_ts_count = 0
    invalid_status_count = 0
    invalid_amount_count = 0
    invalid_latency_count = 0

    for row in rows:
        keys = set(row.keys())
        if not required_fields.issubset(keys):
            schema_ok = False
            missing_fields_count += 1

        order_id = row.get("order_id")
        order_id_norm = str(order_id).strip() if isinstance(order_id, str) else ""
        if not order_id_norm:
            invalid_order_id_count += 1
        else:
            by_order_id[order_id_norm] = by_order_id.get(order_id_norm, 0) + 1
            if by_order_id[order_id_norm] > 1:
                duplicate_rows += 1

        event_ts = row.get("event_ts")
        if not _is_valid_event_ts(event_ts):
            invalid_event_ts_count += 1

        status = str(row.get("status", "")).strip().lower()
        if status not in ALLOWED_STATUSES:
            invalid_status_count += 1

        ok_amount, amount = _safe_float(row.get("amount"))
        if not ok_amount or amount < 0.0:
            invalid_amount_count += 1

        ok_latency, latency = _safe_float(row.get("latency_ms"))
        if not ok_latency or latency < 0.0:
            invalid_latency_count += 1

        channel = row.get("channel")
        if not isinstance(channel, str) or not channel.strip():
            missing_channel_count += 1

    total = len(rows)
    missing_channel_pct = (missing_channel_count / total) if total else 0.0

    row_validity_ok = (
        invalid_order_id_count == 0
        and invalid_event_ts_count == 0
        and invalid_status_count == 0
        and invalid_amount_count == 0
        and invalid_latency_count == 0
    )

    return {
        "status": "ok",
        "data": {
            "row_count_raw": total,
            "duplicate_rows": duplicate_rows,
            "missing_channel_count": missing_channel_count,
            "missing_channel_pct": missing_channel_pct,
            "schema_ok": schema_ok,
            "missing_fields_count": missing_fields_count,
            "row_validity_ok": row_validity_ok,
            "invalid_order_id_count": invalid_order_id_count,
            "invalid_event_ts_count": invalid_event_ts_count,
            "invalid_status_count": invalid_status_count,
            "invalid_amount_count": invalid_amount_count,
            "invalid_latency_count": invalid_latency_count,
        },
    }


def transform_sales_rows(
    *,
    rows: list[dict[str, Any]],
    dedupe_key: str,
    dedupe_ts_key: str,
    fill_missing_channel: str,
    normalize_status: str,
) -> dict[str, Any]:
    latest_by_key: dict[str, dict[str, Any]] = {}
    ts_by_key: dict[str, str] = {}
    skipped_rows_missing_key = 0
    skipped_rows_invalid_ts = 0

    for row in rows:
        key = str(row.get(dedupe_key, "")).strip()
        if not key:
            skipped_rows_missing_key += 1
            continue

        event_ts_raw = row.get(dedupe_ts_key)
        if not _is_valid_event_ts(event_ts_raw):
            skipped_rows_invalid_ts += 1
            continue
        event_ts = str(event_ts_raw).strip()
        previous_ts = ts_by_key.get(key, "")
        if key not in latest_by_key or event_ts >= previous_ts:
            latest_by_key[key] = dict(row)
            ts_by_key[key] = event_ts

    cleaned_rows = [latest_by_key[k] for k in sorted(latest_by_key.keys())]

    filled_missing_channel = 0
    for row in cleaned_rows:
        channel = row.get("channel")
        if not isinstance(channel, str) or not channel.strip():
            row["channel"] = fill_missing_channel
            filled_missing_channel += 1

        if normalize_status == "lower_strip":
            row["status"] = str(row.get("status", "")).strip().lower()

        row[dedupe_key] = str(row.get(dedupe_key, "")).strip()
        row[dedupe_ts_key] = str(row.get(dedupe_ts_key, "")).strip()
        ok_amount, amount = _safe_float(row.get("amount"))
        ok_latency, latency = _safe_float(row.get("latency_ms"))
        row["amount"] = amount if ok_amount else 0.0
        row["latency_ms"] = latency if ok_latency else 0.0

    return {
        "status": "ok",
        "data": {
            "rows": cleaned_rows,
            "row_count_clean": len(cleaned_rows),
            "deduped_rows": max(0, len(rows) - len(cleaned_rows)),
            "filled_missing_channel": filled_missing_channel,
            "skipped_rows_missing_key": skipped_rows_missing_key,
            "skipped_rows_invalid_ts": skipped_rows_invalid_ts,
            "dedupe_strategy": "latest_by_event_ts",
            "dedupe_key": dedupe_key,
            "dedupe_ts_key": dedupe_ts_key,
        },
    }


def analyze_sales_rows(*, rows: list[dict[str, Any]]) -> dict[str, Any]:
    total = len(rows)
    paid_count = 0
    failed_count = 0
    refunded_count = 0

    gross_revenue = 0.0
    refund_amount_total = 0.0
    latency_values: list[float] = []
    channel_net_revenue: dict[str, float] = defaultdict(float)

    for row in rows:
        status = str(row.get("status", "")).lower()
        amount = float(row.get("amount", 0.0))
        channel = str(row.get("channel", "unknown")).strip().lower() or "unknown"
        latency = float(row.get("latency_ms", 0.0))
        latency_values.append(latency)

        if status == "paid":
            paid_count += 1
            gross_revenue += amount
            channel_net_revenue[channel] += amount
        elif status == "failed":
            failed_count += 1
        elif status == "refunded":
            refunded_count += 1
            refund_amount_total += amount
            channel_net_revenue[channel] -= amount

    net_revenue = gross_revenue - refund_amount_total
    conversion_rate = (paid_count / total) if total else 0.0
    failed_payment_rate = (failed_count / total) if total else 0.0
    refund_rate = (refunded_count / total) if total else 0.0

    avg_latency_ms = (sum(latency_values) / len(latency_values)) if latency_values else 0.0
    sorted_latency = sorted(latency_values)
    if sorted_latency:
        p95_idx = int((len(sorted_latency) - 1) * 0.95)
        p95_latency_ms = sorted_latency[p95_idx]
    else:
        p95_latency_ms = 0.0

    top_channel = "unknown"
    top_channel_value = float("-inf")
    for channel, value in channel_net_revenue.items():
        if value > top_channel_value:
            top_channel = channel
            top_channel_value = value

    eta_minutes = 45 if failed_payment_rate >= 0.15 else 20

    return {
        "status": "ok",
        "data": {
            "sample_size": total,
            "paid_count": paid_count,
            "failed_count": failed_count,
            "refunded_count": refunded_count,
            "gross_revenue": gross_revenue,
            "refund_amount_total": refund_amount_total,
            "net_revenue": net_revenue,
            "conversion_rate": conversion_rate,
            "failed_payment_rate": failed_payment_rate,
            "refund_rate": refund_rate,
            "avg_latency_ms": avg_latency_ms,
            "p95_latency_ms": p95_latency_ms,
            "top_channel": top_channel,
            "eta_minutes": eta_minutes,
        },
    }


def validate_analysis(
    *,
    metrics: dict[str, Any],
    profile: dict[str, Any],
    max_missing_channel_pct: float,
) -> dict[str, Any]:
    passed_checks: list[str] = []
    failed_checks: list[str] = []

    conversion_rate = float(metrics.get("conversion_rate", 0.0))
    failed_payment_rate = float(metrics.get("failed_payment_rate", 0.0))
    refund_rate = float(metrics.get("refund_rate", 0.0))

    gross_revenue = float(metrics.get("gross_revenue", 0.0))
    refund_amount_total = float(metrics.get("refund_amount_total", 0.0))
    net_revenue = float(metrics.get("net_revenue", 0.0))

    sample_size = int(metrics.get("sample_size", 0))
    p95_latency_ms = float(metrics.get("p95_latency_ms", 0.0))
    missing_channel_pct = float(profile.get("missing_channel_pct", 0.0))
    row_validity_ok = bool(profile.get("row_validity_ok"))

    if 0.0 <= conversion_rate <= 1.0:
        passed_checks.append("conversion_between_0_1")
    else:
        failed_checks.append("conversion_out_of_range")

    if 0.0 <= refund_rate <= 1.0:
        passed_checks.append("refund_rate_between_0_1")
    else:
        failed_checks.append("refund_rate_out_of_range")

    if gross_revenue >= 0.0:
        passed_checks.append("gross_revenue_non_negative")
    else:
        failed_checks.append("gross_revenue_negative")

    if refund_amount_total >= 0.0:
        passed_checks.append("refund_amount_non_negative")
    else:
        failed_checks.append("refund_amount_negative")

    if net_revenue >= 0.0:
        passed_checks.append("net_revenue_non_negative")
    else:
        failed_checks.append("net_revenue_negative")

    if failed_payment_rate <= 0.35:
        passed_checks.append("failed_rate_reasonable")
    else:
        failed_checks.append("failed_rate_too_high")

    if sample_size >= 10:
        passed_checks.append("sample_size_min_10")
    else:
        failed_checks.append("sample_size_too_low")

    if p95_latency_ms <= 400.0:
        passed_checks.append("p95_latency_under_400")
    else:
        failed_checks.append("p95_latency_too_high")

    if missing_channel_pct <= max_missing_channel_pct:
        passed_checks.append("missing_channel_within_policy")
    else:
        failed_checks.append("missing_channel_policy_violation")

    if gross_revenue >= net_revenue:
        passed_checks.append("gross_ge_net")
    else:
        failed_checks.append("gross_less_than_net")

    if abs((gross_revenue - refund_amount_total) - net_revenue) <= 0.01:
        passed_checks.append("gross_minus_refunds_equals_net")
    else:
        failed_checks.append("gross_refund_net_mismatch")

    if row_validity_ok:
        passed_checks.append("profile_row_values_valid")
    else:
        failed_checks.append("profile_row_values_invalid")

    return {
        "status": "ok",
        "data": {
            "ok": len(failed_checks) == 0,
            "passed_checks": passed_checks,
            "failed_checks": failed_checks,
        },
    }

agent.py — plan proposal + final answer

PYTHON
from __future__ import annotations

from typing import Any


def propose_analysis_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    del goal
    req = request["request"]
    return {
        "steps": [
            {
                "id": "s1",
                "action": "ingest_sales",
                "args": {
                    "source": req["source"],
                    "region": req["region"],
                },
            },
            {"id": "s2", "action": "profile_sales", "args": {}},
            {"id": "s3", "action": "transform_sales", "args": {}},
            {"id": "s4", "action": "analyze_sales", "args": {}},
            {"id": "s5", "action": "validate_analysis", "args": {}},
        ]
    }


def compose_final_answer(
    *,
    request: dict[str, Any],
    aggregate: dict[str, Any],
    quality: dict[str, Any],
) -> str:
    req = request["request"]
    m = aggregate["metrics"]
    checks = quality.get("passed_checks", [])

    return (
        f"Data analysis brief ({req['region']}, {req['report_date']}): source {req['source']} shows net revenue "
        f"{m['net_revenue']:.2f} (gross {m['gross_revenue']:.2f}, refunds {m['refund_amount_total']:.2f} subtracted), "
        f"conversion {m['conversion_rate_pct']:.2f}%, failed payment rate {m['failed_payment_rate_pct']:.2f}%, "
        f"refund rate {m['refund_rate_pct']:.2f}%, top channel {m['top_channel']}, "
        f"and p95 latency {m['p95_latency_ms']:.2f} ms. Quality checks passed: {len(checks)}."
    )

gateway.py — plan/policy/quality boundaries

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str, *, details: dict[str, Any] | None = None):
        super().__init__(reason)
        self.reason = reason
        self.details = details or {}


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 25
    max_steps: int = 8
    max_rows: int = 5000


@dataclass(frozen=True)
class Decision:
    kind: str
    reason: str


EXPECTED_ACTION_SEQUENCE = [
    "ingest_sales",
    "profile_sales",
    "transform_sales",
    "analyze_sales",
    "validate_analysis",
]


def validate_plan(raw_steps: Any, *, max_steps: int) -> list[dict[str, Any]]:
    if not isinstance(raw_steps, list) or not raw_steps:
        raise StopRun("invalid_plan:steps")
    if len(raw_steps) > max_steps:
        raise StopRun("invalid_plan:too_many_steps")

    steps: list[dict[str, Any]] = []
    actions: list[str] = []
    for raw in raw_steps:
        if not isinstance(raw, dict):
            raise StopRun("invalid_step:not_object")
        step_id = raw.get("id")
        action = raw.get("action")
        args = raw.get("args")
        if not isinstance(step_id, str) or not step_id.strip():
            raise StopRun("invalid_step:id")
        if not isinstance(action, str) or not action.strip():
            raise StopRun("invalid_step:action")
        if not isinstance(args, dict):
            raise StopRun("invalid_step:args")

        normalized = {
            "id": step_id.strip(),
            "action": action.strip(),
            "args": dict(args),
        }
        steps.append(normalized)
        actions.append(normalized["action"])

    if actions != EXPECTED_ACTION_SEQUENCE:
        raise StopRun(
            "invalid_plan:step_sequence",
            details={"expected": EXPECTED_ACTION_SEQUENCE, "received": actions},
        )

    return steps


class DataAnalysisGateway:
    def __init__(
        self,
        *,
        allowed_sources_policy: set[str],
        allowed_sources_execution: set[str],
        allowed_regions_policy: set[str],
        budget: Budget,
    ):
        self.allowed_sources_policy = set(allowed_sources_policy)
        self.allowed_sources_execution = set(allowed_sources_execution)
        self.allowed_regions_policy = {x.upper() for x in allowed_regions_policy}
        self.budget = budget

    def evaluate_ingest(self, *, source: str, region: str) -> Decision:
        if source not in self.allowed_sources_policy:
            return Decision(kind="deny", reason="source_denied_policy")
        if region.upper() not in self.allowed_regions_policy:
            return Decision(kind="deny", reason="region_denied_policy")
        if source not in self.allowed_sources_execution:
            return Decision(kind="deny", reason="source_denied_execution")
        return Decision(kind="allow", reason="policy_pass")

    def ensure_rows_budget(self, *, rows: list[dict[str, Any]]) -> None:
        if len(rows) > self.budget.max_rows:
            raise StopRun("dataset_too_large")

    def validate_profile(self, *, profile: dict[str, Any], max_missing_channel_pct: float) -> None:
        if not bool(profile.get("schema_ok")):
            raise StopRun("invalid_profile:schema")
        if not bool(profile.get("row_validity_ok")):
            raise StopRun("invalid_profile:row_values")

        missing_pct = float(profile.get("missing_channel_pct", 0.0))
        if missing_pct > max_missing_channel_pct:
            raise StopRun("invalid_profile:missing_channel_too_high")

    def validate_metrics(self, *, metrics: dict[str, Any]) -> None:
        sample_size = int(metrics.get("sample_size", 0))
        if sample_size <= 0:
            raise StopRun("invalid_metrics:sample_size")

        conversion = float(metrics.get("conversion_rate", 0.0))
        if not (0.0 <= conversion <= 1.0):
            raise StopRun("invalid_metrics:conversion_rate")

        failed_rate = float(metrics.get("failed_payment_rate", 0.0))
        if not (0.0 <= failed_rate <= 1.0):
            raise StopRun("invalid_metrics:failed_payment_rate")

        refund_rate = float(metrics.get("refund_rate", 0.0))
        if not (0.0 <= refund_rate <= 1.0):
            raise StopRun("invalid_metrics:refund_rate")

        gross_revenue = float(metrics.get("gross_revenue", 0.0))
        refund_amount_total = float(metrics.get("refund_amount_total", 0.0))
        net_revenue = float(metrics.get("net_revenue", 0.0))

        if net_revenue < 0:
            raise StopRun("invalid_metrics:net_revenue")
        if gross_revenue < net_revenue:
            raise StopRun("invalid_metrics:gross_less_than_net")
        if abs((gross_revenue - refund_amount_total) - net_revenue) > 0.01:
            raise StopRun("invalid_metrics:revenue_consistency")

main.py — orchestrate analysis workflow

PYTHON
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from agent import compose_final_answer, propose_analysis_plan
from context import build_request
from gateway import Budget, DataAnalysisGateway, StopRun, validate_plan
from tools import (
    analyze_sales_rows,
    profile_sales_rows,
    read_sales_snapshot,
    transform_sales_rows,
    validate_analysis,
)

GOAL = (
    "Prepare a validated weekly data-analysis brief for US payments with revenue, "
    "conversion, failed payment rate, and quality checks."
)
REQUEST = build_request(
    report_date="2026-03-07",
    region="US",
    source="warehouse_sales_daily",
)

DEFAULT_BUDGET = Budget(
    max_seconds=25,
    max_steps=8,
    max_rows=5000,
)

DEFAULT_ALLOWED_SOURCES_POLICY = {"warehouse_sales_daily", "warehouse_refunds_daily"}
ALLOWED_SOURCES_EXECUTION = {"warehouse_sales_daily"}
DEFAULT_ALLOWED_REGIONS_POLICY = {"US", "CA"}


DEFAULT_ANALYSIS_RULES = {
    "dedupe_key": "order_id",
    "dedupe_ts_key": "event_ts",
    "dedupe_strategy": "latest_by_event_ts",
    "fill_missing_channel": "unknown",
    "normalize_status": "lower_strip",
}


def _unwrap_tool_data(raw: Any, *, tool_name: str) -> dict[str, Any]:
    if not isinstance(raw, dict) or raw.get("status") != "ok" or not isinstance(raw.get("data"), dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    return dict(raw["data"])


def run_data_analysis_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []

    artifact_ids = {
        "snapshot_id": f"snap_{run_id[:8]}",
        "profile_id": f"profile_{run_id[:8]}",
        "transform_id": f"transform_{run_id[:8]}",
        "metrics_id": f"metrics_{run_id[:8]}",
        "quality_id": f"quality_{run_id[:8]}",
    }

    def elapsed_ms() -> int:
        return max(1, int((time.monotonic() - started) * 1000))

    hints_raw = request.get("policy_hints")
    hints: dict[str, Any] = hints_raw if isinstance(hints_raw, dict) else {}

    allowed_sources_raw = hints.get("allowed_sources")
    if isinstance(allowed_sources_raw, list):
        allowed_sources_policy = {
            str(item).strip()
            for item in allowed_sources_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_sources_policy = set(DEFAULT_ALLOWED_SOURCES_POLICY)
    if not allowed_sources_policy:
        allowed_sources_policy = set(DEFAULT_ALLOWED_SOURCES_POLICY)

    allowed_regions_raw = hints.get("allowed_regions")
    if isinstance(allowed_regions_raw, list):
        allowed_regions_policy = {
            str(item).strip().upper()
            for item in allowed_regions_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_regions_policy = set(DEFAULT_ALLOWED_REGIONS_POLICY)
    if not allowed_regions_policy:
        allowed_regions_policy = set(DEFAULT_ALLOWED_REGIONS_POLICY)

    max_rows_raw = hints.get("max_rows", DEFAULT_BUDGET.max_rows)
    max_missing_channel_pct_raw = hints.get("max_missing_channel_pct", 0.25)

    analysis_rules_raw = hints.get("analysis_rules")
    analysis_rules = dict(DEFAULT_ANALYSIS_RULES)
    if isinstance(analysis_rules_raw, dict):
        for key, value in analysis_rules_raw.items():
            if isinstance(key, str):
                analysis_rules[key] = value

    dedupe_key = str(analysis_rules.get("dedupe_key", DEFAULT_ANALYSIS_RULES["dedupe_key"])).strip() or "order_id"
    dedupe_ts_key = str(analysis_rules.get("dedupe_ts_key", DEFAULT_ANALYSIS_RULES["dedupe_ts_key"])).strip() or "event_ts"
    dedupe_strategy = str(
        analysis_rules.get("dedupe_strategy", DEFAULT_ANALYSIS_RULES["dedupe_strategy"])
    ).strip() or "latest_by_event_ts"
    fill_missing_channel = str(
        analysis_rules.get("fill_missing_channel", DEFAULT_ANALYSIS_RULES["fill_missing_channel"])
    ).strip() or "unknown"
    normalize_status = str(
        analysis_rules.get("normalize_status", DEFAULT_ANALYSIS_RULES["normalize_status"])
    ).strip() or "lower_strip"

    if dedupe_strategy != "latest_by_event_ts":
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:dedupe_strategy",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        max_rows = int(max_rows_raw)
    except (TypeError, ValueError):
        max_rows = DEFAULT_BUDGET.max_rows
    if max_rows <= 0:
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:max_rows",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        max_missing_channel_pct = float(max_missing_channel_pct_raw)
    except (TypeError, ValueError):
        max_missing_channel_pct = 0.25
    if not (0.0 <= max_missing_channel_pct <= 1.0):
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:max_missing_channel_pct",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    budget = Budget(
        max_seconds=DEFAULT_BUDGET.max_seconds,
        max_steps=DEFAULT_BUDGET.max_steps,
        max_rows=max(10, min(200000, max_rows)),
    )

    gateway = DataAnalysisGateway(
        allowed_sources_policy=allowed_sources_policy,
        allowed_sources_execution=ALLOWED_SOURCES_EXECUTION,
        allowed_regions_policy=allowed_regions_policy,
        budget=budget,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    phase = "plan"
    try:
        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase=phase)

        raw_plan = propose_analysis_plan(goal=goal, request=request)
        steps = validate_plan(raw_plan.get("steps"), max_steps=budget.max_steps)

        trace.append(
            {
                "step": 1,
                "phase": "plan",
                "steps": len(steps),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 1,
                "action": "propose_analysis_plan",
                "step_ids": [item["id"] for item in steps],
            }
        )

        phase = "policy_check"
        ingest_args = steps[0]["args"]
        source = str(ingest_args.get("source", "")).strip()
        region = str(ingest_args.get("region", "")).strip().upper()
        decision = gateway.evaluate_ingest(source=source, region=region)

        trace.append(
            {
                "step": 2,
                "phase": "policy_check",
                "source": source,
                "region": region,
                "decision": decision.kind,
                "reason": decision.reason,
                "allowed_sources_policy": sorted(allowed_sources_policy),
                "allowed_sources_execution": sorted(ALLOWED_SOURCES_EXECUTION),
                "elapsed_ms": elapsed_ms(),
                "ok": decision.kind == "allow",
            }
        )
        history.append(
            {
                "step": 2,
                "action": "policy_check",
                "decision": {"kind": decision.kind, "reason": decision.reason},
            }
        )
        if decision.kind != "allow":
            return stopped(f"policy_block:{decision.reason}", phase=phase)

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="ingest_profile")

        phase = "ingest_profile"
        req = request["request"]
        snapshot = _unwrap_tool_data(
            read_sales_snapshot(
                source=source,
                region=region,
                rows=list(req["rows"]),
            ),
            tool_name="read_sales_snapshot",
        )
        rows_raw = list(snapshot["rows"])
        gateway.ensure_rows_budget(rows=rows_raw)

        profile = _unwrap_tool_data(
            profile_sales_rows(rows=rows_raw),
            tool_name="profile_sales_rows",
        )
        gateway.validate_profile(profile=profile, max_missing_channel_pct=max_missing_channel_pct)

        trace.append(
            {
                "step": 3,
                "phase": "ingest_profile",
                "row_count_raw": profile["row_count_raw"],
                "duplicate_rows": profile["duplicate_rows"],
                "missing_channel_pct": round(float(profile["missing_channel_pct"]) * 100, 2),
                "artifact_refs": {
                    "snapshot_id": artifact_ids["snapshot_id"],
                    "profile_id": artifact_ids["profile_id"],
                },
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "ingest_profile",
                "source": source,
                "row_count_raw": profile["row_count_raw"],
                "artifact_refs": {
                    "snapshot_id": artifact_ids["snapshot_id"],
                    "profile_id": artifact_ids["profile_id"],
                },
            }
        )

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="transform_analyze")

        phase = "transform_analyze"
        transformed = _unwrap_tool_data(
            transform_sales_rows(
                rows=rows_raw,
                dedupe_key=dedupe_key,
                dedupe_ts_key=dedupe_ts_key,
                fill_missing_channel=fill_missing_channel,
                normalize_status=normalize_status,
            ),
            tool_name="transform_sales_rows",
        )
        rows_clean = list(transformed["rows"])

        metrics = _unwrap_tool_data(
            analyze_sales_rows(rows=rows_clean),
            tool_name="analyze_sales_rows",
        )
        gateway.validate_metrics(metrics=metrics)

        trace.append(
            {
                "step": 4,
                "phase": "transform_analyze",
                "row_count_clean": transformed["row_count_clean"],
                "deduped_rows": transformed["deduped_rows"],
                "filled_missing_channel": transformed["filled_missing_channel"],
                "skipped_rows_missing_key": transformed.get("skipped_rows_missing_key", 0),
                "skipped_rows_invalid_ts": transformed.get("skipped_rows_invalid_ts", 0),
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "conversion_rate_pct": round(float(metrics["conversion_rate"]) * 100, 2),
                "failed_payment_rate_pct": round(float(metrics["failed_payment_rate"]) * 100, 2),
                "artifact_refs": {
                    "transform_id": artifact_ids["transform_id"],
                    "metrics_id": artifact_ids["metrics_id"],
                },
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 4,
                "action": "transform_analyze",
                "row_count_clean": transformed["row_count_clean"],
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "applied_rules": {
                    "dedupe_strategy": dedupe_strategy,
                    "dedupe_key": dedupe_key,
                    "dedupe_ts_key": dedupe_ts_key,
                    "fill_missing_channel": fill_missing_channel,
                    "normalize_status": normalize_status,
                },
                "artifact_refs": {
                    "transform_id": artifact_ids["transform_id"],
                    "metrics_id": artifact_ids["metrics_id"],
                },
            }
        )

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="validate")

        phase = "validate"
        quality = _unwrap_tool_data(
            validate_analysis(
                metrics=metrics,
                profile=profile,
                max_missing_channel_pct=max_missing_channel_pct,
            ),
            tool_name="validate_analysis",
        )
        if not bool(quality.get("ok")):
            failed = quality.get("failed_checks") or []
            first = str(failed[0]) if failed else "unknown"
            return stopped(f"validation_failed:{first}", phase=phase, quality=quality)

        trace.append(
            {
                "step": 5,
                "phase": "validate",
                "passed_checks": len(quality.get("passed_checks", [])),
                "failed_checks": len(quality.get("failed_checks", [])),
                "artifact_refs": {"quality_id": artifact_ids["quality_id"]},
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 5,
                "action": "validate_analysis",
                "passed_checks": len(quality.get("passed_checks", [])),
                "artifact_refs": {"quality_id": artifact_ids["quality_id"]},
            }
        )

        aggregate = {
            "report_date": req["report_date"],
            "region": req["region"],
            "source": source,
            "metrics": {
                "sample_size": int(metrics["sample_size"]),
                "gross_revenue": round(float(metrics["gross_revenue"]), 2),
                "refund_amount_total": round(float(metrics["refund_amount_total"]), 2),
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "conversion_rate": round(float(metrics["conversion_rate"]), 6),
                "conversion_rate_pct": round(float(metrics["conversion_rate"]) * 100, 2),
                "failed_payment_rate": round(float(metrics["failed_payment_rate"]), 6),
                "failed_payment_rate_pct": round(float(metrics["failed_payment_rate"]) * 100, 2),
                "refund_rate": round(float(metrics["refund_rate"]), 6),
                "refund_rate_pct": round(float(metrics["refund_rate"]) * 100, 2),
                "top_channel": str(metrics["top_channel"]),
                "avg_latency_ms": round(float(metrics["avg_latency_ms"]), 2),
                "p95_latency_ms": round(float(metrics["p95_latency_ms"]), 2),
                "eta_minutes": int(metrics["eta_minutes"]),
            },
            "data_quality": {
                "row_count_raw": int(profile["row_count_raw"]),
                "row_count_clean": int(transformed["row_count_clean"]),
                "deduped_rows": int(transformed["deduped_rows"]),
                "missing_channel_pct_raw": round(float(profile["missing_channel_pct"]) * 100, 2),
                "row_validity_ok": bool(profile["row_validity_ok"]),
                "invalid_order_id_count": int(profile["invalid_order_id_count"]),
                "invalid_event_ts_count": int(profile["invalid_event_ts_count"]),
                "invalid_status_count": int(profile["invalid_status_count"]),
                "invalid_amount_count": int(profile["invalid_amount_count"]),
                "invalid_latency_count": int(profile["invalid_latency_count"]),
                "skipped_rows_missing_key": int(transformed.get("skipped_rows_missing_key", 0)),
                "skipped_rows_invalid_ts": int(transformed.get("skipped_rows_invalid_ts", 0)),
            },
        }

        answer = compose_final_answer(
            request=request,
            aggregate=aggregate,
            quality=quality,
        )

        trace.append(
            {
                "step": 6,
                "phase": "finalize",
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 6,
                "action": "finalize",
            }
        )

        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "validated_analysis",
            "answer": answer,
            "aggregate": aggregate,
            "quality": quality,
            "trace": trace,
            "history": history,
        }
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase=phase,
            **({"details": exc.details} if isinstance(exc.details, dict) and exc.details else {}),
        )


def main() -> None:
    result = run_data_analysis_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

What Matters Most Here (in plain words)

  • the workflow is enforced and fixed, so the agent cannot "skip" profile/validate
  • the policy boundary decides separately whether the source may be used in this run
  • profiling + validation provide a technical quality gate, not "faith in the number"; on invalid_profile:*, the run stops before transform
  • dedup is deterministic (latest_by_event_ts) over strict UTC event_ts (YYYY-MM-DDTHH:MM:SSZ), and cleaning rules come explicitly from policy_hints
  • deduped_rows in this demo reports the total rows removed by transform (dedup plus any discarded invalid rows); the current sample has no discards
  • filled_missing_channel is counted after dedup: if a duplicate is replaced by a newer row that has a valid channel, no fill is needed (which is why it is 1 here, not 2)
  • finalize is not a tool step here but an orchestration step in main.py
  • the final brief is built only from validated metrics
  • trace/history with elapsed_ms and artifact_refs make the pipeline auditable
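
The deterministic dedup described above can be sketched in a few lines. This is a minimal standalone version, assuming the same row shape as the sample dataset (`order_id`, `event_ts`, strict `YYYY-MM-DDTHH:MM:SSZ` UTC timestamps); the real step lives in `transform_sales_rows` in tools.py:

```python
from datetime import datetime, timezone


def dedupe_latest_by_event_ts(
    rows: list[dict],
    key: str = "order_id",
    ts_key: str = "event_ts",
) -> dict:
    """Keep, per key, only the row with the newest event_ts.

    Timestamps are parsed strictly as 'YYYY-MM-DDTHH:MM:SSZ' (UTC);
    rows with a missing key or an unparseable timestamp are skipped
    and counted, mirroring skipped_rows_missing_key/_invalid_ts.
    """
    latest: dict[str, tuple[datetime, dict]] = {}
    skipped_missing_key = 0
    skipped_invalid_ts = 0
    for row in rows:
        row_key = str(row.get(key) or "").strip()
        if not row_key:
            skipped_missing_key += 1
            continue
        try:
            ts = datetime.strptime(
                str(row.get(ts_key, "")), "%Y-%m-%dT%H:%M:%SZ"
            ).replace(tzinfo=timezone.utc)
        except ValueError:
            skipped_invalid_ts += 1
            continue
        current = latest.get(row_key)
        if current is None or ts > current[0]:
            latest[row_key] = (ts, row)
    clean = [row for _ts, row in latest.values()]
    return {
        "rows": clean,
        # total rows removed, including skipped invalid rows
        "deduped_rows": len(rows) - len(clean),
        "skipped_rows_missing_key": skipped_missing_key,
        "skipped_rows_invalid_ts": skipped_invalid_ts,
    }
```

Because the newest duplicate wins, a later row with a valid channel replaces an earlier one with a missing channel, which is exactly why filled_missing_channel is counted after dedup.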

Example Output

JSON
{
  "run_id": "3045a7d0-1431-455b-9d26-16ae9fbc653a",
  "status": "ok",
  "stop_reason": "success",
  "outcome": "validated_analysis",
  "answer": "Data analysis brief (US, 2026-03-07): source warehouse_sales_daily shows net revenue 600.00 (gross 670.00, refunds 70.00 subtracted), conversion 63.64%, failed payment rate 18.18%, refund rate 18.18%, top channel paid_search, and p95 latency 240.00 ms. Quality checks passed: 12.",
  "aggregate": {
    "report_date": "2026-03-07",
    "region": "US",
    "source": "warehouse_sales_daily",
    "metrics": {
      "sample_size": 11,
      "gross_revenue": 670.0,
      "refund_amount_total": 70.0,
      "net_revenue": 600.0,
      "conversion_rate": 0.636364,
      "conversion_rate_pct": 63.64,
      "failed_payment_rate": 0.181818,
      "failed_payment_rate_pct": 18.18,
      "refund_rate": 0.181818,
      "refund_rate_pct": 18.18,
      "top_channel": "paid_search",
      "avg_latency_ms": 200.73,
      "p95_latency_ms": 240.0,
      "eta_minutes": 45
    },
    "data_quality": {
      "row_count_raw": 12,
      "row_count_clean": 11,
      "deduped_rows": 1,
      "missing_channel_pct_raw": 16.67,
      "row_validity_ok": true,
      "invalid_order_id_count": 0,
      "invalid_event_ts_count": 0,
      "invalid_status_count": 0,
      "invalid_amount_count": 0,
      "invalid_latency_count": 0,
      "skipped_rows_missing_key": 0,
      "skipped_rows_invalid_ts": 0
    }
  },
  "quality": {
    "ok": true,
    "passed_checks": [
      "conversion_between_0_1",
      "refund_rate_between_0_1",
      "gross_revenue_non_negative",
      "refund_amount_non_negative",
      "net_revenue_non_negative",
      "failed_rate_reasonable",
      "sample_size_min_10",
      "p95_latency_under_400",
      "missing_channel_within_policy",
      "gross_ge_net",
      "gross_minus_refunds_equals_net",
      "profile_row_values_valid"
    ],
    "failed_checks": []
  },
  "trace": [
    {
      "step": 1,
      "phase": "plan",
      "steps": 5,
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 2,
      "phase": "policy_check",
      "source": "warehouse_sales_daily",
      "region": "US",
      "decision": "allow",
      "reason": "policy_pass",
      "allowed_sources_policy": [
        "warehouse_refunds_daily",
        "warehouse_sales_daily"
      ],
      "allowed_sources_execution": [
        "warehouse_sales_daily"
      ],
      "elapsed_ms": 1,
      "ok": true
    },
    {
      "step": 3,
      "phase": "ingest_profile",
      "row_count_raw": 12,
      "duplicate_rows": 1,
      "missing_channel_pct": 16.67,
      "artifact_refs": {
        "snapshot_id": "snap_3045a7d0",
        "profile_id": "profile_3045a7d0"
      },
      "elapsed_ms": 2,
      "ok": true
    },
    {
      "step": 4,
      "phase": "transform_analyze",
      "row_count_clean": 11,
      "deduped_rows": 1,
      "filled_missing_channel": 1,
      "skipped_rows_missing_key": 0,
      "skipped_rows_invalid_ts": 0,
      "net_revenue": 600.0,
      "conversion_rate_pct": 63.64,
      "failed_payment_rate_pct": 18.18,
      "artifact_refs": {
        "transform_id": "transform_3045a7d0",
        "metrics_id": "metrics_3045a7d0"
      },
      "elapsed_ms": 2,
      "ok": true
    },
    {
      "step": 5,
      "phase": "validate",
      "passed_checks": 12,
      "failed_checks": 0,
      "artifact_refs": {
        "quality_id": "quality_3045a7d0"
      },
      "elapsed_ms": 2,
      "ok": true
    },
    {
      "step": 6,
      "phase": "finalize",
      "elapsed_ms": 2,
      "ok": true
    }
  ],
  "history": [{...}]
}

Typical stop_reason Values

  • success - the run completed successfully
  • max_seconds - the total time budget was exhausted
  • invalid_plan:* - invalid analysis plan
  • invalid_plan:step_sequence - the mandatory workflow order was broken (with expected/received details)
  • invalid_step:* - invalid step contract
  • policy_block:source_denied_policy - source denied by the policy allowlist
  • policy_block:source_denied_execution - source denied by the execution allowlist
  • policy_block:region_denied_policy - region denied by policy
  • dataset_too_large - the row limit was exceeded
  • invalid_profile:schema - the profile detected a schema mismatch
  • invalid_profile:row_values - the profile detected invalid row values
  • invalid_profile:missing_channel_too_high - the missing-channel ratio exceeded the policy threshold
  • invalid_metrics:* - metrics failed basic invariants
  • validation_failed:* - quality validation did not pass
  • tool_invalid_output:* - a step returned an invalid contract
  • invalid_request:dedupe_strategy - unsupported dedupe strategy in analysis_rules
  • invalid_request:max_rows - invalid max_rows value
  • invalid_request:max_missing_channel_pct - invalid max_missing_channel_pct threshold
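
The values above are namespaced with a `:` separator, so a caller can branch on the prefix alone. A minimal sketch of such a dispatcher (the bucket names here are illustrative, not part of the example code):

```python
def classify_stop_reason(stop_reason: str) -> str:
    """Map a namespaced stop_reason (e.g. 'policy_block:source_denied_policy')
    to a coarse handling bucket for the caller."""
    namespace = stop_reason.split(":", 1)[0]
    if namespace == "success":
        return "done"
    if namespace in ("policy_block", "invalid_request"):
        return "reject"  # caller error or policy denial: fix the input, don't retry
    if namespace in ("max_seconds", "dataset_too_large"):
        return "retry_smaller"  # budget exhaustion: retry with a tighter scope
    if namespace in ("invalid_plan", "invalid_step", "tool_invalid_output"):
        return "contract_bug"  # contract violations: alert, never auto-retry
    # invalid_profile / invalid_metrics / validation_failed
    return "investigate_data"
```

Keeping the namespace stable while varying the suffix is what lets policy grow new denial reasons without breaking callers.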

What Is NOT Shown Here

  • a connection to a real DWH/BI instead of an in-memory snapshot
  • dataset versioning (snapshot lineage)
  • SQL pushdown and cost-aware query planning
  • multi-source joins with schema conflicts
  • anomaly detection beyond simple invariants
  • persisted artifacts (tables/charts) in storage

What to Try Next

  1. Set source="warehouse_refunds_daily" and watch policy_block:source_denied_execution.
  2. Set max_missing_channel_pct=0.1 and watch invalid_profile:missing_channel_too_high.
  3. Add a plan step that is not in EXPECTED_ACTION_SEQUENCE and watch invalid_plan:step_sequence.
  4. Raise rows above max_rows and watch dataset_too_large.
  5. Have analyze return conversion_rate=1.2 or gross < net and watch invalid_metrics:*.
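
Item 2 trips the profile quality-gate before transform runs. A minimal sketch of that check, assuming a `StopRun` exception shaped like the one the orchestrator catches (the real check lives in `validate_profile` in gateway.py):

```python
class StopRun(Exception):
    """Stops the run with a namespaced reason, as in the orchestrator above."""

    def __init__(self, reason: str, details: dict | None = None) -> None:
        super().__init__(reason)
        self.reason = reason
        self.details = details or {}


def check_missing_channel(profile: dict, max_missing_channel_pct: float) -> None:
    """Raise StopRun('invalid_profile:missing_channel_too_high') when the
    missing-channel ratio in the profile exceeds the policy threshold."""
    pct = float(profile["missing_channel_pct"])
    if pct > max_missing_channel_pct:
        raise StopRun(
            "invalid_profile:missing_channel_too_high",
            details={"missing_channel_pct": pct, "threshold": max_missing_channel_pct},
        )
```

With the sample profile (missing_channel_pct = 0.1667) and a threshold of 0.1, the check raises and the run stops with stop_reason invalid_profile:missing_channel_too_high; at the default 0.25 it passes.
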
⏱️ 20 min read · Updated March 6, 2026 · Difficulty: ★★☆
Built in: production control — OnceOnly
Guardrails for tool-calling agents
Take this pattern to production with governance:
  • Budgets (step caps / spend caps)
  • Tool permissions (allowlist / blocklist)
  • Kill switch and incident stop
  • Idempotency and dedupe
  • Audit logs and traceability
Embedded mention: OnceOnly is a control layer for agent systems in production.

Author

Nick — an engineer building infrastructure for AI agents in production.

Focus: agent patterns, failure modes, runtime control, and system reliability.

🔗 GitHub: https://github.com/mykolademyanov


Editorial note

This documentation is AI-assisted, with human editorial responsibility for accuracy, clarity, and production relevance.

The content is grounded in real failures, post-mortems, and operational incidents in deployed AI agent systems.