Multi-Agent Collaboration Agent — Python (повна реалізація з LLM)

Production-style runnable приклад Multi-Agent Collaboration агента на Python з ролями команди, спільною дошкою стану, раундами узгодження, conflict detection і stop reasons.
На цій сторінці
  1. Суть патерна (коротко)
  2. Що демонструє цей приклад
  3. Архітектура
  4. Структура проєкту
  5. Як запустити
  6. Задача
  7. Рішення
  8. Код
  9. signals.py — спільні факти для команди
  10. gateway.py — policy boundary для колаборації
  11. llm.py — рольові внески і фінальна синтеза
  12. main.py — Team Loop -> Resolve -> Finalize
  13. requirements.txt
  14. Приклад виводу
  15. Типові stop_reason
  16. Що тут НЕ показано
  17. Що спробувати далі

Суть патерна (коротко)

Multi-Agent Collaboration — це патерн, у якому кілька агентів із різними ролями працюють над однією задачею в раундах і приходять до узгодженого рішення.

LLM у кожній ролі формує свій внесок, а collaboration-policy контролює контракт внеску, ліміти раундів і критерій готовності.


Що демонструє цей приклад

  • команду з 3 ролей: demand_analyst, finance_analyst, risk_analyst
  • спільний контекст (shared_context) для всіх агентів
  • collaboration gateway, який валідовує внески агентів за контрактом
  • раунди узгодження: conflict detection + перевірка консенсусу
  • окремі policy vs execution allowlist-и ролей
  • фінальне рішення (final_decision) може бути go, go_with_caution або no_go (визначається policy)
  • runtime бюджети: max_rounds, max_messages, max_seconds
  • явні stop_reason, trace, history для продакшен-моніторингу

Архітектура

  1. Координатор формує спільну дошку стану (goal + shared_context).
  2. Кожна рольова LLM-модель повертає структурований внесок (agent/stance/summary/confidence/actions).
  3. Gateway валідовує внесок і блокує некоректні або недозволені ролі.
  4. Після кожного раунду система знаходить конфлікти та перевіряє, чи досягнуто консенсус.
  5. Якщо консенсусу немає, результати раунду стають контекстом для наступного раунду.
  6. Після узгодження LLM синтезує фінальний короткий operations brief.

Ключовий контракт: LLM пропонує внесок, але прийняття внеску і правила завершення контролює policy-layer.


Структура проєкту

TEXT
examples/
└── agent-patterns/
    └── multi-agent-collaboration/
        └── python/
            ├── main.py           # Team loop -> Validate -> Resolve -> Finalize
            ├── llm.py            # role contributions + final synthesis
            ├── gateway.py        # contribution contract + conflict/consensus policy
            ├── signals.py        # deterministic shared context
            └── requirements.txt

Як запустити

BASH
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns

cd examples/agent-patterns/multi-agent-collaboration/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Потрібен Python 3.11+.

Варіант через export:

BASH
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"

python main.py
Варіант через .env (опційно)
BASH
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF

set -a
source .env
set +a

python main.py

Це shell-варіант (macOS/Linux). На Windows простіше використовувати set змінних або, за бажанням, python-dotenv, щоб підвантажувати .env автоматично.


Задача

Уяви продакшен-кейс перед запуском кампанії:

"Підготуй go/no-go бриф для Checkout v2 на US ринку на 2026-03-02. Потрібне узгоджене рішення між growth, finance і risk."

Один агент тут легко пропустить частину ризиків або економіки. Тому запускається командний цикл:

  • кожна роль дає окремий висновок
  • система перевіряє розбіжності
  • наступний раунд запускається тільки якщо є потреба
  • фінальна відповідь з'являється після узгодження

Рішення

У цьому прикладі:

  • рольові агенти працюють через один спільний контекст
  • gateway тримає жорсткий JSON-контракт і ліміти виконання
  • detect_conflicts(...) і decide_round_outcome(...) визначають, чи рухатись далі
  • якщо команда не узгодилась за max_rounds, run зупиняється контрольовано
  • якщо узгодилась, окремий finalize-крок формує короткий фінальний бриф

Код

signals.py — спільні факти для команди

PYTHON
from __future__ import annotations

from typing import Any



