Суть патерна (коротко)
Multi-Agent Collaboration — це патерн, у якому кілька агентів із різними ролями працюють над однією задачею в раундах і приходять до узгодженого рішення.
LLM у кожній ролі формує свій внесок, а collaboration-policy контролює контракт внеску, ліміти раундів і критерій готовності.
Що демонструє цей приклад
- команду з 3 ролей:
demand_analyst,finance_analyst,risk_analyst - спільний контекст (
shared_context) для всіх агентів - collaboration gateway, який валідовує внески агентів за контрактом
- раунди узгодження: conflict detection + перевірка консенсусу
- окремі policy vs execution allowlist-и ролей
- фінальне рішення (
final_decision) може бутиgo,go_with_cautionабоno_go(визначається policy) - runtime бюджети:
max_rounds,max_messages,max_seconds - явні
stop_reason,trace,historyдля продакшен-моніторингу
Архітектура
- Координатор формує спільну дошку стану (
goal + shared_context). - Кожна рольова LLM-модель повертає структурований внесок (
agent/stance/summary/confidence/actions). - Gateway валідовує внесок і блокує некоректні або недозволені ролі.
- Після кожного раунду система знаходить конфлікти та перевіряє, чи досягнуто консенсус.
- Якщо консенсусу немає, результати раунду стають контекстом для наступного раунду.
- Після узгодження LLM синтезує фінальний короткий operations brief.
Ключовий контракт: LLM пропонує внесок, але прийняття внеску і правила завершення контролює policy-layer.
Структура проєкту
examples/
└── agent-patterns/
└── multi-agent-collaboration/
└── python/
├── main.py # Team loop -> Validate -> Resolve -> Finalize
├── llm.py # role contributions + final synthesis
├── gateway.py # contribution contract + conflict/consensus policy
├── signals.py # deterministic shared context
└── requirements.txt
Як запустити
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/multi-agent-collaboration/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Потрібен Python 3.11+.
Варіант через export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Варіант через .env (опційно)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
Це shell-варіант (macOS/Linux). На Windows простіше використовувати set змінних або, за бажанням, python-dotenv, щоб підвантажувати .env автоматично.
Задача
Уяви продакшен-кейс перед запуском кампанії:
"Підготуй go/no-go бриф для Checkout v2 на US ринку на 2026-03-02. Потрібне узгоджене рішення між growth, finance і risk."
Один агент тут легко пропустить частину ризиків або економіки. Тому запускається командний цикл:
- кожна роль дає окремий висновок
- система перевіряє розбіжності
- наступний раунд запускається тільки якщо є потреба
- фінальна відповідь з'являється після узгодження
Рішення
У цьому прикладі:
- рольові агенти працюють через один спільний контекст
- gateway тримає жорсткий JSON-контракт і ліміти виконання
detect_conflicts(...)іdecide_round_outcome(...)визначають, чи рухатись далі- якщо команда не узгодилась за
max_rounds, run зупиняється контрольовано - якщо узгодилась, окремий finalize-крок формує короткий фінальний бриф
Код
signals.py — спільні факти для команди
from __future__ import annotations
from typing import Any
def build_shared_context(*, report_date: str, region: str) -> dict[str, Any]:
return {
"report_date": report_date,
"region": region,
"campaign": {
"name": "Checkout v2 Launch",
"window": "2026-03-02",
"channel": "US paid + lifecycle",
},
"demand_signals": {
"projected_orders": 15200,
"conversion_lift_pct": 12.4,
"traffic_risk": "medium",
},
"finance_signals": {
"projected_revenue_usd": 684000.0,
"expected_margin_pct": 19.2,
"promo_cost_usd": 94000.0,
},
"risk_signals": {
"failed_payment_rate": 0.028,
"chargeback_alerts": 4,
"critical_incidents": 0,
},
"policy_limits": {
"payment_failure_block_threshold": 0.03,
"max_chargeback_alerts_for_go": 5,
},
}
Що тут найважливіше (простими словами)
- Всі ролі бачать один і той самий
shared_context. - Факти детерміновані: відділені від LLM-рішень.
gateway.py — policy boundary для колаборації
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_rounds: int = 3
max_messages: int = 12
max_seconds: int = 40
min_go_votes: int = 2
ALLOWED_STANCES = {"go", "caution", "block"}
def _is_number(value: Any) -> bool:
return isinstance(value, (int, float)) and not isinstance(value, bool)
def validate_contribution(raw: Any, *, allowed_agents: set[str]) -> dict[str, Any]:
if not isinstance(raw, dict):
raise StopRun("invalid_contribution:not_object")
required = {"agent", "stance", "summary", "confidence", "actions"}
if not required.issubset(raw.keys()):
raise StopRun("invalid_contribution:missing_keys")
agent = raw["agent"]
stance = raw["stance"]
summary = raw["summary"]
confidence = raw["confidence"]
actions = raw["actions"]
if not isinstance(agent, str) or not agent.strip():
raise StopRun("invalid_contribution:agent")
agent = agent.strip()
if agent not in allowed_agents:
raise StopRun(f"invalid_contribution:agent_not_allowed:{agent}")
if not isinstance(stance, str) or stance.strip() not in ALLOWED_STANCES:
raise StopRun("invalid_contribution:stance")
stance = stance.strip()
if not isinstance(summary, str) or not summary.strip():
raise StopRun("invalid_contribution:summary")
if not _is_number(confidence):
raise StopRun("invalid_contribution:confidence_type")
confidence = float(confidence)
if not (0.0 <= confidence <= 1.0):
raise StopRun("invalid_contribution:confidence_range")
if not isinstance(actions, list) or not actions:
raise StopRun("invalid_contribution:actions")
normalized_actions: list[str] = []
for item in actions:
if not isinstance(item, str) or not item.strip():
raise StopRun("invalid_contribution:action_item")
normalized_actions.append(item.strip())
# Ignore unknown keys to tolerate extra LLM fields.
return {
"agent": agent,
"stance": stance,
"summary": summary.strip(),
"confidence": round(confidence, 3),
"actions": normalized_actions[:3],
}
def detect_conflicts(contributions: list[dict[str, Any]]) -> list[str]:
if not contributions:
return ["no_contributions"]
stances = {item["stance"] for item in contributions}
conflicts: list[str] = []
if "go" in stances and "caution" in stances and "block" not in stances:
conflicts.append("go_vs_caution")
if "block" in stances and len(stances) > 1:
conflicts.append("blocking_vs_non_block")
if len(stances) == 3:
conflicts.append("high_divergence")
return conflicts
def decide_round_outcome(
contributions: list[dict[str, Any]],
*,
min_go_votes: int,
) -> str | None:
go_votes = sum(1 for item in contributions if item["stance"] == "go")
caution_votes = sum(1 for item in contributions if item["stance"] == "caution")
block_votes = sum(1 for item in contributions if item["stance"] == "block")
if block_votes >= 2:
return "no_go"
if block_votes > 0:
return None
if go_votes >= min_go_votes and caution_votes == 0:
return "go"
if go_votes >= min_go_votes and caution_votes > 0:
return "go_with_caution"
return None
class CollaborationGateway:
def __init__(self, *, allow: set[str], budget: Budget):
self.allow = set(allow)
self.budget = budget
self.message_count = 0
def _consume_message_budget(self) -> None:
self.message_count += 1
if self.message_count > self.budget.max_messages:
raise StopRun("max_messages")
def accept(self, raw: Any, *, expected_agent: str) -> dict[str, Any]:
if expected_agent not in self.allow:
raise StopRun(f"agent_denied:{expected_agent}")
self._consume_message_budget()
contribution = validate_contribution(raw, allowed_agents=self.allow)
if contribution["agent"] != expected_agent:
raise StopRun(f"invalid_contribution:agent_mismatch:{expected_agent}")
return contribution
Що тут найважливіше (простими словами)
- Gateway приймає рішення, що саме вважається валідним внеском команди.
- Ми просимо ролі не додавати зайві поля, але gateway толерує їх як захист від LLM "балакучості" (контракт enforced по required keys).
detect_conflicts(...)таdecide_round_outcome(...)розділяють поняття конфлікту й готовності до фіналу.- Gateway тільки enforce-ить execution allowlist, який передається з
main.py(policy/execution розділення живе вmain.py, а не в gateway).
llm.py — рольові внески і фінальна синтеза
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
COMMON_RULES = """
Return exactly one JSON object with this shape:
{
"agent": "<role_name>",
"stance": "go|caution|block",
"summary": "one short paragraph",
"confidence": 0.0,
"actions": ["action 1", "action 2"]
}
Rules:
- Use only the provided facts.
- Keep actions concrete and operational.
- Do not output markdown or extra keys.
""".strip()
AGENT_PROMPTS = {
"demand_analyst": (
"You are Demand Analyst. Focus on demand capacity, conversion, and traffic risks. "
"Decide whether launch is feasible from growth and operational demand perspective."
),
"finance_analyst": (
"You are Finance Analyst. Focus on revenue, margin, campaign cost, and downside exposure. "
"Decide if launch economics are acceptable."
),
"risk_analyst": (
"You are Risk Analyst. Focus on payment reliability, chargebacks, and incidents. "
"Prioritize safety and compliance risk containment."
),
"legal_analyst": (
"You are Legal Analyst. Focus on regulatory, compliance, consumer protection, and policy constraints. "
"Flag launch blockers and required mitigations."
),
}
FINAL_SYSTEM_PROMPT = """
You are a launch readiness editor.
Write a short operations brief in English.
Include:
- final decision (go/go_with_caution/no_go)
- why the team agreed
- top 2 immediate actions
Use only evidence from collaboration history.
""".strip()
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def _round_summaries(history: list[dict[str, Any]], limit: int = 2) -> list[dict[str, Any]]:
summaries: list[dict[str, Any]] = []
for row in history[-limit:]:
summaries.append(
{
"round": row.get("round"),
"decision": row.get("decision"),
"conflicts": row.get("conflicts", []),
"stances": [
{
"agent": item.get("agent"),
"stance": item.get("stance"),
"confidence": item.get("confidence"),
}
for item in row.get("contributions", [])
],
}
)
return summaries
def propose_contribution(
*,
role: str,
goal: str,
shared_context: dict[str, Any],
history: list[dict[str, Any]],
open_conflicts: list[str],
) -> dict[str, Any]:
system = AGENT_PROMPTS.get(role)
if not system:
raise ValueError(f"unknown_role:{role}")
payload = {
"goal": goal,
"role": role,
"shared_context": shared_context,
"recent_rounds": _round_summaries(history, limit=2),
"open_conflicts": open_conflicts,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": f"{system}\n\n{COMMON_RULES}"},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"invalid": True, "raw": text}
def compose_final_answer(
*,
goal: str,
final_decision: str,
history: list[dict[str, Any]],
) -> str:
payload = {
"goal": goal,
"final_decision": final_decision,
"history": history,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = (completion.choices[0].message.content or "").strip()
if not text:
raise LLMEmpty("llm_empty")
return text
Що тут найважливіше (простими словами)
- Кожна роль має власний системний фокус, але однаковий структурний контракт відповіді.
- Історія попередніх раундів стає контекстом для наступного внеску.
- Фінальний бриф генерується окремим кроком після узгодження, а не "по ходу".
main.py — Team Loop -> Resolve -> Finalize
from __future__ import annotations
import json
import time
from typing import Any
from gateway import (
Budget,
CollaborationGateway,
StopRun,
decide_round_outcome,
detect_conflicts,
)
from llm import LLMEmpty, LLMTimeout, compose_final_answer, propose_contribution
from signals import build_shared_context
REPORT_DATE = "2026-03-02"
REGION = "US"
GOAL = (
"Prepare a go/no-go launch brief for Checkout v2 campaign in US on 2026-03-02. "
"Use collaboration across demand, finance, and risk analysts; return one aligned decision."
)
BUDGET = Budget(max_rounds=3, max_messages=12, max_seconds=40, min_go_votes=2)
TEAM_ROLES_POLICY = {
"demand_analyst",
"finance_analyst",
"risk_analyst",
"legal_analyst",
}
LEGAL_ANALYST_ENABLED = False
TEAM_ROLES_EXECUTION = (
TEAM_ROLES_POLICY
if LEGAL_ANALYST_ENABLED
else {"demand_analyst", "finance_analyst", "risk_analyst"}
)
TEAM_SEQUENCE = ["demand_analyst", "finance_analyst", "risk_analyst"]
# Set LEGAL_ANALYST_ENABLED=True and append "legal_analyst" to TEAM_SEQUENCE to test runtime denial paths.
def _latest_stances(contributions: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [
{
"agent": item["agent"],
"stance": item["stance"],
"confidence": item["confidence"],
}
for item in contributions
]
def run_collaboration(goal: str) -> dict[str, Any]:
started = time.monotonic()
shared_context = build_shared_context(report_date=REPORT_DATE, region=REGION)
history: list[dict[str, Any]] = []
trace: list[dict[str, Any]] = []
open_conflicts: list[str] = []
final_decision: str | None = None
gateway = CollaborationGateway(allow=TEAM_ROLES_EXECUTION, budget=BUDGET)
for round_no in range(1, BUDGET.max_rounds + 1):
if (time.monotonic() - started) > BUDGET.max_seconds:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"trace": trace,
"history": history,
}
round_contributions: list[dict[str, Any]] = []
for role in TEAM_SEQUENCE:
try:
raw = propose_contribution(
role=role,
goal=goal,
shared_context=shared_context,
history=history,
open_conflicts=open_conflicts,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": f"round_{round_no}:{role}",
"trace": trace,
"history": history,
}
try:
contribution = gateway.accept(raw, expected_agent=role)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": f"round_{round_no}:{role}",
"raw_contribution": raw,
"trace": trace,
"history": history,
}
round_contributions.append(contribution)
trace.append(
{
"round": round_no,
"agent": role,
"stance": contribution["stance"],
"confidence": contribution["confidence"],
"accepted": True,
}
)
conflicts = detect_conflicts(round_contributions)
round_decision = decide_round_outcome(
round_contributions,
min_go_votes=BUDGET.min_go_votes,
)
history_entry = {
"round": round_no,
"contributions": round_contributions,
"conflicts": conflicts,
"decision": round_decision,
}
history.append(history_entry)
trace.append(
{
"round": round_no,
"conflicts": conflicts,
"decision": round_decision or "next_round",
}
)
if round_decision:
final_decision = round_decision
break
open_conflicts = conflicts
if not final_decision:
return {
"status": "stopped",
"stop_reason": "max_rounds_reached",
"trace": trace,
"history": history,
}
try:
answer = compose_final_answer(
goal=goal,
final_decision=final_decision,
history=history,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "finalize",
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"phase": "finalize",
"trace": trace,
"history": history,
}
last_round = history[-1]
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"final_decision": final_decision,
"rounds_used": len(history),
"team_summary": {
"report_date": REPORT_DATE,
"region": REGION,
"stances": _latest_stances(last_round["contributions"]),
"conflicts": last_round["conflicts"],
},
"trace": trace,
"history": history,
}
def main() -> None:
result = run_collaboration(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
Що тут найважливіше (простими словами)
historyє спільною пам'яттю між раундами.- Критерій завершення (
go/go_with_caution/no_go) централізовано в policy функції. - Різниця policy/execution ролей задається в
main.pyчерезTEAM_ROLES_POLICYvsTEAM_ROLES_EXECUTION, а gateway лише enforce-ить execution allowlist.
requirements.txt
openai==2.21.0
Приклад виводу
Нижче приклад валідного запуску, де команда узгоджується в 1-му раунді та повертає final_decision="go".
{
"status": "ok",
"stop_reason": "success",
"answer": "Operations Brief: Checkout v2 Campaign Launch — US, 2026-03-02. Final Decision: GO. Demand, finance, and risk agreed the launch is feasible and within policy thresholds.",
"final_decision": "go",
"rounds_used": 1,
"team_summary": {
"report_date": "2026-03-02",
"region": "US",
"stances": [
{"agent": "demand_analyst", "stance": "go", "confidence": 0.9},
{"agent": "finance_analyst", "stance": "go", "confidence": 0.9},
{"agent": "risk_analyst", "stance": "go", "confidence": 0.9}
],
"conflicts": []
},
"trace": [
{"round": 1, "agent": "demand_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "agent": "finance_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "agent": "risk_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "conflicts": [], "decision": "go"}
],
"history": [
{
"round": 1,
"contributions": [
{"agent": "demand_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["...", "..."]},
{"agent": "finance_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["..."]},
{"agent": "risk_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["..."]}
],
"conflicts": [],
"decision": "go"
}
]
}
Це скорочений приклад: довгий текст answer і summary/actions у history навмисно обрізано для читабельності.
Типові stop_reason
success— run завершено коректно;final_decisionможе бутиgo,go_with_cautionабоno_goinvalid_contribution:*— внесок агента не пройшов контракт валідаціїinvalid_contribution:actions— role повернула порожній або неваліднийactions(у цьому прикладі потрібно 1-3 дії)agent_denied:<role>— роль не дозволена execution allowlist-омllm_timeout— LLM не відповів у межахOPENAI_TIMEOUT_SECONDSllm_empty— finalize-крок повернув порожній текстmax_messages— перевищено бюджет повідомлень між ролямиmax_rounds_reached— команда не дійшла рішення (final_decision) заmax_roundsmax_seconds— перевищено загальний time budget run
Що тут НЕ показано
- Немає реальних доменних API або live-даних.
- Немає зовнішнього conflict resolver (наприклад, human reviewer).
- Немає multi-tenant auth/ACL для ролей і джерел даних.
- Немає adaptive team sizing (динамічного додавання/видалення ролей).
Що спробувати далі
- Додай
legal_analystуTEAM_SEQUENCEприLEGAL_ANALYST_ENABLED=Falseі подивисьagent_denied:legal_analystуstop_reason. - Постав
min_go_votes=3, щоб вимагати повний консенсус перед фіналом. - Додай правило ескалації до людини, якщо
blockповторюється 2 раунди поспіль.