Pattern Essence (Brief)
Supervisor Agent is a pattern where a worker agent proposes the next action, and a separate supervisor layer decides whether it can be executed.
LLM decides what to do next, and supervisor policy decides whether it is safe and allowed before execution.
What this example demonstrates
- worker loop where LLM proposes an action (
toolorfinal) - separate supervisor review before each tool call
- supervisor decisions:
approve,revise,block,escalate - human approval simulation for high refund amounts
- policy boundary between intent (LLM) and execution (tools)
- tool allowlist, run budgets, and loop detection
- explicit
stop_reasonand audit trace for production monitoring
Architecture
- Worker proposes action JSON.
- Action is validated (
validate_worker_action). - Supervisor performs review and returns a decision (
approve/revise/block/escalate). - ToolGateway executes only the allowed action.
- Observation + supervisor decision are written to
history. - Worker sees
historyand takes the next step or returnsfinal.
Supervisor does not execute business logic itself: it only decides whether the next step is admissible.
Project structure
examples/
└── agent-patterns/
└── supervisor-agent/
└── python/
├── main.py # Worker -> Supervisor Review -> Execute -> Final
├── llm.py # worker decision + final synthesis
├── supervisor.py # policy review + human approval simulation
├── gateway.py # action validation + allowlist + loop/budget control
├── tools.py # deterministic business tools (refund flow)
└── requirements.txt
How to run
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/supervisor-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Option via export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Option via .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows, it is easier to use environment set commands or, if desired, python-dotenv to load .env automatically.
Task
Imagine a real production support case:
"A customer requests a 1200 USD refund for an annual plan paid 10 days ago."
The worker must not automatically execute a refund of that size. It must:
- gather context
- propose an action
- let the supervisor check it against policies
- escalate to a human when needed
- execute only the approved action
Solution
In this example:
- the worker LLM proposes a sequence of actions
- the supervisor controls every tool call
- a high-risk refund is escalated to a human
- a human (simulation) approves the adjusted amount
- only then do execution and the final response happen
This is the Supervisor pattern: execution cannot bypass policy review.
Code
tools.py — business tools (facts and execution only)
from __future__ import annotations
from typing import Any
USERS = {
42: {"id": 42, "name": "Anna", "country": "US", "tier": "enterprise"},
7: {"id": 7, "name": "Max", "country": "US", "tier": "pro"},
}
BILLING = {
42: {
"plan": "annual_enterprise",
"currency": "USD",
"last_charge_usd": 1200.0,
"days_since_payment": 10,
},
7: {
"plan": "pro_monthly",
"currency": "USD",
"last_charge_usd": 49.0,
"days_since_payment": 35,
},
}
def get_refund_context(user_id: int) -> dict[str, Any]:
user = USERS.get(user_id)
billing = BILLING.get(user_id)
if not user or not billing:
return {"error": f"context_not_found_for_user:{user_id}"}
return {
"user": user,
"billing": billing,
"policy_hint": {
"auto_refund_limit_usd": 1000.0,
"refund_window_days": 14,
},
}
def issue_refund(
user_id: int,
amount_usd: float,
reason: str,
) -> dict[str, Any]:
user = USERS.get(user_id)
billing = BILLING.get(user_id)
if not user or not billing:
return {"status": "error", "error": f"refund_user_not_found:{user_id}"}
if amount_usd <= 0:
return {"status": "error", "error": "refund_amount_must_be_positive"}
paid = float(billing["last_charge_usd"])
if amount_usd > paid:
return {
"status": "error",
"error": "refund_exceeds_last_charge",
"max_refund_usd": paid,
}
return {
"status": "ok",
"refund": {
"user_id": user_id,
"currency": "USD",
"amount_usd": round(float(amount_usd), 2),
"reason": reason.strip(),
"transaction_id": f"rf_{user_id}_20260226",
},
}
def send_refund_email(user_id: int, amount_usd: float, message: str) -> dict[str, Any]:
user = USERS.get(user_id)
if not user:
return {"status": "error", "error": f"email_user_not_found:{user_id}"}
return {
"status": "ok",
"email": {
"to": f"{user['name'].lower()}@example.com",
"template": "refund_confirmation_v2",
"amount_usd": round(float(amount_usd), 2),
"message": message,
"email_id": f"em_{user_id}_20260226",
},
}
What matters most here (in plain words)
- Tools do not make policy decisions — they only return facts or execute an action.
- Safety constraints here are minimal; the supervisor performs the primary control.
gateway.py — execution boundary (validation and control)
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_steps: int = 8
max_tool_calls: int = 5
max_seconds: int = 30
TOOL_ARG_TYPES: dict[str, dict[str, str]] = {
"get_refund_context": {"user_id": "int"},
"issue_refund": {"user_id": "int", "amount_usd": "number", "reason": "str?"},
"send_refund_email": {"user_id": "int", "amount_usd": "number", "message": "str"},
}
def _stable_json(value: Any) -> str:
if value is None or isinstance(value, (bool, int, float, str)):
return json.dumps(value, ensure_ascii=True, sort_keys=True)
if isinstance(value, list):
return "[" + ",".join(_stable_json(v) for v in value) + "]"
if isinstance(value, dict):
parts = []
for key in sorted(value):
parts.append(json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key]))
return "{" + ",".join(parts) + "}"
return json.dumps(str(value), ensure_ascii=True)
def _normalize_for_hash(value: Any) -> Any:
if isinstance(value, str):
return " ".join(value.strip().split())
if isinstance(value, list):
return [_normalize_for_hash(item) for item in value]
if isinstance(value, dict):
return {str(key): _normalize_for_hash(value[key]) for key in sorted(value)}
return value
def args_hash(args: dict[str, Any]) -> str:
normalized = _normalize_for_hash(args or {})
raw = _stable_json(normalized)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]
def _is_number(value: Any) -> bool:
return isinstance(value, (int, float)) and not isinstance(value, bool)
def _validate_tool_args(name: str, args: dict[str, Any]) -> dict[str, Any]:
spec = TOOL_ARG_TYPES.get(name)
if spec is None:
raise StopRun(f"invalid_action:unknown_tool:{name}")
extra = set(args.keys()) - set(spec.keys())
if extra:
raise StopRun(f"invalid_action:extra_tool_args:{name}")
normalized: dict[str, Any] = {}
for arg_name, expected in spec.items():
is_optional = expected.endswith("?")
expected_base = expected[:-1] if is_optional else expected
if arg_name not in args:
if is_optional:
continue
raise StopRun(f"invalid_action:missing_required_arg:{name}:{arg_name}")
value = args[arg_name]
if expected_base == "int":
if not isinstance(value, int) or isinstance(value, bool):
raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
normalized[arg_name] = value
continue
if expected_base == "number":
if not _is_number(value):
raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
normalized[arg_name] = float(value)
continue
if expected_base == "str":
if not isinstance(value, str) or not value.strip():
raise StopRun(f"invalid_action:bad_arg_type:{name}:{arg_name}")
normalized[arg_name] = value.strip()
continue
raise StopRun(f"invalid_action:unknown_arg_spec:{name}:{arg_name}")
return normalized
def validate_worker_action(action: Any) -> dict[str, Any]:
if not isinstance(action, dict):
raise StopRun("invalid_action:not_object")
kind = action.get("kind")
if kind == "invalid":
raise StopRun("invalid_action:non_json")
if kind == "final":
allowed_keys = {"kind", "answer"}
if set(action.keys()) - allowed_keys:
raise StopRun("invalid_action:extra_keys_final")
answer = action.get("answer")
if not isinstance(answer, str) or not answer.strip():
raise StopRun("invalid_action:bad_final_answer")
return {"kind": "final", "answer": answer.strip()}
if kind == "tool":
allowed_keys = {"kind", "name", "args"}
if set(action.keys()) - allowed_keys:
raise StopRun("invalid_action:extra_keys_tool")
name = action.get("name")
if not isinstance(name, str) or not name.strip():
raise StopRun("invalid_action:bad_tool_name")
args = action.get("args", {})
if args is None:
args = {}
if not isinstance(args, dict):
raise StopRun("invalid_action:bad_tool_args")
normalized_args = _validate_tool_args(name.strip(), args)
return {"kind": "tool", "name": name.strip(), "args": normalized_args}
raise StopRun("invalid_action:bad_kind")
class ToolGateway:
def __init__(
self,
*,
allow: set[str],
registry: dict[str, Callable[..., dict[str, Any]]],
budget: Budget,
):
self.allow = set(allow)
self.registry = registry
self.budget = budget
self.tool_calls = 0
self.seen_call_counts: dict[str, int] = {}
self.per_tool_counts: dict[str, int] = {}
self.read_only_repeat_limit: dict[str, int] = {
"get_refund_context": 2,
}
self.per_tool_limit: dict[str, int] = {
"get_refund_context": 3,
"issue_refund": 2,
"send_refund_email": 2,
}
def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
self.tool_calls += 1
if self.tool_calls > self.budget.max_tool_calls:
raise StopRun("max_tool_calls")
if name not in self.allow:
raise StopRun(f"tool_denied:{name}")
fn = self.registry.get(name)
if fn is None:
raise StopRun(f"tool_missing:{name}")
count_for_tool = self.per_tool_counts.get(name, 0) + 1
if count_for_tool > self.per_tool_limit.get(name, 2):
raise StopRun("loop_detected:per_tool_limit")
self.per_tool_counts[name] = count_for_tool
signature = f"{name}:{args_hash(args)}"
seen = self.seen_call_counts.get(signature, 0) + 1
allowed_repeats = self.read_only_repeat_limit.get(name, 1)
if seen > allowed_repeats:
raise StopRun("loop_detected:signature_repeat")
self.seen_call_counts[signature] = seen
try:
out = fn(**args)
except TypeError as exc:
raise StopRun(f"tool_bad_args:{name}") from exc
except Exception as exc:
raise StopRun(f"tool_error:{name}") from exc
if not isinstance(out, dict):
raise StopRun(f"tool_bad_result:{name}")
return out
What matters most here (in plain words)
- Gateway performs a minimal args schema-check (types, required + optional fields) before execution.
- Gateway controls technical execution: allowlist, budget, loop detection (both by
tool+argsand by the total number of tool calls). - Gateway does not fill missing fields — it only validates the args contract.
- For some fields (for example
issue_refund.reason), gateway allows optional because supervisor can add them viarevise. If supervisor does not add them, execution stops withtool_bad_args:*. - Even if supervisor approved, execution still goes only through the controlled layer.
supervisor.py — policy review and human approval
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
HUMAN_APPROVAL_CAP_USD = 800.0
@dataclass(frozen=True)
class Policy:
auto_refund_limit_usd: float = 1000.0
max_refund_per_run_usd: float = 2000.0
@dataclass
class RuntimeState:
executed_refund_total_usd: float = 0.0
refund_executed: bool = False
has_context: bool = False
@dataclass
class Decision:
kind: str # approve | revise | block | escalate
reason: str
revised_action: dict[str, Any] | None = None
def review_action(action: dict[str, Any], state: RuntimeState, policy: Policy) -> Decision:
kind = action.get("kind")
if kind == "final":
if not state.has_context:
return Decision(kind="block", reason="final_requires_context")
return Decision(kind="approve", reason="final_with_context")
if kind != "tool":
return Decision(kind="block", reason="unknown_action_kind")
name = action.get("name")
args = dict(action.get("args") or {})
if name == "get_refund_context":
return Decision(kind="approve", reason="read_only_context")
if name == "issue_refund":
try:
amount = float(args.get("amount_usd", 0.0))
except (TypeError, ValueError):
return Decision(kind="block", reason="invalid_refund_amount_type")
if amount <= 0:
return Decision(kind="block", reason="invalid_refund_amount")
remaining = policy.max_refund_per_run_usd - state.executed_refund_total_usd
if remaining <= 0:
return Decision(kind="block", reason="refund_budget_exhausted")
if amount > remaining:
revised = {
"kind": "tool",
"name": name,
"args": {**args, "amount_usd": round(remaining, 2)},
}
return Decision(kind="revise", reason="cap_to_remaining_run_budget", revised_action=revised)
reason = str(args.get("reason", "")).strip()
if not reason:
revised = {
"kind": "tool",
"name": name,
"args": {**args, "reason": "Customer requested refund within policy review"},
}
return Decision(kind="revise", reason="refund_reason_required", revised_action=revised)
if amount > policy.auto_refund_limit_usd:
return Decision(kind="escalate", reason="high_refund_requires_human")
return Decision(kind="approve", reason="refund_within_auto_limit")
if name == "send_refund_email":
if not state.refund_executed:
return Decision(kind="block", reason="email_before_refund")
return Decision(kind="approve", reason="email_after_refund")
return Decision(kind="block", reason=f"unknown_tool_for_supervisor:{name}")
def simulate_human_approval(action: dict[str, Any]) -> dict[str, Any]:
name = action.get("name")
args = dict(action.get("args") or {})
if name != "issue_refund":
return {"approved": True, "revised_action": action, "comment": "approved_by_human"}
try:
requested = float(args.get("amount_usd", 0.0))
except (TypeError, ValueError):
return {"approved": False, "comment": "invalid_requested_amount_type"}
approved_amount = min(requested, HUMAN_APPROVAL_CAP_USD)
if approved_amount <= 0:
return {"approved": False, "comment": "invalid_requested_amount"}
revised_action = {
"kind": "tool",
"name": "issue_refund",
"args": {**args, "amount_usd": round(approved_amount, 2)},
}
return {
"approved": True,
"revised_action": revised_action,
"comment": f"approved_with_cap:{approved_amount}",
}
What matters most here (in plain words)
- Supervisor makes decisions before execution, not after.
finalalso goes through supervisor review (without context it returnsblock).- An invalid
amount_usdtype does not crash with an exception; it returns a controlledblockdecision. - Supervisor can
reviseto add policy-required fields (for example,reasonforissue_refund). escalatehere demonstrates a practical option: a human can approve the action with an adjusted amount.
llm.py — worker decisions + final synthesis
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
WORKER_SYSTEM_PROMPT = """
You are a support worker agent.
Return exactly one JSON object in one of these shapes:
1) {"kind":"tool","name":"<tool_name>","args":{...}}
2) {"kind":"final","answer":"<short final answer>"}
Rules:
- First, collect facts with get_refund_context.
- Then propose issue_refund when facts are sufficient.
- Use send_refund_email only after refund success is visible in history.
- Use only tools from available_tools.
- Do not output markdown or extra keys.
""".strip()
FINAL_SYSTEM_PROMPT = """
You are a customer support assistant.
Write a short final answer in English.
Include:
- final refund amount in USD
- whether human approval was required
- one reason based on policy
""".strip()
TOOL_CATALOG = [
{
"name": "get_refund_context",
"description": "Get user profile, billing facts, and policy hints",
"args": {"user_id": "integer"},
},
{
"name": "issue_refund",
"description": "Issue refund in USD",
"args": {"user_id": "integer", "amount_usd": "number", "reason": "optional string"},
},
{
"name": "send_refund_email",
"description": "Send refund confirmation email",
"args": {"user_id": "integer", "amount_usd": "number", "message": "string"},
},
]
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def _build_state_summary(history: list[dict[str, Any]]) -> dict[str, Any]:
tools_used = [
step.get("action", {}).get("name")
for step in history
if isinstance(step, dict)
and isinstance(step.get("action"), dict)
and step.get("action", {}).get("kind") == "tool"
]
supervisor_decisions = [
step.get("supervisor", {}).get("decision")
for step in history
if isinstance(step, dict) and isinstance(step.get("supervisor"), dict)
]
return {
"steps_completed": len(history),
"tools_used": [t for t in tools_used if t],
"supervisor_decisions": [d for d in supervisor_decisions if d],
"last_step": history[-1] if history else None,
}
def _recent_step_summaries(history: list[dict[str, Any]], limit: int = 3) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
for step in history[-limit:]:
out.append(
{
"step": step.get("step"),
"proposed_tool": step.get("action", {}).get("name"),
"supervisor_decision": step.get("supervisor", {}).get("decision"),
"executed_tool": step.get("executed_action", {}).get("name"),
"executed_from": step.get("executed_from"),
"observation_status": step.get("observation", {}).get("status"),
}
)
return out
def decide_next_action(
goal: str,
history: list[dict[str, Any]],
) -> dict[str, Any]:
payload = {
"goal": goal,
"state_summary": _build_state_summary(history),
"recent_step_summaries": _recent_step_summaries(history, limit=3),
"available_tools": TOOL_CATALOG,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": WORKER_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"kind": "invalid", "raw": text}
def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
payload = {
"goal": goal,
"history": history,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = (completion.choices[0].message.content or "").strip()
if not text:
raise LLMEmpty("llm_empty")
return text
What matters most here (in plain words)
- Worker makes decisions but has no right to execute an action directly.
- Worker context includes supervisor decisions and compact summaries of recent steps without "fat" payload.
main.py — Worker -> Supervisor -> Execute -> Final
from __future__ import annotations
import json
import time
from typing import Any
from gateway import Budget, StopRun, ToolGateway, args_hash, validate_worker_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, decide_next_action
from supervisor import Decision, Policy, RuntimeState, review_action, simulate_human_approval
from tools import get_refund_context, issue_refund, send_refund_email
GOAL = (
"User Anna (user_id=42) requests a refund for a recent annual plan charge of 1200 USD. "
"Apply supervisor policy before any refund execution and provide a short final response."
)
BUDGET = Budget(max_steps=8, max_tool_calls=5, max_seconds=30)
POLICY = Policy(
auto_refund_limit_usd=1000.0,
max_refund_per_run_usd=2000.0,
)
TOOL_REGISTRY = {
"get_refund_context": get_refund_context,
"issue_refund": issue_refund,
"send_refund_email": send_refund_email,
}
ALLOWED_TOOLS = {"get_refund_context", "issue_refund", "send_refund_email"}
def _decision_payload(decision: Decision) -> dict[str, Any]:
payload = {
"decision": decision.kind,
"reason": decision.reason,
}
if decision.revised_action:
payload["revised_action"] = decision.revised_action
return payload
def run_supervised_flow(goal: str) -> dict[str, Any]:
started = time.monotonic()
history: list[dict[str, Any]] = []
trace: list[dict[str, Any]] = []
state = RuntimeState()
gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)
for step in range(1, BUDGET.max_steps + 1):
if (time.monotonic() - started) > BUDGET.max_seconds:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"trace": trace,
"history": history,
}
try:
raw_action = decide_next_action(goal=goal, history=history)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "worker",
"trace": trace,
"history": history,
}
try:
action = validate_worker_action(raw_action)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": "worker",
"raw_action": raw_action,
"trace": trace,
"history": history,
}
decision = review_action(action=action, state=state, policy=POLICY)
supervisor_info = _decision_payload(decision)
action_to_execute = action
human_info: dict[str, Any] | None = None
executed_from = "original"
if decision.kind == "block":
trace_row = {
"step": step,
"tool": action.get("name", "final"),
"supervisor_decision": "block",
"executed_from": executed_from,
"stop_reason": f"supervisor_block:{decision.reason}",
"ok": False,
}
if action.get("kind") == "tool":
trace_row["args_hash"] = args_hash(action.get("args", {}))
trace.append(trace_row)
return {
"status": "stopped",
"stop_reason": f"supervisor_block:{decision.reason}",
"phase": "supervisor",
"action": action,
"trace": trace,
"history": history,
}
if decision.kind == "revise" and decision.revised_action:
action_to_execute = decision.revised_action
executed_from = "supervisor_revised"
if decision.kind == "escalate":
human = simulate_human_approval(action)
human_info = human
if not human.get("approved"):
trace.append(
{
"step": step,
"tool": action["name"],
"args_hash": args_hash(action["args"]),
"supervisor_decision": "escalate",
"executed_from": executed_from,
"human_approved": False,
"stop_reason": "human_rejected",
"ok": False,
}
)
return {
"status": "stopped",
"stop_reason": "human_rejected",
"phase": "human_approval",
"action": action,
"trace": trace,
"history": history,
}
revised = human.get("revised_action")
if isinstance(revised, dict):
action_to_execute = revised
executed_from = "human_revised"
if action_to_execute["kind"] == "final":
trace.append(
{
"step": step,
"tool": "final",
"supervisor_decision": decision.kind,
"executed_from": executed_from,
"ok": True,
}
)
history.append(
{
"step": step,
"action": action,
"supervisor": supervisor_info,
"executed_action": action_to_execute,
"executed_from": executed_from,
"observation": {"status": "final"},
}
)
return {
"status": "ok",
"stop_reason": "success",
"answer": action_to_execute["answer"],
"trace": trace,
"history": history,
}
tool_name = action_to_execute["name"]
tool_args = action_to_execute["args"]
try:
observation = gateway.call(tool_name, tool_args)
trace_row = {
"step": step,
"tool": tool_name,
"args_hash": args_hash(tool_args),
"supervisor_decision": decision.kind,
"executed_from": executed_from,
"ok": True,
}
if human_info is not None:
trace_row["human_approved"] = bool(human_info.get("approved"))
trace.append(trace_row)
except StopRun as exc:
trace_row = {
"step": step,
"tool": tool_name,
"args_hash": args_hash(tool_args),
"supervisor_decision": decision.kind,
"executed_from": executed_from,
"ok": False,
"stop_reason": exc.reason,
}
if human_info is not None:
trace_row["human_approved"] = bool(human_info.get("approved"))
trace.append(trace_row)
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": "execution",
"action": action_to_execute,
"trace": trace,
"history": history,
}
if tool_name == "get_refund_context" and "error" not in observation:
state.has_context = True
if tool_name == "issue_refund" and observation.get("status") == "ok":
amount = float(observation.get("refund", {}).get("amount_usd", 0.0))
state.executed_refund_total_usd += amount
state.refund_executed = True
history_entry = {
"step": step,
"action": action,
"supervisor": supervisor_info,
"executed_action": action_to_execute,
"executed_from": executed_from,
"observation": observation,
}
if human_info is not None:
history_entry["human_approval"] = human_info
history.append(history_entry)
try:
answer = compose_final_answer(goal=goal, history=history)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "finalize",
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"phase": "finalize",
"trace": trace,
"history": history,
}
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"trace": trace,
"history": history,
}
def main() -> None:
result = run_supervised_flow(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
What matters most here (in plain words)
- Worker cannot bypass supervisor review: every tool call passes policy check.
escalateleads to human approval, and only after that is the action executed.- In
trace, you can see the source of the executed action:original,supervisor_revised, orhuman_revised. - Early
finalwithoutget_refund_contextis blocked assupervisor_block:final_requires_context. trace+historyprovide a full audit: what worker proposed, what supervisor decided, and what was actually executed.
requirements.txt
openai==2.21.0
Example output
Below is an example of a valid run where the worker, right after gathering context, proposes a partial 1000 USD refund, and supervisor gives approve.
{
"status": "ok",
"stop_reason": "success",
"answer": "A partial refund of 1000 USD has been approved and processed for Anna as per supervisor policy, and a confirmation email has been sent.",
"trace": [
{
"step": 1,
"tool": "get_refund_context",
"args_hash": "feaa769a39ae",
"supervisor_decision": "approve",
"executed_from": "original",
"ok": true
},
{
"step": 2,
"tool": "issue_refund",
"args_hash": "36ffe69cb606",
"supervisor_decision": "approve",
"executed_from": "original",
"ok": true
},
{
"step": 3,
"tool": "send_refund_email",
"args_hash": "ff6ec44bb2fa",
"supervisor_decision": "approve",
"executed_from": "original",
"ok": true
},
{
"step": 4,
"tool": "final",
"supervisor_decision": "approve",
"executed_from": "original",
"ok": true
}
],
"history": [
{"step": 1, "action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}, "supervisor": {"decision": "approve", "reason": "read_only_context"}, "executed_action": {"kind": "tool", "name": "get_refund_context", "args": {"user_id": 42}}, "executed_from": "original", "observation": {...}},
{"step": 2, "action": {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0, "reason": "..."}}, "supervisor": {"decision": "approve", "reason": "refund_within_auto_limit"}, "executed_action": {"kind": "tool", "name": "issue_refund", "args": {"user_id": 42, "amount_usd": 1000.0, "reason": "..."}}, "executed_from": "original", "observation": {...}},
{"step": 3, "action": {"kind": "tool", "name": "send_refund_email", "args": {"user_id": 42, "amount_usd": 1000.0, "message": "..."}}, "supervisor": {"decision": "approve", "reason": "email_after_refund"}, "executed_action": {"kind": "tool", "name": "send_refund_email", "args": {"user_id": 42, "amount_usd": 1000.0, "message": "..."}}, "executed_from": "original", "observation": {...}},
{"step": 4, "action": {"kind": "final", "answer": "..."}, "supervisor": {"decision": "approve", "reason": "final_with_context"}, "executed_action": {"kind": "final", "answer": "..."}, "executed_from": "original", "observation": {"status": "final"}}
]
}
This is a shortened example: history is intentionally trimmed to key fields for readability.
What is NOT shown here
- There is no real integrated human-in-the-loop UI/queue.
- There is no role-based access or multi-tenant isolation.
- There are no advanced retry/backoff policies for LLM.
- There is no full retry/backoff orchestration for write-tools; the loop guard here is intentionally conservative.
- Optional args (for example
issue_refund.reason) are filled via supervisorrevisewithin the demo contract. - There are no token/cost budgets (cost guardrails).
Typical stop_reason values
success— worker completed the scenario and returned the final responseinvalid_action:*— worker returned invalid action JSONinvalid_action:bad_arg_type:*— tool args contain a value with an invalid type (for example,amount_usdis not a number)supervisor_block:*— supervisor blocked the action by policyhuman_rejected— escalation to a human was rejectedmax_tool_calls— tool call limit is exhaustedmax_seconds— run time budget is exceededllm_timeout— LLM did not respond withinOPENAI_TIMEOUT_SECONDSllm_empty— final LLM response is emptytool_denied:*— tool is not in the execution allowlisttool_missing:*— tool is missing in the registrytool_bad_args:*— invalid arguments for toolloop_detected:per_tool_limit—per_tool_limitexceeded for a tool (protection from tool spam even with different args)loop_detected:signature_repeat— the sametool+argsrepeated beyond the allowed repeat limit
What to try next
- Reduce
POLICY.auto_refund_limit_usdto500and observe howescalatetriggers more often. - Set
simulate_human_approvalto reject mode and verifyhuman_rejected. - Remove
send_refund_emailfromALLOWED_TOOLSand verifytool_denied:*. - Remove
reasonfromissue_refundand verify how supervisor returnsrevisewith auto-filled reason. - Add a
risk_scorefield to supervisor decisions and output it intracefor alerting.