def build_shared_context(*, report_date: str, region: str) -> dict[str, Any]:
    return {
        "report_date": report_date,
        "region": region,
        "campaign": {
            "name": "Checkout v2 Launch",
            "window": "2026-03-02",
            "channel": "US paid + lifecycle",
        },
        "demand_signals": {
            "projected_orders": 15200,
            "conversion_lift_pct": 12.4,
            "traffic_risk": "medium",
        },
        "finance_signals": {
            "projected_revenue_usd": 684000.0,
            "expected_margin_pct": 19.2,
            "promo_cost_usd": 94000.0,
        },
        "risk_signals": {
            "failed_payment_rate": 0.028,
            "chargeback_alerts": 4,
            "critical_incidents": 0,
        },
        "policy_limits": {
            "payment_failure_block_threshold": 0.03,
            "max_chargeback_alerts_for_go": 5,
        },
    }

Що тут найважливіше (простими словами)

  • Всі ролі бачать один і той самий shared_context.
  • Факти детерміновані: відділені від LLM-рішень.

gateway.py — policy boundary для колаборації

PYTHON
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


class StopRun(Exception):
    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


@dataclass(frozen=True)
class Budget:
    max_rounds: int = 3
    max_messages: int = 12
    max_seconds: int = 40
    min_go_votes: int = 2


ALLOWED_STANCES = {"go", "caution", "block"}



def _is_number(value: Any) -> bool:
    return isinstance(value, (int, float)) and not isinstance(value, bool)



def validate_contribution(raw: Any, *, allowed_agents: set[str]) -> dict[str, Any]:
    if not isinstance(raw, dict):
        raise StopRun("invalid_contribution:not_object")

    required = {"agent", "stance", "summary", "confidence", "actions"}
    if not required.issubset(raw.keys()):
        raise StopRun("invalid_contribution:missing_keys")

    agent = raw["agent"]
    stance = raw["stance"]
    summary = raw["summary"]
    confidence = raw["confidence"]
    actions = raw["actions"]

    if not isinstance(agent, str) or not agent.strip():
        raise StopRun("invalid_contribution:agent")
    agent = agent.strip()
    if agent not in allowed_agents:
        raise StopRun(f"invalid_contribution:agent_not_allowed:{agent}")

    if not isinstance(stance, str) or stance.strip() not in ALLOWED_STANCES:
        raise StopRun("invalid_contribution:stance")
    stance = stance.strip()

    if not isinstance(summary, str) or not summary.strip():
        raise StopRun("invalid_contribution:summary")

    if not _is_number(confidence):
        raise StopRun("invalid_contribution:confidence_type")
    confidence = float(confidence)
    if not (0.0 <= confidence <= 1.0):
        raise StopRun("invalid_contribution:confidence_range")

    if not isinstance(actions, list) or not actions:
        raise StopRun("invalid_contribution:actions")

    normalized_actions: list[str] = []
    for item in actions:
        if not isinstance(item, str) or not item.strip():
            raise StopRun("invalid_contribution:action_item")
        normalized_actions.append(item.strip())

    # Ignore unknown keys to tolerate extra LLM fields.
    return {
        "agent": agent,
        "stance": stance,
        "summary": summary.strip(),
        "confidence": round(confidence, 3),
        "actions": normalized_actions[:3],
    }



def detect_conflicts(contributions: list[dict[str, Any]]) -> list[str]:
    if not contributions:
        return ["no_contributions"]

    stances = {item["stance"] for item in contributions}
    conflicts: list[str] = []

    if "go" in stances and "caution" in stances and "block" not in stances:
        conflicts.append("go_vs_caution")
    if "block" in stances and len(stances) > 1:
        conflicts.append("blocking_vs_non_block")
    if len(stances) == 3:
        conflicts.append("high_divergence")

    return conflicts



def decide_round_outcome(
    contributions: list[dict[str, Any]],
    *,
    min_go_votes: int,
) -> str | None:
    go_votes = sum(1 for item in contributions if item["stance"] == "go")
    caution_votes = sum(1 for item in contributions if item["stance"] == "caution")
    block_votes = sum(1 for item in contributions if item["stance"] == "block")

    if block_votes >= 2:
        return "no_go"

    if block_votes > 0:
        return None
    if go_votes >= min_go_votes and caution_votes == 0:
        return "go"
    if go_votes >= min_go_votes and caution_votes > 0:
        return "go_with_caution"
    return None


class CollaborationGateway:
    def __init__(self, *, allow: set[str], budget: Budget):
        self.allow = set(allow)
        self.budget = budget
        self.message_count = 0

    def _consume_message_budget(self) -> None:
        self.message_count += 1
        if self.message_count > self.budget.max_messages:
            raise StopRun("max_messages")

    def accept(self, raw: Any, *, expected_agent: str) -> dict[str, Any]:
        if expected_agent not in self.allow:
            raise StopRun(f"agent_denied:{expected_agent}")

        self._consume_message_budget()
        contribution = validate_contribution(raw, allowed_agents=self.allow)

        if contribution["agent"] != expected_agent:
            raise StopRun(f"invalid_contribution:agent_mismatch:{expected_agent}")

        return contribution

