Суть патерна (коротко)
Task Decomposition Agent — це патерн, у якому агент спершу розбиває складну задачу на послідовні кроки, а вже потім виконує їх по черзі.
Модель відповідає за план і порядок дій, а виконання кожного кроку проходить через контрольований gateway з валідацією плану, allowlist і бюджетами run.
Що демонструє цей приклад
- окремий етап
PlanпередExecute - policy boundary між planning (LLM) і tools (execution layer)
- strict validation плану (
kind, структура кроків, allowed keys) - allowlist інструментів (deny by default)
- окремі бюджети run:
max_plan_steps(план) іmax_execute_steps(виконання), плюсmax_tool_calls,max_seconds - явні
stop_reasonдля дебагу та моніторингу raw_planу відповіді, якщо план некоректний
Архітектура
- LLM отримує goal і повертає план у JSON (
kind="plan",steps). - Policy boundary валідовує план і блокує некоректні/небезпечні форми.
- Кожен крок виконується послідовно через
ToolGateway(allowlist, budgets, loop detection). - Observation кожного кроку додається в
historyяк checkpoint для прозорого виконання. - Після виконання всіх кроків LLM робить фінальну синтезу на основі
historyокремимCombine-викликом без tools.
LLM повертає intent (план), який розглядається як недовірений input: policy boundary спершу валідовує його, а потім (якщо дозволено) викликає tools.
Allowlist застосовується двічі: у plan validation (invalid_plan:tool_not_allowed:*) і у виконанні tools (tool_denied:*).
Так Task Decomposition залишається керованим: агент планує, а виконання проходить через контрольований шар.
Структура проєкту
examples/
└── agent-patterns/
└── task-decomposition-agent/
└── python/
├── main.py # Plan -> Execute -> Combine
├── llm.py # planner + final synthesis
├── gateway.py # policy boundary: plan validation + tool execution control
├── tools.py # deterministic tools (Anna/Max, US, USD)
└── requirements.txt
Як запустити
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/task-decomposition-agent/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Потрібен Python 3.11+.
Варіант через export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Варіант через .env (опційно)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
Це shell-варіант (macOS/Linux). На Windows простіше використовувати set змінних або, за бажанням, python-dotenv, щоб підвантажувати .env автоматично.
Задача
Уяви, що керівник просить:
"Зроби короткий звіт за квітень 2026: продажі, повернення, чистий дохід і ризики."
Агент не має вигадувати відповідь "з голови". Він має:
- спочатку скласти план
- виконати кроки по черзі
- брати дані тільки з дозволених tools
- дати фінальну відповідь лише після всіх кроків
Рішення
Тут агент працює просто:
- LLM спочатку робить план із кількох кроків
- система перевіряє, що план правильний і дозволений
- tools виконують кроки та повертають факти
- після цього LLM збирає фінальний короткий підсумок
- якщо план або крок некоректний, запуск зупиняється з причиною
Код
tools.py — інструменти (джерело фактів)
from __future__ import annotations
from typing import Any
MANAGERS = {
42: {"id": 42, "name": "Anna", "region": "US", "team": "Retail East"},
7: {"id": 7, "name": "Max", "region": "US", "team": "Retail West"},
}
SALES_DATA = {
"2026-04": [
{"day": "2026-04-01", "gross_usd": 5200.0, "orders": 120},
{"day": "2026-04-02", "gross_usd": 4890.0, "orders": 113},
{"day": "2026-04-03", "gross_usd": 6105.0, "orders": 141},
{"day": "2026-04-04", "gross_usd": 5580.0, "orders": 127},
{"day": "2026-04-05", "gross_usd": 6420.0, "orders": 149},
]
}
REFUND_DATA = {
"2026-04": [
{"day": "2026-04-01", "refunds_usd": 140.0},
{"day": "2026-04-02", "refunds_usd": 260.0},
{"day": "2026-04-03", "refunds_usd": 210.0},
{"day": "2026-04-04", "refunds_usd": 590.0},
{"day": "2026-04-05", "refunds_usd": 170.0},
]
}
def get_manager_profile(manager_id: int) -> dict[str, Any]:
manager = MANAGERS.get(manager_id)
if not manager:
return {"error": f"manager {manager_id} not found"}
return {"manager": manager}
def fetch_sales_data(month: str) -> dict[str, Any]:
rows = SALES_DATA.get(month)
if not rows:
return {"error": f"sales data for {month} not found"}
return {"month": month, "currency": "USD", "daily_sales": rows}
def fetch_refund_data(month: str) -> dict[str, Any]:
rows = REFUND_DATA.get(month)
if not rows:
return {"error": f"refund data for {month} not found"}
return {"month": month, "currency": "USD", "daily_refunds": rows}
def calculate_monthly_kpis(month: str) -> dict[str, Any]:
sales_rows = SALES_DATA.get(month)
refund_rows = REFUND_DATA.get(month)
if not sales_rows or not refund_rows:
return {"error": f"kpi inputs for {month} not found"}
gross_sales = sum(row["gross_usd"] for row in sales_rows)
total_refunds = sum(row["refunds_usd"] for row in refund_rows)
total_orders = sum(row["orders"] for row in sales_rows)
net_sales = gross_sales - total_refunds
refund_rate = (total_refunds / gross_sales) if gross_sales else 0.0
top_day = max(sales_rows, key=lambda row: row["gross_usd"])["day"]
return {
"month": month,
"currency": "USD",
"gross_sales_usd": round(gross_sales, 2),
"refunds_usd": round(total_refunds, 2),
"net_sales_usd": round(net_sales, 2),
"orders": total_orders,
"refund_rate": round(refund_rate, 4),
"top_sales_day": top_day,
}
def detect_risk_signals(month: str) -> dict[str, Any]:
refund_rows = REFUND_DATA.get(month)
if not refund_rows:
return {"error": f"refund data for {month} not found"}
high_refund_day = max(refund_rows, key=lambda row: row["refunds_usd"])
warnings: list[str] = []
if high_refund_day["refunds_usd"] >= 500:
warnings.append(
f"Refund spike detected on {high_refund_day['day']}: {high_refund_day['refunds_usd']} USD"
)
if not warnings:
warnings.append("No critical risk signals detected for this month.")
return {
"month": month,
"currency": "USD",
"risk_warnings": warnings,
"peak_refund_day": high_refund_day,
}
Що тут найважливіше (простими словами)
- Tools детерміновані та не містять LLM-логіки.
- Агент лише вирішує, які кроки виконати.
- Бізнес-логіку виконує саме execution-layer (tools), а не LLM.
gateway.py — policy boundary (найважливіший шар)
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_plan_steps: int = 6
max_execute_steps: int = 8
max_tool_calls: int = 8
max_seconds: int = 60
def _stable_json(value: Any) -> str:
if value is None or isinstance(value, (bool, int, float, str)):
return json.dumps(value, ensure_ascii=True, sort_keys=True)
if isinstance(value, list):
return "[" + ",".join(_stable_json(item) for item in value) + "]"
if isinstance(value, dict):
parts = []
for key in sorted(value):
parts.append(
json.dumps(str(key), ensure_ascii=True) + ":" + _stable_json(value[key])
)
return "{" + ",".join(parts) + "}"
return json.dumps(str(value), ensure_ascii=True)
def args_hash(args: dict[str, Any]) -> str:
raw = _stable_json(args or {})
return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]
def validate_plan_action(
action: Any, *, max_plan_steps: int, allowed_tools: set[str]
) -> list[dict[str, Any]]:
if not isinstance(action, dict):
raise StopRun("invalid_plan:not_object")
kind = action.get("kind")
if kind == "invalid":
raise StopRun("invalid_plan:non_json")
if kind != "plan":
raise StopRun("invalid_plan:bad_kind")
allowed_top_keys = {"kind", "steps"}
if set(action.keys()) - allowed_top_keys:
raise StopRun("invalid_plan:extra_keys")
steps = action.get("steps")
if not isinstance(steps, list) or not steps:
raise StopRun("invalid_plan:missing_steps")
if len(steps) < 3:
raise StopRun("invalid_plan:min_steps")
if len(steps) > max_plan_steps:
raise StopRun("invalid_plan:max_steps")
normalized: list[dict[str, Any]] = []
seen_ids: set[str] = set()
for index, step in enumerate(steps, start=1):
if not isinstance(step, dict):
raise StopRun(f"invalid_plan:step_{index}_not_object")
allowed_step_keys = {"id", "title", "tool", "args"}
if set(step.keys()) - allowed_step_keys:
raise StopRun(f"invalid_plan:step_{index}_extra_keys")
step_id = step.get("id")
if not isinstance(step_id, str) or not step_id.strip():
raise StopRun(f"invalid_plan:step_{index}_missing_id")
if step_id in seen_ids:
raise StopRun("invalid_plan:duplicate_step_id")
seen_ids.add(step_id)
title = step.get("title")
if not isinstance(title, str) or not title.strip():
raise StopRun(f"invalid_plan:step_{index}_missing_title")
tool = step.get("tool")
if not isinstance(tool, str) or not tool.strip():
raise StopRun(f"invalid_plan:step_{index}_missing_tool")
tool = tool.strip()
if tool not in allowed_tools:
raise StopRun(f"invalid_plan:tool_not_allowed:{tool}")
args = step.get("args", {})
if args is None:
args = {}
if not isinstance(args, dict):
raise StopRun(f"invalid_plan:step_{index}_bad_args")
normalized.append(
{
"id": step_id.strip(),
"title": title.strip(),
"tool": tool,
"args": args,
}
)
return normalized
class ToolGateway:
def __init__(
self,
*,
allow: set[str],
registry: dict[str, Callable[..., dict[str, Any]]],
budget: Budget,
):
self.allow = set(allow)
self.registry = registry
self.budget = budget
self.tool_calls = 0
self.seen_calls: set[str] = set()
def call(self, name: str, args: dict[str, Any]) -> dict[str, Any]:
self.tool_calls += 1
if self.tool_calls > self.budget.max_tool_calls:
raise StopRun("max_tool_calls")
if name not in self.allow:
raise StopRun(f"tool_denied:{name}")
tool = self.registry.get(name)
if tool is None:
raise StopRun(f"tool_missing:{name}")
signature = f"{name}:{args_hash(args)}"
if signature in self.seen_calls:
raise StopRun("loop_detected")
self.seen_calls.add(signature)
try:
return tool(**args)
except TypeError as exc:
raise StopRun(f"tool_bad_args:{name}") from exc
except Exception as exc:
raise StopRun(f"tool_error:{name}") from exc
Що тут найважливіше (простими словами)
validate_plan_action(...)— governance/control layer для плану від LLM.- План обробляється як недовірений input і проходить strict validation.
ToolGateway.call(...)— межаagent ≠ executor: агент планує, gateway безпечно виконує.loop_detectedловить exact-repeat (tool + args_hash).
llm.py — planning + final synthesis
LLM бачить лише каталог доступних tools; якщо tool не в allowlist, gateway зупинить run.
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
PLAN_SYSTEM_PROMPT = """
You are a task decomposition planner.
Return only one JSON object in this exact shape:
{
"kind": "plan",
"steps": [
{"id": "step_1", "title": "...", "tool": "...", "args": {...}}
]
}
Rules:
- Create 3 to 6 steps.
- Use only tools from available_tools.
- Keep args minimal and valid.
- Do not add extra keys.
- Do not output markdown.
""".strip()
FINAL_SYSTEM_PROMPT = """
You are a reporting assistant.
Write a short final summary in English for a US business audience.
Include: manager name, month, gross sales (USD), refunds (USD), net sales (USD), refund rate (%), and key risk note.
""".strip()
TOOL_CATALOG = [
{
"name": "get_manager_profile",
"description": "Get manager profile by manager_id",
"args": {"manager_id": "integer"},
},
{
"name": "fetch_sales_data",
"description": "Get daily gross sales for a month",
"args": {"month": "string in YYYY-MM"},
},
{
"name": "fetch_refund_data",
"description": "Get daily refund values for a month",
"args": {"month": "string in YYYY-MM"},
},
{
"name": "calculate_monthly_kpis",
"description": "Calculate gross/refunds/net/order KPIs for a month",
"args": {"month": "string in YYYY-MM"},
},
{
"name": "detect_risk_signals",
"description": "Detect risk warnings for a month",
"args": {"month": "string in YYYY-MM"},
},
]
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def create_plan(goal: str, max_plan_steps: int) -> dict[str, Any]:
payload = {
"goal": goal,
"max_plan_steps": max_plan_steps,
"available_tools": TOOL_CATALOG,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": PLAN_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"kind": "invalid", "raw": text}
def compose_final_answer(goal: str, history: list[dict[str, Any]]) -> str:
payload = {
"goal": goal,
"history": history,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or ""
text = text.strip()
if not text:
raise LLMEmpty("llm_empty")
return text
Що тут найважливіше (простими словами)
create_plan(...)— це decision-stage для decomposition.timeout=LLM_TIMEOUT_SECONDS+LLMTimeoutдають керовану зупинку при проблемах мережі/моделі.- Порожня фінальна відповідь не маскується fallback-текстом: повертається явний
llm_empty. - Якщо JSON зламаний, повертається
{"kind":"invalid"...}, а policy layer дає читабельнийstop_reason.
main.py — Plan -> Execute -> Combine
from __future__ import annotations
import json
import time
from typing import Any
from gateway import Budget, StopRun, ToolGateway, args_hash, validate_plan_action
from llm import LLMEmpty, LLMTimeout, compose_final_answer, create_plan
from tools import (
calculate_monthly_kpis,
detect_risk_signals,
fetch_refund_data,
fetch_sales_data,
get_manager_profile,
)
GOAL = (
"Prepare an April 2026 monthly sales summary for manager_id=42 in USD. "
"Use step-by-step decomposition. Include gross sales, refunds, net sales, refund rate, and one risk note."
)
# max_execute_steps here limits plan length (number of planned steps), not runtime loop iterations.
BUDGET = Budget(max_plan_steps=6, max_execute_steps=8, max_tool_calls=8, max_seconds=60)
TOOL_REGISTRY = {
"get_manager_profile": get_manager_profile,
"fetch_sales_data": fetch_sales_data,
"fetch_refund_data": fetch_refund_data,
"calculate_monthly_kpis": calculate_monthly_kpis,
"detect_risk_signals": detect_risk_signals,
}
ALLOWED_TOOLS = {
"get_manager_profile",
"fetch_sales_data",
"fetch_refund_data",
"calculate_monthly_kpis",
"detect_risk_signals",
}
def run_task_decomposition(goal: str) -> dict[str, Any]:
started = time.monotonic()
trace: list[dict[str, Any]] = []
history: list[dict[str, Any]] = []
gateway = ToolGateway(allow=ALLOWED_TOOLS, registry=TOOL_REGISTRY, budget=BUDGET)
try:
raw_plan = create_plan(goal=goal, max_plan_steps=BUDGET.max_plan_steps)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"llm_phase": "plan",
"trace": trace,
"history": history,
}
try:
steps = validate_plan_action(
raw_plan,
max_plan_steps=BUDGET.max_plan_steps,
allowed_tools=ALLOWED_TOOLS,
)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"raw_plan": raw_plan,
"trace": trace,
"history": history,
}
if len(steps) > BUDGET.max_execute_steps:
return {
"status": "stopped",
"stop_reason": "max_execute_steps",
"plan": steps,
"trace": trace,
"history": history,
}
for step_no, step in enumerate(steps, start=1):
elapsed = time.monotonic() - started
if elapsed > BUDGET.max_seconds:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"plan": steps,
"trace": trace,
"history": history,
}
tool_name = step["tool"]
tool_args = step["args"]
try:
observation = gateway.call(tool_name, tool_args)
trace.append(
{
"step_no": step_no,
"step_id": step["id"],
"tool": tool_name,
"args_hash": args_hash(tool_args),
"ok": True,
}
)
except StopRun as exc:
trace.append(
{
"step_no": step_no,
"step_id": step["id"],
"tool": tool_name,
"args_hash": args_hash(tool_args),
"ok": False,
"stop_reason": exc.reason,
}
)
return {
"status": "stopped",
"stop_reason": exc.reason,
"plan": steps,
"trace": trace,
"history": history,
}
history.append(
{
"step_no": step_no,
"plan_step": step,
"observation": observation,
}
)
try:
answer = compose_final_answer(goal=goal, history=history)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"llm_phase": "finalize",
"plan": steps,
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"llm_phase": "finalize",
"plan": steps,
"trace": trace,
"history": history,
}
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"plan": steps,
"trace": trace,
"history": history,
}
def main() -> None:
result = run_task_decomposition(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
Що тут найважливіше (простими словами)
run_task_decomposition(...)керуєPlan -> Execute -> Combine; бізнес-дії виконуються тільки черезToolGateway.- При некоректному плані повертається
raw_planдля дебагу. - У цій версії
max_execute_stepsперевіряє довжину плану перед execution; runtime-обмеження далі даютьmax_tool_callsіmax_seconds. history— це прозорий лог кроків: що було в плані і який observation повернув кожен tool.
requirements.txt
openai==2.21.0
Приклад виводу
Порядок кроків у плані може трохи відрізнятись між запусками, але policy-гейти і stop reasons залишаються стабільними. Planner може змінювати порядок кроків; важливо, що policy + allowlist однаково працюють незалежно від порядку.
{
"status": "ok",
"stop_reason": "success",
"answer": "In April 2026, under manager Anna's leadership, gross sales were $28,195, refunds were $1,370, net sales were $26,825, and refund rate was 4.86%.",
"plan": [
{"id": "step_1", "tool": "fetch_sales_data", "args": {"month": "2026-04"}},
{"id": "step_2", "tool": "fetch_refund_data", "args": {"month": "2026-04"}},
{"id": "step_3", "tool": "calculate_monthly_kpis", "args": {"month": "2026-04"}},
{"id": "step_4", "tool": "detect_risk_signals", "args": {"month": "2026-04"}},
{"id": "step_5", "tool": "get_manager_profile", "args": {"manager_id": 42}}
],
"trace": [
{"step_no": 1, "step_id": "step_1", "tool": "fetch_sales_data", "args_hash": "...", "ok": true},
{"step_no": 2, "step_id": "step_2", "tool": "fetch_refund_data", "args_hash": "...", "ok": true},
{"step_no": 3, "step_id": "step_3", "tool": "calculate_monthly_kpis", "args_hash": "...", "ok": true},
{"step_no": 4, "step_id": "step_4", "tool": "detect_risk_signals", "args_hash": "...", "ok": true},
{"step_no": 5, "step_id": "step_5", "tool": "get_manager_profile", "args_hash": "...", "ok": true}
],
"history": [{...}]
}
Це скорочений приклад: у реальному запуску plan і trace можуть містити більше кроків.
history — це журнал виконання: для кожного step_no є plan_step і observation.
args_hash — це hash тільки аргументів, тому він може збігатися між різними tools, якщо args однакові; loop detection додатково враховує назву tool.
Типові stop_reason
success— план виконано і фінальна відповідь згенерованаinvalid_plan:*— план від LLM не пройшов policy validationinvalid_plan:non_json— LLM не повернув валідний JSON-планinvalid_plan:min_steps— у плані менше 3 кроків для decompositioninvalid_plan:tool_not_allowed:<name>— план містить tool поза allowlistmax_execute_steps— план довший за дозволений execution budgetmax_tool_calls— вичерпано ліміт викликів toolsmax_seconds— перевищено time budget runllm_timeout— LLM не відповів у межахOPENAI_TIMEOUT_SECONDSllm_empty— LLM повернув порожню фінальну відповідь на етапіfinalizetool_denied:<name>— tool не в allowlisttool_missing:<name>— tool відсутній у registrytool_bad_args:<name>— крок містить некоректні аргументиloop_detected— exact repeat (tool + args_hash)
Що тут НЕ показано
- Немає auth/PII та продакшен-контролів доступу до персональних даних.
- Немає retry/backoff політик для LLM і tool-layer.
- Немає бюджетів по токенах/вартості (cost guardrails).
- Інструменти тут детерміновані mocks для навчання, а не реальні зовнішні API.
Що спробувати далі
- Прибери
detect_risk_signalsзALLOWED_TOOLSі перевірtool_denied:*. - Додай у план неіснуючий tool і перевір
tool_missing:*. - Зменьши
max_plan_stepsдо3і подивись, як часто отримаєшinvalid_plan:max_steps. - Зміни
GOALнаmanager_id=7 (Max)і порівняй фінальну синтезу. - Додай cost/token guardrails у
Budgetі у фінальний JSON результат.
Повний код на GitHub
У репозиторії лежить повна runnable-версія цього прикладу: planning, policy boundary, послідовне виконання кроків, stop reasons.
Переглянути повний код на GitHub ↗