## Pattern Essence (Brief)

Data-Analysis Agent means: the agent does not "invent" metrics but goes through a fixed workflow:

ingest → profile → transform → analyze → validate → finalize (host orchestration step)

So the result depends on a controlled pipeline, not on free-form text reasoning.
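The fixed order can be sketched in a few lines (hypothetical step functions and field names, not the project's real tools): the pipeline is a list, so no stage can be skipped or reordered by the model.

```python
# Minimal sketch of the fixed-pipeline idea. The step names and the
# "ok"/"amount" fields are illustrative stand-ins, not the real tools.
def ingest(state):
    return {**state, "rows": state.get("rows", [])}

def profile(state):
    return {**state, "row_count": len(state["rows"])}

def transform(state):
    return {**state, "rows": [r for r in state["rows"] if r.get("ok")]}

def analyze(state):
    return {**state, "total": sum(r["amount"] for r in state["rows"])}

def validate(state):
    if state["total"] < 0:
        raise ValueError("negative total")
    return state

PIPELINE = [ingest, profile, transform, analyze, validate]

def run(state):
    # The order is fixed by the list itself; the agent cannot reorder it.
    for step in PIPELINE:
        state = step(state)
    return state
```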
## What This Example Demonstrates

- The agent proposes an analysis plan, but the runtime validates the plan contract and step order
- The policy allowlist and the execution allowlist for the data source are handled separately
- The workflow is strictly fixed: skipping `profile`/`validate` is impossible
- The profile check detects duplicates, missing values, and invalid row values; if the quality gate fails, the pipeline stops before `transform`
- The transform step applies deterministic dedup (`latest_by_event_ts`), field normalization, and cleaning rules from `policy_hints`
- Cleaning rules and the dedupe strategy come from `policy_hints` (policy-configurable analysis)
- The analyze step computes metrics (gross/net revenue, conversion, failure rate, refund rate, latency)
- The validate step checks quality invariants and aggregate consistency
- `trace`/`history` provide auditability from plan to finalization
## Architecture

`agent.py` forms the analysis step plan. `gateway.py` validates the plan and the policy decision for ingest. `tools.py` runs the deterministic steps: ingest/profile/transform/analyze/validate. `main.py` orchestrates the workflow, runs `finalize` as a host step, collects `aggregate` and `trace`/`history`, and returns the final brief.
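As a rough sketch of that hand-off (stand-in functions, not the real module APIs): the agent proposes, the gateway checks the contract, the tools execute, and the host composes the brief.

```python
# Illustrative hand-off between the four roles described above.
# Function bodies are hypothetical stand-ins for the real modules.
def agent_propose():
    # agent.py role: propose a structured plan
    return {"steps": [{"id": "s1", "action": "ingest_sales", "args": {}}]}

def gateway_validate(plan):
    # gateway.py role: enforce the plan contract before anything runs
    assert all(isinstance(s["action"], str) for s in plan["steps"])
    return plan["steps"]

def tools_run(steps):
    # tools.py role: execute deterministic steps
    return {"executed": [s["action"] for s in steps]}

def host_finalize(result):
    # main.py role: finalize is a host step, not a tool step
    return f"brief: executed {len(result['executed'])} step(s)"

brief = host_finalize(tools_run(gateway_validate(agent_propose())))
```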
Project Structure
agent-patterns/
βββ data-analysis-agent/
βββ python/
βββ main.py
βββ gateway.py
βββ tools.py
βββ agent.py
βββ context.py
βββ README.md
βββ requirements.txt
## How to Run

```shell
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd agent-patterns/data-analysis-agent/python

python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```

Python 3.11+ is required.

Export variant:

```shell
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
```
### Variant via .env (optional)

```shell
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py
```

This is a shell variant (macOS/Linux). On Windows, setting environment variables directly is simpler, or use python-dotenv if preferred.
## Task

Production case:

"Prepare a customer-safe analytics brief for the last 7 days in US: revenue, conversion, failure rate, data-quality checks."
## Code

### context.py — request envelope + sample dataset
```python
from __future__ import annotations

from typing import Any


def build_request(*, report_date: str, region: str, source: str) -> dict[str, Any]:
    rows = [
        {
            "order_id": "o-1001",
            "event_ts": "2026-03-01T10:00:00Z",
            "status": "paid",
            "amount": 120.0,
            "channel": "paid_search",
            "latency_ms": 180,
        },
        {
            "order_id": "o-1002",
            "event_ts": "2026-03-01T10:05:00Z",
            "status": "paid",
            "amount": 90.0,
            "channel": "email",
            "latency_ms": 160,
        },
        {
            "order_id": "o-1003",
            "event_ts": "2026-03-01T10:10:00Z",
            "status": "failed",
            "amount": 0.0,
            "channel": "paid_search",
            "latency_ms": 240,
        },
        {
            "order_id": "o-1004",
            "event_ts": "2026-03-01T10:15:00Z",
            "status": "refunded",
            "amount": 40.0,
            "channel": "email",
            "latency_ms": 210,
        },
        {
            "order_id": "o-1005",
            "event_ts": "2026-03-01T10:20:00Z",
            "status": "paid",
            "amount": 140.0,
            "channel": "paid_search",
            "latency_ms": 170,
        },
        {
            "order_id": "o-1006",
            "event_ts": "2026-03-01T10:25:00Z",
            "status": "paid",
            "amount": 110.0,
            "channel": None,
            "latency_ms": 190,
        },
        {
            "order_id": "o-1007",
            "event_ts": "2026-03-01T10:30:00Z",
            "status": "failed",
            "amount": 0.0,
            "channel": "organic",
            "latency_ms": 260,
        },
        {
            "order_id": "o-1008",
            "event_ts": "2026-03-01T10:35:00Z",
            "status": "paid",
            "amount": 80.0,
            "channel": "paid_social",
            "latency_ms": 200,
        },
        {
            "order_id": "o-1009",
            "event_ts": "2026-03-01T10:40:00Z",
            "status": "refunded",
            "amount": 30.0,
            "channel": "paid_social",
            "latency_ms": 230,
        },
        {
            "order_id": "o-1010",
            "event_ts": "2026-03-01T10:45:00Z",
            "status": "paid",
            "amount": 70.0,
            "channel": "paid_search",
            "latency_ms": 175,
        },
        {
            # deliberate duplicate of o-1006 with a newer timestamp
            "order_id": "o-1006",
            "event_ts": "2026-03-01T10:50:00Z",
            "status": "paid",
            "amount": 110.0,
            "channel": "email",
            "latency_ms": 188,
        },
        {
            "order_id": "o-1011",
            "event_ts": "2026-03-01T10:55:00Z",
            "status": "paid",
            "amount": 60.0,
            "channel": None,
            "latency_ms": 195,
        },
    ]
    return {
        "request": {
            "report_date": report_date,
            "region": region.upper(),
            "source": source,
            "rows": rows,
        },
        "policy_hints": {
            "allowed_sources": ["warehouse_sales_daily", "warehouse_refunds_daily"],
            "allowed_regions": ["US", "CA"],
            "max_rows": 5000,
            "max_missing_channel_pct": 0.25,
            "analysis_rules": {
                "dedupe_key": "order_id",
                "dedupe_ts_key": "event_ts",
                "dedupe_strategy": "latest_by_event_ts",
                "fill_missing_channel": "unknown",
                "normalize_status": "lower_strip",
            },
        },
    }
```
### tools.py — deterministic analysis steps
```python
from __future__ import annotations

import math
from collections import defaultdict
from datetime import datetime
from typing import Any

ALLOWED_STATUSES = {"paid", "failed", "refunded"}


def _safe_float(value: Any) -> tuple[bool, float]:
    try:
        v = float(value)
    except (TypeError, ValueError):
        return False, 0.0
    if not math.isfinite(v):
        return False, 0.0
    return True, v


def _is_valid_event_ts(value: Any) -> bool:
    if not isinstance(value, str):
        return False
    ts = value.strip()
    if len(ts) != 20 or not ts.endswith("Z"):
        return False
    try:
        datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        return False
    return True


def read_sales_snapshot(*, source: str, region: str, rows: list[dict[str, Any]]) -> dict[str, Any]:
    return {
        "status": "ok",
        "data": {
            "source": source,
            "region": region.upper(),
            "rows": rows,
            "snapshot_at": "2026-03-07T10:00:00Z",
        },
    }


def profile_sales_rows(*, rows: list[dict[str, Any]]) -> dict[str, Any]:
    required_fields = {"order_id", "event_ts", "status", "amount", "channel", "latency_ms"}
    schema_ok = True
    missing_fields_count = 0
    missing_channel_count = 0
    duplicate_rows = 0
    by_order_id: dict[str, int] = {}
    invalid_order_id_count = 0
    invalid_event_ts_count = 0
    invalid_status_count = 0
    invalid_amount_count = 0
    invalid_latency_count = 0
    for row in rows:
        keys = set(row.keys())
        if not required_fields.issubset(keys):
            schema_ok = False
            missing_fields_count += 1
        order_id = row.get("order_id")
        order_id_norm = str(order_id).strip() if isinstance(order_id, str) else ""
        if not order_id_norm:
            invalid_order_id_count += 1
        else:
            by_order_id[order_id_norm] = by_order_id.get(order_id_norm, 0) + 1
            if by_order_id[order_id_norm] > 1:
                duplicate_rows += 1
        event_ts = row.get("event_ts")
        if not _is_valid_event_ts(event_ts):
            invalid_event_ts_count += 1
        status = str(row.get("status", "")).strip().lower()
        if status not in ALLOWED_STATUSES:
            invalid_status_count += 1
        ok_amount, amount = _safe_float(row.get("amount"))
        if not ok_amount or amount < 0.0:
            invalid_amount_count += 1
        ok_latency, latency = _safe_float(row.get("latency_ms"))
        if not ok_latency or latency < 0.0:
            invalid_latency_count += 1
        channel = row.get("channel")
        if not isinstance(channel, str) or not channel.strip():
            missing_channel_count += 1
    total = len(rows)
    missing_channel_pct = (missing_channel_count / total) if total else 0.0
    row_validity_ok = (
        invalid_order_id_count == 0
        and invalid_event_ts_count == 0
        and invalid_status_count == 0
        and invalid_amount_count == 0
        and invalid_latency_count == 0
    )
    return {
        "status": "ok",
        "data": {
            "row_count_raw": total,
            "duplicate_rows": duplicate_rows,
            "missing_channel_count": missing_channel_count,
            "missing_channel_pct": missing_channel_pct,
            "schema_ok": schema_ok,
            "missing_fields_count": missing_fields_count,
            "row_validity_ok": row_validity_ok,
            "invalid_order_id_count": invalid_order_id_count,
            "invalid_event_ts_count": invalid_event_ts_count,
            "invalid_status_count": invalid_status_count,
            "invalid_amount_count": invalid_amount_count,
            "invalid_latency_count": invalid_latency_count,
        },
    }


def transform_sales_rows(
    *,
    rows: list[dict[str, Any]],
    dedupe_key: str,
    dedupe_ts_key: str,
    fill_missing_channel: str,
    normalize_status: str,
) -> dict[str, Any]:
    latest_by_key: dict[str, dict[str, Any]] = {}
    ts_by_key: dict[str, str] = {}
    skipped_rows_missing_key = 0
    skipped_rows_invalid_ts = 0
    for row in rows:
        key = str(row.get(dedupe_key, "")).strip()
        if not key:
            skipped_rows_missing_key += 1
            continue
        event_ts_raw = row.get(dedupe_ts_key)
        if not _is_valid_event_ts(event_ts_raw):
            skipped_rows_invalid_ts += 1
            continue
        event_ts = str(event_ts_raw).strip()
        previous_ts = ts_by_key.get(key, "")
        if key not in latest_by_key or event_ts >= previous_ts:
            latest_by_key[key] = dict(row)
            ts_by_key[key] = event_ts
    cleaned_rows = [latest_by_key[k] for k in sorted(latest_by_key.keys())]
    filled_missing_channel = 0
    for row in cleaned_rows:
        channel = row.get("channel")
        if not isinstance(channel, str) or not channel.strip():
            row["channel"] = fill_missing_channel
            filled_missing_channel += 1
        if normalize_status == "lower_strip":
            row["status"] = str(row.get("status", "")).strip().lower()
        row[dedupe_key] = str(row.get(dedupe_key, "")).strip()
        row[dedupe_ts_key] = str(row.get(dedupe_ts_key, "")).strip()
        ok_amount, amount = _safe_float(row.get("amount"))
        ok_latency, latency = _safe_float(row.get("latency_ms"))
        row["amount"] = amount if ok_amount else 0.0
        row["latency_ms"] = latency if ok_latency else 0.0
    return {
        "status": "ok",
        "data": {
            "rows": cleaned_rows,
            "row_count_clean": len(cleaned_rows),
            "deduped_rows": max(0, len(rows) - len(cleaned_rows)),
            "filled_missing_channel": filled_missing_channel,
            "skipped_rows_missing_key": skipped_rows_missing_key,
            "skipped_rows_invalid_ts": skipped_rows_invalid_ts,
            "dedupe_strategy": "latest_by_event_ts",
            "dedupe_key": dedupe_key,
            "dedupe_ts_key": dedupe_ts_key,
        },
    }


def analyze_sales_rows(*, rows: list[dict[str, Any]]) -> dict[str, Any]:
    total = len(rows)
    paid_count = 0
    failed_count = 0
    refunded_count = 0
    gross_revenue = 0.0
    refund_amount_total = 0.0
    latency_values: list[float] = []
    channel_net_revenue: dict[str, float] = defaultdict(float)
    for row in rows:
        status = str(row.get("status", "")).lower()
        amount = float(row.get("amount", 0.0))
        channel = str(row.get("channel", "unknown")).strip().lower() or "unknown"
        latency = float(row.get("latency_ms", 0.0))
        latency_values.append(latency)
        if status == "paid":
            paid_count += 1
            gross_revenue += amount
            channel_net_revenue[channel] += amount
        elif status == "failed":
            failed_count += 1
        elif status == "refunded":
            refunded_count += 1
            refund_amount_total += amount
            channel_net_revenue[channel] -= amount
    net_revenue = gross_revenue - refund_amount_total
    conversion_rate = (paid_count / total) if total else 0.0
    failed_payment_rate = (failed_count / total) if total else 0.0
    refund_rate = (refunded_count / total) if total else 0.0
    avg_latency_ms = (sum(latency_values) / len(latency_values)) if latency_values else 0.0
    sorted_latency = sorted(latency_values)
    if sorted_latency:
        p95_idx = int((len(sorted_latency) - 1) * 0.95)
        p95_latency_ms = sorted_latency[p95_idx]
    else:
        p95_latency_ms = 0.0
    top_channel = "unknown"
    top_channel_value = float("-inf")
    for channel, value in channel_net_revenue.items():
        if value > top_channel_value:
            top_channel = channel
            top_channel_value = value
    eta_minutes = 45 if failed_payment_rate >= 0.15 else 20
    return {
        "status": "ok",
        "data": {
            "sample_size": total,
            "paid_count": paid_count,
            "failed_count": failed_count,
            "refunded_count": refunded_count,
            "gross_revenue": gross_revenue,
            "refund_amount_total": refund_amount_total,
            "net_revenue": net_revenue,
            "conversion_rate": conversion_rate,
            "failed_payment_rate": failed_payment_rate,
            "refund_rate": refund_rate,
            "avg_latency_ms": avg_latency_ms,
            "p95_latency_ms": p95_latency_ms,
            "top_channel": top_channel,
            "eta_minutes": eta_minutes,
        },
    }


def validate_analysis(
    *,
    metrics: dict[str, Any],
    profile: dict[str, Any],
    max_missing_channel_pct: float,
) -> dict[str, Any]:
    passed_checks: list[str] = []
    failed_checks: list[str] = []
    conversion_rate = float(metrics.get("conversion_rate", 0.0))
    failed_payment_rate = float(metrics.get("failed_payment_rate", 0.0))
    refund_rate = float(metrics.get("refund_rate", 0.0))
    gross_revenue = float(metrics.get("gross_revenue", 0.0))
    refund_amount_total = float(metrics.get("refund_amount_total", 0.0))
    net_revenue = float(metrics.get("net_revenue", 0.0))
    sample_size = int(metrics.get("sample_size", 0))
    p95_latency_ms = float(metrics.get("p95_latency_ms", 0.0))
    missing_channel_pct = float(profile.get("missing_channel_pct", 0.0))
    row_validity_ok = bool(profile.get("row_validity_ok"))
    if 0.0 <= conversion_rate <= 1.0:
        passed_checks.append("conversion_between_0_1")
    else:
        failed_checks.append("conversion_out_of_range")
    if 0.0 <= refund_rate <= 1.0:
        passed_checks.append("refund_rate_between_0_1")
    else:
        failed_checks.append("refund_rate_out_of_range")
    if gross_revenue >= 0.0:
        passed_checks.append("gross_revenue_non_negative")
    else:
        failed_checks.append("gross_revenue_negative")
    if refund_amount_total >= 0.0:
        passed_checks.append("refund_amount_non_negative")
    else:
        failed_checks.append("refund_amount_negative")
    if net_revenue >= 0.0:
        passed_checks.append("net_revenue_non_negative")
    else:
        failed_checks.append("net_revenue_negative")
    if failed_payment_rate <= 0.35:
        passed_checks.append("failed_rate_reasonable")
    else:
        failed_checks.append("failed_rate_too_high")
    if sample_size >= 10:
        passed_checks.append("sample_size_min_10")
    else:
        failed_checks.append("sample_size_too_low")
    if p95_latency_ms <= 400.0:
        passed_checks.append("p95_latency_under_400")
    else:
        failed_checks.append("p95_latency_too_high")
    if missing_channel_pct <= max_missing_channel_pct:
        passed_checks.append("missing_channel_within_policy")
    else:
        failed_checks.append("missing_channel_policy_violation")
    if gross_revenue >= net_revenue:
        passed_checks.append("gross_ge_net")
    else:
        failed_checks.append("gross_less_than_net")
    if abs((gross_revenue - refund_amount_total) - net_revenue) <= 0.01:
        passed_checks.append("gross_minus_refunds_equals_net")
    else:
        failed_checks.append("gross_refund_net_mismatch")
    if row_validity_ok:
        passed_checks.append("profile_row_values_valid")
    else:
        failed_checks.append("profile_row_values_invalid")
    return {
        "status": "ok",
        "data": {
            "ok": len(failed_checks) == 0,
            "passed_checks": passed_checks,
            "failed_checks": failed_checks,
        },
    }
```
### agent.py — plan proposal + final answer
```python
from __future__ import annotations

from typing import Any


def propose_analysis_plan(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    del goal
    req = request["request"]
    return {
        "steps": [
            {
                "id": "s1",
                "action": "ingest_sales",
                "args": {
                    "source": req["source"],
                    "region": req["region"],
                },
            },
            {"id": "s2", "action": "profile_sales", "args": {}},
            {"id": "s3", "action": "transform_sales", "args": {}},
            {"id": "s4", "action": "analyze_sales", "args": {}},
            {"id": "s5", "action": "validate_analysis", "args": {}},
        ]
    }


def compose_final_answer(
    *,
    request: dict[str, Any],
    aggregate: dict[str, Any],
    quality: dict[str, Any],
) -> str:
    req = request["request"]
    m = aggregate["metrics"]
    checks = quality.get("passed_checks", [])
    return (
        f"Data analysis brief ({req['region']}, {req['report_date']}): source {req['source']} shows net revenue "
        f"{m['net_revenue']:.2f} (gross {m['gross_revenue']:.2f}, refunds {m['refund_amount_total']:.2f} subtracted), "
        f"conversion {m['conversion_rate_pct']:.2f}%, failed payment rate {m['failed_payment_rate_pct']:.2f}%, "
        f"refund rate {m['refund_rate_pct']:.2f}%, top channel {m['top_channel']}, "
        f"and p95 latency {m['p95_latency_ms']:.2f} ms. Quality checks passed: {len(checks)}."
    )
```
### gateway.py — plan/policy/quality boundaries
```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str, *, details: dict[str, Any] | None = None):
        super().__init__(reason)
        self.reason = reason
        self.details = details or {}


@dataclass(frozen=True)
class Budget:
    max_seconds: int = 25
    max_steps: int = 8
    max_rows: int = 5000


@dataclass(frozen=True)
class Decision:
    kind: str
    reason: str


EXPECTED_ACTION_SEQUENCE = [
    "ingest_sales",
    "profile_sales",
    "transform_sales",
    "analyze_sales",
    "validate_analysis",
]


def validate_plan(raw_steps: Any, *, max_steps: int) -> list[dict[str, Any]]:
    if not isinstance(raw_steps, list) or not raw_steps:
        raise StopRun("invalid_plan:steps")
    if len(raw_steps) > max_steps:
        raise StopRun("invalid_plan:too_many_steps")
    steps: list[dict[str, Any]] = []
    actions: list[str] = []
    for raw in raw_steps:
        if not isinstance(raw, dict):
            raise StopRun("invalid_step:not_object")
        step_id = raw.get("id")
        action = raw.get("action")
        args = raw.get("args")
        if not isinstance(step_id, str) or not step_id.strip():
            raise StopRun("invalid_step:id")
        if not isinstance(action, str) or not action.strip():
            raise StopRun("invalid_step:action")
        if not isinstance(args, dict):
            raise StopRun("invalid_step:args")
        normalized = {
            "id": step_id.strip(),
            "action": action.strip(),
            "args": dict(args),
        }
        steps.append(normalized)
        actions.append(normalized["action"])
    if actions != EXPECTED_ACTION_SEQUENCE:
        raise StopRun(
            "invalid_plan:step_sequence",
            details={"expected": EXPECTED_ACTION_SEQUENCE, "received": actions},
        )
    return steps


class DataAnalysisGateway:
    def __init__(
        self,
        *,
        allowed_sources_policy: set[str],
        allowed_sources_execution: set[str],
        allowed_regions_policy: set[str],
        budget: Budget,
    ):
        self.allowed_sources_policy = set(allowed_sources_policy)
        self.allowed_sources_execution = set(allowed_sources_execution)
        self.allowed_regions_policy = {x.upper() for x in allowed_regions_policy}
        self.budget = budget

    def evaluate_ingest(self, *, source: str, region: str) -> Decision:
        if source not in self.allowed_sources_policy:
            return Decision(kind="deny", reason="source_denied_policy")
        if region.upper() not in self.allowed_regions_policy:
            return Decision(kind="deny", reason="region_denied_policy")
        if source not in self.allowed_sources_execution:
            return Decision(kind="deny", reason="source_denied_execution")
        return Decision(kind="allow", reason="policy_pass")

    def ensure_rows_budget(self, *, rows: list[dict[str, Any]]) -> None:
        if len(rows) > self.budget.max_rows:
            raise StopRun("dataset_too_large")

    def validate_profile(self, *, profile: dict[str, Any], max_missing_channel_pct: float) -> None:
        if not bool(profile.get("schema_ok")):
            raise StopRun("invalid_profile:schema")
        if not bool(profile.get("row_validity_ok")):
            raise StopRun("invalid_profile:row_values")
        missing_pct = float(profile.get("missing_channel_pct", 0.0))
        if missing_pct > max_missing_channel_pct:
            raise StopRun("invalid_profile:missing_channel_too_high")

    def validate_metrics(self, *, metrics: dict[str, Any]) -> None:
        sample_size = int(metrics.get("sample_size", 0))
        if sample_size <= 0:
            raise StopRun("invalid_metrics:sample_size")
        conversion = float(metrics.get("conversion_rate", 0.0))
        if not (0.0 <= conversion <= 1.0):
            raise StopRun("invalid_metrics:conversion_rate")
        failed_rate = float(metrics.get("failed_payment_rate", 0.0))
        if not (0.0 <= failed_rate <= 1.0):
            raise StopRun("invalid_metrics:failed_payment_rate")
        refund_rate = float(metrics.get("refund_rate", 0.0))
        if not (0.0 <= refund_rate <= 1.0):
            raise StopRun("invalid_metrics:refund_rate")
        gross_revenue = float(metrics.get("gross_revenue", 0.0))
        refund_amount_total = float(metrics.get("refund_amount_total", 0.0))
        net_revenue = float(metrics.get("net_revenue", 0.0))
        if net_revenue < 0:
            raise StopRun("invalid_metrics:net_revenue")
        if gross_revenue < net_revenue:
            raise StopRun("invalid_metrics:gross_less_than_net")
        if abs((gross_revenue - refund_amount_total) - net_revenue) > 0.01:
            raise StopRun("invalid_metrics:revenue_consistency")
```
### main.py — orchestrate analysis workflow
```python
from __future__ import annotations

import json
import time
import uuid
from typing import Any

from agent import compose_final_answer, propose_analysis_plan
from context import build_request
from gateway import Budget, DataAnalysisGateway, StopRun, validate_plan
from tools import (
    analyze_sales_rows,
    profile_sales_rows,
    read_sales_snapshot,
    transform_sales_rows,
    validate_analysis,
)

GOAL = (
    "Prepare a validated weekly data-analysis brief for US payments with revenue, "
    "conversion, failed payment rate, and quality checks."
)

REQUEST = build_request(
    report_date="2026-03-07",
    region="US",
    source="warehouse_sales_daily",
)

DEFAULT_BUDGET = Budget(
    max_seconds=25,
    max_steps=8,
    max_rows=5000,
)

DEFAULT_ALLOWED_SOURCES_POLICY = {"warehouse_sales_daily", "warehouse_refunds_daily"}
ALLOWED_SOURCES_EXECUTION = {"warehouse_sales_daily"}
DEFAULT_ALLOWED_REGIONS_POLICY = {"US", "CA"}

DEFAULT_ANALYSIS_RULES = {
    "dedupe_key": "order_id",
    "dedupe_ts_key": "event_ts",
    "dedupe_strategy": "latest_by_event_ts",
    "fill_missing_channel": "unknown",
    "normalize_status": "lower_strip",
}


def _unwrap_tool_data(raw: Any, *, tool_name: str) -> dict[str, Any]:
    if not isinstance(raw, dict) or raw.get("status") != "ok" or not isinstance(raw.get("data"), dict):
        raise StopRun(f"tool_invalid_output:{tool_name}")
    return dict(raw["data"])


def run_data_analysis_agent(*, goal: str, request: dict[str, Any]) -> dict[str, Any]:
    run_id = str(uuid.uuid4())
    started = time.monotonic()
    trace: list[dict[str, Any]] = []
    history: list[dict[str, Any]] = []
    artifact_ids = {
        "snapshot_id": f"snap_{run_id[:8]}",
        "profile_id": f"profile_{run_id[:8]}",
        "transform_id": f"transform_{run_id[:8]}",
        "metrics_id": f"metrics_{run_id[:8]}",
        "quality_id": f"quality_{run_id[:8]}",
    }

    def elapsed_ms() -> int:
        return max(1, int((time.monotonic() - started) * 1000))

    hints_raw = request.get("policy_hints")
    hints: dict[str, Any] = hints_raw if isinstance(hints_raw, dict) else {}

    allowed_sources_raw = hints.get("allowed_sources")
    if isinstance(allowed_sources_raw, list):
        allowed_sources_policy = {
            str(item).strip()
            for item in allowed_sources_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_sources_policy = set(DEFAULT_ALLOWED_SOURCES_POLICY)
    if not allowed_sources_policy:
        allowed_sources_policy = set(DEFAULT_ALLOWED_SOURCES_POLICY)

    allowed_regions_raw = hints.get("allowed_regions")
    if isinstance(allowed_regions_raw, list):
        allowed_regions_policy = {
            str(item).strip().upper()
            for item in allowed_regions_raw
            if isinstance(item, str) and item.strip()
        }
    else:
        allowed_regions_policy = set(DEFAULT_ALLOWED_REGIONS_POLICY)
    if not allowed_regions_policy:
        allowed_regions_policy = set(DEFAULT_ALLOWED_REGIONS_POLICY)

    max_rows_raw = hints.get("max_rows", DEFAULT_BUDGET.max_rows)
    max_missing_channel_pct_raw = hints.get("max_missing_channel_pct", 0.25)

    analysis_rules_raw = hints.get("analysis_rules")
    analysis_rules = dict(DEFAULT_ANALYSIS_RULES)
    if isinstance(analysis_rules_raw, dict):
        for key, value in analysis_rules_raw.items():
            if isinstance(key, str):
                analysis_rules[key] = value

    dedupe_key = str(analysis_rules.get("dedupe_key", DEFAULT_ANALYSIS_RULES["dedupe_key"])).strip() or "order_id"
    dedupe_ts_key = str(analysis_rules.get("dedupe_ts_key", DEFAULT_ANALYSIS_RULES["dedupe_ts_key"])).strip() or "event_ts"
    dedupe_strategy = str(
        analysis_rules.get("dedupe_strategy", DEFAULT_ANALYSIS_RULES["dedupe_strategy"])
    ).strip() or "latest_by_event_ts"
    fill_missing_channel = str(
        analysis_rules.get("fill_missing_channel", DEFAULT_ANALYSIS_RULES["fill_missing_channel"])
    ).strip() or "unknown"
    normalize_status = str(
        analysis_rules.get("normalize_status", DEFAULT_ANALYSIS_RULES["normalize_status"])
    ).strip() or "lower_strip"

    if dedupe_strategy != "latest_by_event_ts":
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:dedupe_strategy",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        max_rows = int(max_rows_raw)
    except (TypeError, ValueError):
        max_rows = DEFAULT_BUDGET.max_rows
    if max_rows <= 0:
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:max_rows",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    try:
        max_missing_channel_pct = float(max_missing_channel_pct_raw)
    except (TypeError, ValueError):
        max_missing_channel_pct = 0.25
    if not (0.0 <= max_missing_channel_pct <= 1.0):
        return {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": "invalid_request:max_missing_channel_pct",
            "phase": "plan",
            "trace": trace,
            "history": history,
        }

    budget = Budget(
        max_seconds=DEFAULT_BUDGET.max_seconds,
        max_steps=DEFAULT_BUDGET.max_steps,
        max_rows=max(10, min(200000, max_rows)),
    )
    gateway = DataAnalysisGateway(
        allowed_sources_policy=allowed_sources_policy,
        allowed_sources_execution=ALLOWED_SOURCES_EXECUTION,
        allowed_regions_policy=allowed_regions_policy,
        budget=budget,
    )

    def stopped(stop_reason: str, *, phase: str, **extra: Any) -> dict[str, Any]:
        payload = {
            "run_id": run_id,
            "status": "stopped",
            "stop_reason": stop_reason,
            "phase": phase,
            "trace": trace,
            "history": history,
        }
        payload.update(extra)
        return payload

    phase = "plan"
    try:
        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase=phase)
        raw_plan = propose_analysis_plan(goal=goal, request=request)
        steps = validate_plan(raw_plan.get("steps"), max_steps=budget.max_steps)
        trace.append(
            {
                "step": 1,
                "phase": "plan",
                "steps": len(steps),
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 1,
                "action": "propose_analysis_plan",
                "step_ids": [item["id"] for item in steps],
            }
        )

        phase = "policy_check"
        ingest_args = steps[0]["args"]
        source = str(ingest_args.get("source", "")).strip()
        region = str(ingest_args.get("region", "")).strip().upper()
        decision = gateway.evaluate_ingest(source=source, region=region)
        trace.append(
            {
                "step": 2,
                "phase": "policy_check",
                "source": source,
                "region": region,
                "decision": decision.kind,
                "reason": decision.reason,
                "allowed_sources_policy": sorted(allowed_sources_policy),
                "allowed_sources_execution": sorted(ALLOWED_SOURCES_EXECUTION),
                "elapsed_ms": elapsed_ms(),
                "ok": decision.kind == "allow",
            }
        )
        history.append(
            {
                "step": 2,
                "action": "policy_check",
                "decision": {"kind": decision.kind, "reason": decision.reason},
            }
        )
        if decision.kind != "allow":
            return stopped(f"policy_block:{decision.reason}", phase=phase)

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="ingest_profile")
        phase = "ingest_profile"
        req = request["request"]
        snapshot = _unwrap_tool_data(
            read_sales_snapshot(
                source=source,
                region=region,
                rows=list(req["rows"]),
            ),
            tool_name="read_sales_snapshot",
        )
        rows_raw = list(snapshot["rows"])
        gateway.ensure_rows_budget(rows=rows_raw)
        profile = _unwrap_tool_data(
            profile_sales_rows(rows=rows_raw),
            tool_name="profile_sales_rows",
        )
        gateway.validate_profile(profile=profile, max_missing_channel_pct=max_missing_channel_pct)
        trace.append(
            {
                "step": 3,
                "phase": "ingest_profile",
                "row_count_raw": profile["row_count_raw"],
                "duplicate_rows": profile["duplicate_rows"],
                "missing_channel_pct": round(float(profile["missing_channel_pct"]) * 100, 2),
                "artifact_refs": {
                    "snapshot_id": artifact_ids["snapshot_id"],
                    "profile_id": artifact_ids["profile_id"],
                },
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 3,
                "action": "ingest_profile",
                "source": source,
                "row_count_raw": profile["row_count_raw"],
                "artifact_refs": {
                    "snapshot_id": artifact_ids["snapshot_id"],
                    "profile_id": artifact_ids["profile_id"],
                },
            }
        )

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="transform_analyze")
        phase = "transform_analyze"
        transformed = _unwrap_tool_data(
            transform_sales_rows(
                rows=rows_raw,
                dedupe_key=dedupe_key,
                dedupe_ts_key=dedupe_ts_key,
                fill_missing_channel=fill_missing_channel,
                normalize_status=normalize_status,
            ),
            tool_name="transform_sales_rows",
        )
        rows_clean = list(transformed["rows"])
        metrics = _unwrap_tool_data(
            analyze_sales_rows(rows=rows_clean),
            tool_name="analyze_sales_rows",
        )
        gateway.validate_metrics(metrics=metrics)
        trace.append(
            {
                "step": 4,
                "phase": "transform_analyze",
                "row_count_clean": transformed["row_count_clean"],
                "deduped_rows": transformed["deduped_rows"],
                "filled_missing_channel": transformed["filled_missing_channel"],
                "skipped_rows_missing_key": transformed.get("skipped_rows_missing_key", 0),
                "skipped_rows_invalid_ts": transformed.get("skipped_rows_invalid_ts", 0),
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "conversion_rate_pct": round(float(metrics["conversion_rate"]) * 100, 2),
                "failed_payment_rate_pct": round(float(metrics["failed_payment_rate"]) * 100, 2),
                "artifact_refs": {
                    "transform_id": artifact_ids["transform_id"],
                    "metrics_id": artifact_ids["metrics_id"],
                },
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 4,
                "action": "transform_analyze",
                "row_count_clean": transformed["row_count_clean"],
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "applied_rules": {
                    "dedupe_strategy": dedupe_strategy,
                    "dedupe_key": dedupe_key,
                    "dedupe_ts_key": dedupe_ts_key,
                    "fill_missing_channel": fill_missing_channel,
                    "normalize_status": normalize_status,
                },
                "artifact_refs": {
                    "transform_id": artifact_ids["transform_id"],
                    "metrics_id": artifact_ids["metrics_id"],
                },
            }
        )

        if (time.monotonic() - started) > budget.max_seconds:
            return stopped("max_seconds", phase="validate")
        phase = "validate"
        quality = _unwrap_tool_data(
            validate_analysis(
                metrics=metrics,
                profile=profile,
                max_missing_channel_pct=max_missing_channel_pct,
            ),
            tool_name="validate_analysis",
        )
        if not bool(quality.get("ok")):
            failed = quality.get("failed_checks") or []
            first = str(failed[0]) if failed else "unknown"
            return stopped(f"validation_failed:{first}", phase=phase, quality=quality)
        trace.append(
            {
                "step": 5,
                "phase": "validate",
                "passed_checks": len(quality.get("passed_checks", [])),
                "failed_checks": len(quality.get("failed_checks", [])),
                "artifact_refs": {"quality_id": artifact_ids["quality_id"]},
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 5,
                "action": "validate_analysis",
                "passed_checks": len(quality.get("passed_checks", [])),
                "artifact_refs": {"quality_id": artifact_ids["quality_id"]},
            }
        )

        aggregate = {
            "report_date": req["report_date"],
            "region": req["region"],
            "source": source,
            "metrics": {
                "sample_size": int(metrics["sample_size"]),
                "gross_revenue": round(float(metrics["gross_revenue"]), 2),
                "refund_amount_total": round(float(metrics["refund_amount_total"]), 2),
                "net_revenue": round(float(metrics["net_revenue"]), 2),
                "conversion_rate": round(float(metrics["conversion_rate"]), 6),
                "conversion_rate_pct": round(float(metrics["conversion_rate"]) * 100, 2),
                "failed_payment_rate": round(float(metrics["failed_payment_rate"]), 6),
                "failed_payment_rate_pct": round(float(metrics["failed_payment_rate"]) * 100, 2),
                "refund_rate": round(float(metrics["refund_rate"]), 6),
                "refund_rate_pct": round(float(metrics["refund_rate"]) * 100, 2),
                "top_channel": str(metrics["top_channel"]),
                "avg_latency_ms": round(float(metrics["avg_latency_ms"]), 2),
                "p95_latency_ms": round(float(metrics["p95_latency_ms"]), 2),
                "eta_minutes": int(metrics["eta_minutes"]),
            },
            "data_quality": {
                "row_count_raw": int(profile["row_count_raw"]),
                "row_count_clean": int(transformed["row_count_clean"]),
                "deduped_rows": int(transformed["deduped_rows"]),
                "missing_channel_pct_raw": round(float(profile["missing_channel_pct"]) * 100, 2),
                "row_validity_ok": bool(profile["row_validity_ok"]),
                "invalid_order_id_count": int(profile["invalid_order_id_count"]),
                "invalid_event_ts_count": int(profile["invalid_event_ts_count"]),
                "invalid_status_count": int(profile["invalid_status_count"]),
                "invalid_amount_count": int(profile["invalid_amount_count"]),
                "invalid_latency_count": int(profile["invalid_latency_count"]),
                "skipped_rows_missing_key": int(transformed.get("skipped_rows_missing_key", 0)),
                "skipped_rows_invalid_ts": int(transformed.get("skipped_rows_invalid_ts", 0)),
            },
        }
        answer = compose_final_answer(
            request=request,
            aggregate=aggregate,
            quality=quality,
        )
        trace.append(
            {
                "step": 6,
                "phase": "finalize",
                "elapsed_ms": elapsed_ms(),
                "ok": True,
            }
        )
        history.append(
            {
                "step": 6,
                "action": "finalize",
            }
        )
        return {
            "run_id": run_id,
            "status": "ok",
            "stop_reason": "success",
            "outcome": "validated_analysis",
            "answer": answer,
            "aggregate": aggregate,
            "quality": quality,
            "trace": trace,
            "history": history,
        }
    except StopRun as exc:
        return stopped(
            exc.reason,
            phase=phase,
            **({"details": exc.details} if isinstance(exc.details, dict) and exc.details else {}),
        )


def main() -> None:
    result = run_data_analysis_agent(goal=GOAL, request=REQUEST)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()
```
What matters most here (plain words)
- the workflow is strictly fixed, so the agent cannot "jump over" `profile`/`validate`
- the policy boundary separately decides whether the source can be used in this run
- profiling + validation provide a technical quality gate, not "faith in a number"; on `invalid_profile:*` the run stops before `transform`
- dedup is deterministic (`latest_by_event_ts`) for strict UTC `event_ts` (`YYYY-MM-DDTHH:MM:SSZ`), and cleaning rules explicitly come from `policy_hints`
- `deduped_rows` in this demo counts all rows removed by `transform` (dedup plus any drop of invalid rows); in the current sample no rows are dropped
- `filled_missing_channel` is counted after dedup: if a duplicate is replaced by a newer row with a valid `channel`, no fill is needed (that is why it is `1`, not `2`)
- `finalize` here is not a tool step but an orchestration step in `main.py`
- the final brief is formed only from validated metrics
- `trace`/`history` with `elapsed_ms` and `artifact_refs` make the pipeline auditable
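The `latest_by_event_ts` dedup described above can be sketched as follows. This is an illustrative stand-in, not the repo's actual `tools.py` code; the function and row names are assumptions. It also shows why `filled_missing_channel` ends up at `1`, not `2`: the duplicate row is replaced by a newer row that already carries a valid `channel`.

```python
from datetime import datetime, timezone

TS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # strict UTC event_ts format

def parse_event_ts(value: str) -> datetime:
    """Parse a strict UTC timestamp; raises ValueError on any deviation."""
    return datetime.strptime(value, TS_FORMAT).replace(tzinfo=timezone.utc)

def dedupe_latest_by_event_ts(rows: list[dict]) -> tuple[list[dict], int]:
    """Keep the newest row per order_id; return (clean_rows, removed_count)."""
    latest: dict[str, dict] = {}
    for row in rows:
        key = row["order_id"]
        current = latest.get(key)
        if current is None or parse_event_ts(row["event_ts"]) > parse_event_ts(current["event_ts"]):
            latest[key] = row
    clean = list(latest.values())
    return clean, len(rows) - len(clean)

rows = [
    {"order_id": "o1", "event_ts": "2026-03-07T10:00:00Z", "channel": None},
    {"order_id": "o1", "event_ts": "2026-03-07T11:00:00Z", "channel": "paid_search"},
    {"order_id": "o2", "event_ts": "2026-03-07T09:30:00Z", "channel": "email"},
]
# The older o1 row (with the missing channel) is replaced by its newer
# duplicate, so the surviving rows need no channel fill.
clean, removed = dedupe_latest_by_event_ts(rows)
```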
Example Output
{
"run_id": "3045a7d0-1431-455b-9d26-16ae9fbc653a",
"status": "ok",
"stop_reason": "success",
"outcome": "validated_analysis",
"answer": "Data analysis brief (US, 2026-03-07): source warehouse_sales_daily shows net revenue 600.00 (gross 670.00, refunds 70.00 subtracted), conversion 63.64%, failed payment rate 18.18%, refund rate 18.18%, top channel paid_search, and p95 latency 240.00 ms. Quality checks passed: 12.",
"aggregate": {
"report_date": "2026-03-07",
"region": "US",
"source": "warehouse_sales_daily",
"metrics": {
"sample_size": 11,
"gross_revenue": 670.0,
"refund_amount_total": 70.0,
"net_revenue": 600.0,
"conversion_rate": 0.636364,
"conversion_rate_pct": 63.64,
"failed_payment_rate": 0.181818,
"failed_payment_rate_pct": 18.18,
"refund_rate": 0.181818,
"refund_rate_pct": 18.18,
"top_channel": "paid_search",
"avg_latency_ms": 200.73,
"p95_latency_ms": 240.0,
"eta_minutes": 45
},
"data_quality": {
"row_count_raw": 12,
"row_count_clean": 11,
"deduped_rows": 1,
"missing_channel_pct_raw": 16.67,
"row_validity_ok": true,
"invalid_order_id_count": 0,
"invalid_event_ts_count": 0,
"invalid_status_count": 0,
"invalid_amount_count": 0,
"invalid_latency_count": 0,
"skipped_rows_missing_key": 0,
"skipped_rows_invalid_ts": 0
}
},
"quality": {
"ok": true,
"passed_checks": [
"conversion_between_0_1",
"refund_rate_between_0_1",
"gross_revenue_non_negative",
"refund_amount_non_negative",
"net_revenue_non_negative",
"failed_rate_reasonable",
"sample_size_min_10",
"p95_latency_under_400",
"missing_channel_within_policy",
"gross_ge_net",
"gross_minus_refunds_equals_net",
"profile_row_values_valid"
],
"failed_checks": []
},
"trace": [
{
"step": 1,
"phase": "plan",
"steps": 5,
"elapsed_ms": 1,
"ok": true
},
{
"step": 2,
"phase": "policy_check",
"source": "warehouse_sales_daily",
"region": "US",
"decision": "allow",
"reason": "policy_pass",
"allowed_sources_policy": [
"warehouse_refunds_daily",
"warehouse_sales_daily"
],
"allowed_sources_execution": [
"warehouse_sales_daily"
],
"elapsed_ms": 1,
"ok": true
},
{
"step": 3,
"phase": "ingest_profile",
"row_count_raw": 12,
"duplicate_rows": 1,
"missing_channel_pct": 16.67,
"artifact_refs": {
"snapshot_id": "snap_3045a7d0",
"profile_id": "profile_3045a7d0"
},
"elapsed_ms": 2,
"ok": true
},
{
"step": 4,
"phase": "transform_analyze",
"row_count_clean": 11,
"deduped_rows": 1,
"filled_missing_channel": 1,
"skipped_rows_missing_key": 0,
"skipped_rows_invalid_ts": 0,
"net_revenue": 600.0,
"conversion_rate_pct": 63.64,
"failed_payment_rate_pct": 18.18,
"artifact_refs": {
"transform_id": "transform_3045a7d0",
"metrics_id": "metrics_3045a7d0"
},
"elapsed_ms": 2,
"ok": true
},
{
"step": 5,
"phase": "validate",
"passed_checks": 12,
"failed_checks": 0,
"artifact_refs": {
"quality_id": "quality_3045a7d0"
},
"elapsed_ms": 2,
"ok": true
},
{
"step": 6,
"phase": "finalize",
"elapsed_ms": 2,
"ok": true
}
],
"history": [{...}]
}
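The `quality.passed_checks` names in the output above suggest how the validate step works: each invariant is a boolean predicate over the aggregated metrics. A minimal sketch, assuming a hypothetical `run_quality_checks` helper (the check names match the example output; the function itself is illustrative):

```python
def run_quality_checks(metrics: dict) -> dict:
    """Evaluate a subset of the quality invariants from the example output."""
    checks = {
        "conversion_between_0_1": 0.0 <= metrics["conversion_rate"] <= 1.0,
        "refund_rate_between_0_1": 0.0 <= metrics["refund_rate"] <= 1.0,
        "gross_revenue_non_negative": metrics["gross_revenue"] >= 0,
        "net_revenue_non_negative": metrics["net_revenue"] >= 0,
        "gross_ge_net": metrics["gross_revenue"] >= metrics["net_revenue"],
        # Aggregate consistency: gross - refunds must equal net (tolerance for rounding).
        "gross_minus_refunds_equals_net": abs(
            metrics["gross_revenue"] - metrics["refund_amount_total"] - metrics["net_revenue"]
        ) < 0.01,
        "sample_size_min_10": metrics["sample_size"] >= 10,
    }
    passed = [name for name, ok in checks.items() if ok]
    failed = [name for name, ok in checks.items() if not ok]
    return {"ok": not failed, "passed_checks": passed, "failed_checks": failed}

# Metrics taken from the example output above:
quality = run_quality_checks({
    "conversion_rate": 0.636364,
    "refund_rate": 0.181818,
    "gross_revenue": 670.0,
    "refund_amount_total": 70.0,
    "net_revenue": 600.0,
    "sample_size": 11,
})
```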
Typical stop_reason
- `success` - run completed correctly
- `max_seconds` - total time budget exhausted
- `invalid_plan:*` - invalid analysis plan
- `invalid_plan:step_sequence` - required workflow order violated (with expected/received details)
- `invalid_step:*` - invalid step contract
- `policy_block:source_denied_policy` - source denied by policy allowlist
- `policy_block:source_denied_execution` - source denied by execution allowlist
- `policy_block:region_denied_policy` - region denied by policy
- `dataset_too_large` - row limit exceeded
- `invalid_profile:schema` - profile found a schema mismatch
- `invalid_profile:row_values` - profile found invalid row values
- `invalid_profile:missing_channel_too_high` - share of missing channel values exceeded the policy threshold
- `invalid_metrics:*` - metrics failed basic invariants
- `validation_failed:*` - quality validation failed
- `tool_invalid_output:*` - step returned an invalid contract
- `invalid_request:dedupe_strategy` - unsupported dedupe strategy in `analysis_rules`
- `invalid_request:max_rows` - invalid `max_rows` value
- `invalid_request:max_missing_channel_pct` - invalid `max_missing_channel_pct` threshold
What Is NOT Shown Here
- connecting to a real DWH/BI instead of the in-memory snapshot
- dataset versioning (snapshot lineage)
- SQL pushdown and cost-aware query planning
- multi-source joins with schema conflicts
- anomaly detection beyond simple invariants
- persisting artifacts (tables/charts) to storage
What to Try Next
- Set `source="warehouse_refunds_daily"` and observe `policy_block:source_denied_execution`.
- Set `max_missing_channel_pct=0.1` and observe `invalid_profile:missing_channel_too_high`.
- Add a step not in `EXPECTED_ACTION_SEQUENCE` and observe `invalid_plan:step_sequence`.
- Increase `rows` above `max_rows` and observe `dataset_too_large`.
- Return `conversion_rate=1.2` or `gross < net` in the analyze step and observe `invalid_metrics:*`.
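The step-sequence experiment can be reasoned about with a sketch like this. `EXPECTED_ACTION_SEQUENCE` mirrors the workflow described in this README; the validator function is illustrative, not the repo's actual `gateway.py` code:

```python
# Fixed workflow order the runtime enforces; the finalize step is a host
# orchestration step and is not part of the tool-step plan.
EXPECTED_ACTION_SEQUENCE = ["ingest", "profile", "transform", "analyze", "validate"]

def validate_plan(steps: list[str]) -> dict:
    """Reject any plan that deviates from the fixed step order."""
    if steps != EXPECTED_ACTION_SEQUENCE:
        return {
            "ok": False,
            "stop_reason": "invalid_plan:step_sequence",
            "expected": EXPECTED_ACTION_SEQUENCE,
            "received": steps,
        }
    return {"ok": True}

# Skipping profile is rejected before any step runs:
bad = validate_plan(["ingest", "transform", "analyze", "validate"])
good = validate_plan(list(EXPECTED_ACTION_SEQUENCE))
```

Because the comparison is against the full expected sequence, reordering, skipping, or inserting a step all map to the same `invalid_plan:step_sequence` stop reason, with the expected/received lists attached for the trace.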