Що тут найважливіше (простими словами)

  • Gateway приймає рішення, що саме вважається валідним внеском команди.
  • Ми просимо ролі не додавати зайві поля, але gateway толерує їх як захист від LLM "балакучості" (контракт enforced по required keys).
  • detect_conflicts(...) та decide_round_outcome(...) розділяють поняття конфлікту й готовності до фіналу.
  • Gateway тільки enforce-ить execution allowlist, який передається з main.py (policy/execution розділення живе в main.py, а не в gateway).

llm.py — рольові внески і фінальна синтеза

PYTHON
from __future__ import annotations

import json
import os
from typing import Any

from openai import APIConnectionError, APITimeoutError, OpenAI

MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))


class LLMTimeout(Exception):
    pass


class LLMEmpty(Exception):
    pass


COMMON_RULES = """
Return exactly one JSON object with this shape:
{
  "agent": "<role_name>",
  "stance": "go|caution|block",
  "summary": "one short paragraph",
  "confidence": 0.0,
  "actions": ["action 1", "action 2"]
}

Rules:
- Use only the provided facts.
- Keep actions concrete and operational.
- Do not output markdown or extra keys.
""".strip()

AGENT_PROMPTS = {
    "demand_analyst": (
        "You are Demand Analyst. Focus on demand capacity, conversion, and traffic risks. "
        "Decide whether launch is feasible from growth and operational demand perspective."
    ),
    "finance_analyst": (
        "You are Finance Analyst. Focus on revenue, margin, campaign cost, and downside exposure. "
        "Decide if launch economics are acceptable."
    ),
    "risk_analyst": (
        "You are Risk Analyst. Focus on payment reliability, chargebacks, and incidents. "
        "Prioritize safety and compliance risk containment."
    ),
    "legal_analyst": (
        "You are Legal Analyst. Focus on regulatory, compliance, consumer protection, and policy constraints. "
        "Flag launch blockers and required mitigations."
    ),
}

FINAL_SYSTEM_PROMPT = """
You are a launch readiness editor.
Write a short operations brief in English.
Include:
- final decision (go/go_with_caution/no_go)
- why the team agreed
- top 2 immediate actions
Use only evidence from collaboration history.
""".strip()



def _get_client() -> OpenAI:
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
        )
    return OpenAI(api_key=api_key)



def _round_summaries(history: list[dict[str, Any]], limit: int = 2) -> list[dict[str, Any]]:
    summaries: list[dict[str, Any]] = []
    for row in history[-limit:]:
        summaries.append(
            {
                "round": row.get("round"),
                "decision": row.get("decision"),
                "conflicts": row.get("conflicts", []),
                "stances": [
                    {
                        "agent": item.get("agent"),
                        "stance": item.get("stance"),
                        "confidence": item.get("confidence"),
                    }
                    for item in row.get("contributions", [])
                ],
            }
        )
    return summaries



def propose_contribution(
    *,
    role: str,
    goal: str,
    shared_context: dict[str, Any],
    history: list[dict[str, Any]],
    open_conflicts: list[str],
) -> dict[str, Any]:
    system = AGENT_PROMPTS.get(role)
    if not system:
        raise ValueError(f"unknown_role:{role}")

    payload = {
        "goal": goal,
        "role": role,
        "shared_context": shared_context,
        "recent_rounds": _round_summaries(history, limit=2),
        "open_conflicts": open_conflicts,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": f"{system}\n\n{COMMON_RULES}"},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = completion.choices[0].message.content or "{}"
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"invalid": True, "raw": text}



def compose_final_answer(
    *,
    goal: str,
    final_decision: str,
    history: list[dict[str, Any]],
) -> str:
    payload = {
        "goal": goal,
        "final_decision": final_decision,
        "history": history,
    }

    client = _get_client()
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            temperature=0,
            timeout=LLM_TIMEOUT_SECONDS,
            messages=[
                {"role": "system", "content": FINAL_SYSTEM_PROMPT},
                {"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
            ],
        )
    except (APITimeoutError, APIConnectionError) as exc:
        raise LLMTimeout("llm_timeout") from exc

    text = (completion.choices[0].message.content or "").strip()
    if not text:
        raise LLMEmpty("llm_empty")
    return text

Що тут найважливіше (простими словами)

  • Кожна роль має власний системний фокус, але однаковий структурний контракт відповіді.
  • Історія попередніх раундів стає контекстом для наступного внеску.
  • Фінальний бриф генерується окремим кроком після узгодження, а не "по ходу".

main.py — Team Loop -> Resolve -> Finalize

PYTHON
from __future__ import annotations

import json
import time
from typing import Any

from gateway import (
    Budget,
    CollaborationGateway,
    StopRun,
    decide_round_outcome,
    detect_conflicts,
)
from llm import LLMEmpty, LLMTimeout, compose_final_answer, propose_contribution
from signals import build_shared_context

REPORT_DATE = "2026-03-02"
REGION = "US"
GOAL = (
    "Prepare a go/no-go launch brief for Checkout v2 campaign in US on 2026-03-02. "
    "Use collaboration across demand, finance, and risk analysts; return one aligned decision."
)

BUDGET = Budget(max_rounds=3, max_messages=12, max_seconds=40, min_go_votes=2)

TEAM_ROLES_POLICY = {
    "demand_analyst",
    "finance_analyst",
    "risk_analyst",
    "legal_analyst",
}
LEGAL_ANALYST_ENABLED = False
TEAM_ROLES_EXECUTION = (
    TEAM_ROLES_POLICY
    if LEGAL_ANALYST_ENABLED
    else {"demand_analyst", "finance_analyst", "risk_analyst"}
)

TEAM_SEQUENCE = ["demand_analyst", "finance_analyst", "risk_analyst"]
# Set LEGAL_ANALYST_ENABLED=True and append "legal_analyst" to TEAM_SEQUENCE to test runtime denial paths.



def _latest_stances(contributions: list[dict[str, Any]]) -> list[dict[str, Any]]:
    return [
        {
            "agent": item["agent"],
            "stance": item["stance"],
            "confidence": item["confidence"],
        }
        for item in contributions
    ]



def run_collaboration(goal: str) -> dict[str, Any]:
    started = time.monotonic()
    shared_context = build_shared_context(report_date=REPORT_DATE, region=REGION)

    history: list[dict[str, Any]] = []
    trace: list[dict[str, Any]] = []
    open_conflicts: list[str] = []
    final_decision: str | None = None

    gateway = CollaborationGateway(allow=TEAM_ROLES_EXECUTION, budget=BUDGET)

    for round_no in range(1, BUDGET.max_rounds + 1):
        if (time.monotonic() - started) > BUDGET.max_seconds:
            return {
                "status": "stopped",
                "stop_reason": "max_seconds",
                "trace": trace,
                "history": history,
            }

        round_contributions: list[dict[str, Any]] = []

        for role in TEAM_SEQUENCE:
            try:
                raw = propose_contribution(
                    role=role,
                    goal=goal,
                    shared_context=shared_context,
                    history=history,
                    open_conflicts=open_conflicts,
                )
            except LLMTimeout:
                return {
                    "status": "stopped",
                    "stop_reason": "llm_timeout",
                    "phase": f"round_{round_no}:{role}",
                    "trace": trace,
                    "history": history,
                }

            try:
                contribution = gateway.accept(raw, expected_agent=role)
            except StopRun as exc:
                return {
                    "status": "stopped",
                    "stop_reason": exc.reason,
                    "phase": f"round_{round_no}:{role}",
                    "raw_contribution": raw,
                    "trace": trace,
                    "history": history,
                }

            round_contributions.append(contribution)
            trace.append(
                {
                    "round": round_no,
                    "agent": role,
                    "stance": contribution["stance"],
                    "confidence": contribution["confidence"],
                    "accepted": True,
                }
            )

        conflicts = detect_conflicts(round_contributions)
        round_decision = decide_round_outcome(
            round_contributions,
            min_go_votes=BUDGET.min_go_votes,
        )

        history_entry = {
            "round": round_no,
            "contributions": round_contributions,
            "conflicts": conflicts,
            "decision": round_decision,
        }
        history.append(history_entry)

        trace.append(
            {
                "round": round_no,
                "conflicts": conflicts,
                "decision": round_decision or "next_round",
            }
        )

        if round_decision:
            final_decision = round_decision
            break

        open_conflicts = conflicts

    if not final_decision:
        return {
            "status": "stopped",
            "stop_reason": "max_rounds_reached",
            "trace": trace,
            "history": history,
        }

    try:
        answer = compose_final_answer(
            goal=goal,
            final_decision=final_decision,
            history=history,
        )
    except LLMTimeout:
        return {
            "status": "stopped",
            "stop_reason": "llm_timeout",
            "phase": "finalize",
            "trace": trace,
            "history": history,
        }
    except LLMEmpty:
        return {
            "status": "stopped",
            "stop_reason": "llm_empty",
            "phase": "finalize",
            "trace": trace,
            "history": history,
        }

    last_round = history[-1]
    return {
        "status": "ok",
        "stop_reason": "success",
        "answer": answer,
        "final_decision": final_decision,
        "rounds_used": len(history),
        "team_summary": {
            "report_date": REPORT_DATE,
            "region": REGION,
            "stances": _latest_stances(last_round["contributions"]),
            "conflicts": last_round["conflicts"],
        },
        "trace": trace,
        "history": history,
    }



def main() -> None:
    result = run_collaboration(GOAL)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Що тут найважливіше (простими словами)

  • history є спільною пам'яттю між раундами.
  • Критерій завершення (go/go_with_caution/no_go) централізовано в policy функції.
  • Різниця policy/execution ролей задається в main.py через TEAM_ROLES_POLICY vs TEAM_ROLES_EXECUTION, а gateway лише enforce-ить execution allowlist.

requirements.txt

TEXT
openai==2.21.0

Приклад виводу

Нижче приклад валідного запуску, де команда узгоджується в 1-му раунді та повертає final_decision="go".

JSON
{
  "status": "ok",
  "stop_reason": "success",
  "answer": "Operations Brief: Checkout v2 Campaign Launch — US, 2026-03-02. Final Decision: GO. Demand, finance, and risk agreed the launch is feasible and within policy thresholds.",
  "final_decision": "go",
  "rounds_used": 1,
  "team_summary": {
    "report_date": "2026-03-02",
    "region": "US",
    "stances": [
      {"agent": "demand_analyst", "stance": "go", "confidence": 0.9},
      {"agent": "finance_analyst", "stance": "go", "confidence": 0.9},
      {"agent": "risk_analyst", "stance": "go", "confidence": 0.9}
    ],
    "conflicts": []
  },
  "trace": [
    {"round": 1, "agent": "demand_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
    {"round": 1, "agent": "finance_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
    {"round": 1, "agent": "risk_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
    {"round": 1, "conflicts": [], "decision": "go"}
  ],
  "history": [
    {
      "round": 1,
      "contributions": [
        {"agent": "demand_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["...", "..."]},
        {"agent": "finance_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["..."]},
        {"agent": "risk_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["..."]}
      ],
      "conflicts": [],
      "decision": "go"
    }
  ]
}

Це скорочений приклад: довгий текст answer і summary/actions у history навмисно обрізано для читабельності.


Типові stop_reason

  • success — run завершено коректно; final_decision може бути go, go_with_caution або no_go
  • invalid_contribution:* — внесок агента не пройшов контракт валідації
  • invalid_contribution:actions — role повернула порожній або невалідний actions (у цьому прикладі потрібно 1-3 дії)
  • agent_denied:<role> — роль не дозволена execution allowlist-ом
  • llm_timeout — LLM не відповів у межах OPENAI_TIMEOUT_SECONDS
  • llm_empty — finalize-крок повернув порожній текст
  • max_messages — перевищено бюджет повідомлень між ролями
  • max_rounds_reached — команда не дійшла рішення (final_decision) за max_rounds
  • max_seconds — перевищено загальний time budget run

Що тут НЕ показано

  • Немає реальних доменних API або live-даних.
  • Немає зовнішнього conflict resolver (наприклад, human reviewer).
  • Немає multi-tenant auth/ACL для ролей і джерел даних.
  • Немає adaptive team sizing (динамічного додавання/видалення ролей).

Що спробувати далі

  1. Додай legal_analyst у TEAM_SEQUENCE при LEGAL_ANALYST_ENABLED=False і подивись agent_denied:legal_analyst у stop_reason.
  2. Постав min_go_votes=3, щоб вимагати повний консенсус перед фіналом.
  3. Додай правило ескалації до людини, якщо block повторюється 2 раунди поспіль.
⏱️ 12 хв читанняОновлено Бер, 2026Складність: ★★☆
Інтегровано: продакшен-контрольOnceOnly
Додай guardrails до агентів з tool-calling
Зашип цей патерн з governance:
  • Бюджетами (кроки / ліміти витрат)
  • Дозволами на інструменти (allowlist / blocklist)
  • Kill switch та аварійна зупинка
  • Ідемпотентність і dedupe
  • Audit logs та трасування
Інтегрована згадка: OnceOnly — контрольний шар для продакшен агент-систем.
Автор

Цю документацію курують і підтримують інженери, які запускають AI-агентів у продакшені.

Контент створено з допомогою AI, із людською редакторською відповідальністю за точність, ясність і продакшн-релевантність.

Патерни та рекомендації базуються на постмортемах, режимах відмов і операційних інцидентах у розгорнутих системах, зокрема під час розробки та експлуатації governance-інфраструктури для агентів у OnceOnly.