Core of the Pattern (Brief)
Multi-Agent Collaboration is a pattern in which several agents with different roles work on a task in rounds and converge on an aligned decision.
In each role, the LLM produces its contribution, while the collaboration policy controls the contribution contract, round limits, and the readiness criterion.
What This Example Shows
- a team with 3 roles: `demand_analyst`, `finance_analyst`, `risk_analyst`
- a shared context (`shared_context`) for all agents
- a collaboration gateway that validates agent contributions against the contract
- alignment rounds: conflict detection + consensus check
- separate policy-vs-execution allowlists for roles
- the final decision (`final_decision`) can be `go`, `go_with_caution`, or `no_go` (determined by policy)
- runtime budgets: `max_rounds`, `max_messages`, `max_seconds`
- explicit `stop_reason`, `trace`, `history` for production monitoring
Architecture
- The coordinator builds a shared state board (`goal` + `shared_context`).
- Each role-specific LLM call returns a structured contribution (`agent`/`stance`/`summary`/`confidence`/`actions`).
- The gateway validates the contribution and blocks invalid or disallowed roles.
- After each round, the system detects conflicts and checks whether consensus has been reached.
- If there is no consensus, the round results become context for the next round.
- After alignment, the LLM synthesizes a short final operations brief.
Key contract: the LLM proposes a contribution, but contribution acceptance and completion rules are controlled by the policy layer.
Project Structure
examples/
└── agent-patterns/
└── multi-agent-collaboration/
└── python/
├── main.py # Team loop -> Validate -> Resolve -> Finalize
├── llm.py # role contributions + final synthesis
├── gateway.py # contribution contract + conflict/consensus policy
├── signals.py # deterministic shared context
└── requirements.txt
Run
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/multi-agent-collaboration/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Python 3.11+ is required.
Variant using export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Variant using .env (optional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
This is the shell variant (macOS/Linux). On Windows it is easier to use `set` to define the variables, or optionally `python-dotenv` to load `.env` automatically.
Task
Imagine a production case just before a campaign launch:
"Create a go/no-go brief for Checkout v2 in the US market for 2026-03-02. An aligned decision between Growth, Finance, and Risk is required."
A single agent can easily miss part of the risks or the economics here. So a team loop is started:
- each role gives its own assessment
- the system checks for divergence
- the next round starts only when needed
- the final answer appears after alignment
Solution
In this example:
- role agents work over a shared context
- the gateway enforces a strict JSON contract and execution limits
- `detect_conflicts(...)` and `decide_round_outcome(...)` determine whether the loop continues
- if the team does not align within `max_rounds`, the run stops in a controlled way
- if it does align, a separate finalize step produces a short final brief
Code
signals.py — shared facts for the team
from __future__ import annotations
from typing import Any
def build_shared_context(*, report_date: str, region: str) -> dict[str, Any]:
return {
"report_date": report_date,
"region": region,
"campaign": {
"name": "Checkout v2 Launch",
"window": "2026-03-02",
"channel": "US paid + lifecycle",
},
"demand_signals": {
"projected_orders": 15200,
"conversion_lift_pct": 12.4,
"traffic_risk": "medium",
},
"finance_signals": {
"projected_revenue_usd": 684000.0,
"expected_margin_pct": 19.2,
"promo_cost_usd": 94000.0,
},
"risk_signals": {
"failed_payment_rate": 0.028,
"chargeback_alerts": 4,
"critical_incidents": 0,
},
"policy_limits": {
"payment_failure_block_threshold": 0.03,
"max_chargeback_alerts_for_go": 5,
},
}
What matters most here (in plain terms)
- All roles see the same `shared_context`.
- Facts are deterministic and kept separate from LLM decisions.
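Determinism is easy to verify: calling the builder twice with the same inputs must return identical facts. Below is a minimal standalone sketch (trimmed to a few fields) that mirrors the shape of `build_shared_context`:

```python
# Minimal standalone sketch of a deterministic context builder
# (trimmed to a few fields from signals.py).
from typing import Any

def build_shared_context(*, report_date: str, region: str) -> dict[str, Any]:
    # Pure function: no I/O, no randomness, no LLM involvement.
    return {
        "report_date": report_date,
        "region": region,
        "risk_signals": {"failed_payment_rate": 0.028, "chargeback_alerts": 4},
        "policy_limits": {"payment_failure_block_threshold": 0.03},
    }

a = build_shared_context(report_date="2026-03-02", region="US")
b = build_shared_context(report_date="2026-03-02", region="US")
assert a == b  # same inputs -> identical facts, every time
# In this fixture, the risk signal sits below the policy threshold:
print(a["risk_signals"]["failed_payment_rate"]
      < a["policy_limits"]["payment_failure_block_threshold"])
```

Because the facts are a pure function of the inputs, any disagreement between roles comes from the LLM's interpretation, not from diverging data.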
gateway.py — policy boundary for collaboration
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_rounds: int = 3
max_messages: int = 12
max_seconds: int = 40
min_go_votes: int = 2
ALLOWED_STANCES = {"go", "caution", "block"}
def _is_number(value: Any) -> bool:
return isinstance(value, (int, float)) and not isinstance(value, bool)
def validate_contribution(raw: Any, *, allowed_agents: set[str]) -> dict[str, Any]:
if not isinstance(raw, dict):
raise StopRun("invalid_contribution:not_object")
required = {"agent", "stance", "summary", "confidence", "actions"}
if not required.issubset(raw.keys()):
raise StopRun("invalid_contribution:missing_keys")
agent = raw["agent"]
stance = raw["stance"]
summary = raw["summary"]
confidence = raw["confidence"]
actions = raw["actions"]
if not isinstance(agent, str) or not agent.strip():
raise StopRun("invalid_contribution:agent")
agent = agent.strip()
if agent not in allowed_agents:
raise StopRun(f"invalid_contribution:agent_not_allowed:{agent}")
if not isinstance(stance, str) or stance.strip() not in ALLOWED_STANCES:
raise StopRun("invalid_contribution:stance")
stance = stance.strip()
if not isinstance(summary, str) or not summary.strip():
raise StopRun("invalid_contribution:summary")
if not _is_number(confidence):
raise StopRun("invalid_contribution:confidence_type")
confidence = float(confidence)
if not (0.0 <= confidence <= 1.0):
raise StopRun("invalid_contribution:confidence_range")
if not isinstance(actions, list) or not actions:
raise StopRun("invalid_contribution:actions")
normalized_actions: list[str] = []
for item in actions:
if not isinstance(item, str) or not item.strip():
raise StopRun("invalid_contribution:action_item")
normalized_actions.append(item.strip())
# Ignore unknown keys to tolerate extra LLM fields.
return {
"agent": agent,
"stance": stance,
"summary": summary.strip(),
"confidence": round(confidence, 3),
"actions": normalized_actions[:3],
}
def detect_conflicts(contributions: list[dict[str, Any]]) -> list[str]:
if not contributions:
return ["no_contributions"]
stances = {item["stance"] for item in contributions}
conflicts: list[str] = []
if "go" in stances and "caution" in stances and "block" not in stances:
conflicts.append("go_vs_caution")
if "block" in stances and len(stances) > 1:
conflicts.append("blocking_vs_non_block")
if len(stances) == 3:
conflicts.append("high_divergence")
return conflicts
def decide_round_outcome(
contributions: list[dict[str, Any]],
*,
min_go_votes: int,
) -> str | None:
go_votes = sum(1 for item in contributions if item["stance"] == "go")
caution_votes = sum(1 for item in contributions if item["stance"] == "caution")
block_votes = sum(1 for item in contributions if item["stance"] == "block")
if block_votes >= 2:
return "no_go"
if block_votes > 0:
return None
if go_votes >= min_go_votes and caution_votes == 0:
return "go"
if go_votes >= min_go_votes and caution_votes > 0:
return "go_with_caution"
return None
class CollaborationGateway:
def __init__(self, *, allow: set[str], budget: Budget):
self.allow = set(allow)
self.budget = budget
self.message_count = 0
def _consume_message_budget(self) -> None:
self.message_count += 1
if self.message_count > self.budget.max_messages:
raise StopRun("max_messages")
def accept(self, raw: Any, *, expected_agent: str) -> dict[str, Any]:
if expected_agent not in self.allow:
raise StopRun(f"agent_denied:{expected_agent}")
self._consume_message_budget()
contribution = validate_contribution(raw, allowed_agents=self.allow)
if contribution["agent"] != expected_agent:
raise StopRun(f"invalid_contribution:agent_mismatch:{expected_agent}")
return contribution
What matters most here (in plain terms)
- The gateway decides what exactly counts as a valid team contribution.
- Roles are asked not to add extra fields, but the gateway tolerates them as protection against LLM chattiness (the contract is enforced over the required keys).
- `detect_conflicts(...)` and `decide_round_outcome(...)` keep the notions of "conflict" and "ready to finalize" separate.
- The gateway enforces only the execution allowlist passed in from `main.py` (the policy/execution split lives in `main.py`, not in the gateway).
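The contract behavior can be shown in isolation. The snippet below is a trimmed standalone re-implementation of the idea behind `validate_contribution` (not the full gateway): required keys are enforced, unknown extra keys are tolerated and silently dropped.

```python
# Trimmed standalone sketch of the contribution contract from gateway.py:
# enforce required keys and allowlists, drop unknown extra keys.
from typing import Any

REQUIRED = {"agent", "stance", "summary", "confidence", "actions"}
ALLOWED_STANCES = {"go", "caution", "block"}

def validate(raw: dict[str, Any], allowed_agents: set[str]) -> dict[str, Any]:
    if not REQUIRED.issubset(raw):
        raise ValueError("invalid_contribution:missing_keys")
    if raw["agent"] not in allowed_agents:
        raise ValueError(f"invalid_contribution:agent_not_allowed:{raw['agent']}")
    if raw["stance"] not in ALLOWED_STANCES:
        raise ValueError("invalid_contribution:stance")
    # Return only contract fields; extra LLM keys never reach the loop.
    return {key: raw[key] for key in REQUIRED}

ok = validate(
    {"agent": "risk_analyst", "stance": "caution", "summary": "watch chargebacks",
     "confidence": 0.7, "actions": ["monitor chargebacks"],
     "extra_chatter": "ignored"},  # tolerated on input, dropped on output
    allowed_agents={"risk_analyst"},
)
print("extra_chatter" in ok)  # False

try:
    validate({"agent": "legal_analyst", "stance": "go"},
             allowed_agents={"risk_analyst"})
except ValueError as exc:
    print(exc)  # invalid_contribution:missing_keys
```

The real `validate_contribution` additionally checks types, confidence range, and non-empty actions; the principle is the same.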
llm.py — role contributions and final synthesis
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
COMMON_RULES = """
Return exactly one JSON object with this shape:
{
"agent": "<role_name>",
"stance": "go|caution|block",
"summary": "one short paragraph",
"confidence": 0.0,
"actions": ["action 1", "action 2"]
}
Rules:
- Use only the provided facts.
- Keep actions concrete and operational.
- Do not output markdown or extra keys.
""".strip()
AGENT_PROMPTS = {
"demand_analyst": (
"You are Demand Analyst. Focus on demand capacity, conversion, and traffic risks. "
"Decide whether launch is feasible from growth and operational demand perspective."
),
"finance_analyst": (
"You are Finance Analyst. Focus on revenue, margin, campaign cost, and downside exposure. "
"Decide if launch economics are acceptable."
),
"risk_analyst": (
"You are Risk Analyst. Focus on payment reliability, chargebacks, and incidents. "
"Prioritize safety and compliance risk containment."
),
"legal_analyst": (
"You are Legal Analyst. Focus on regulatory, compliance, consumer protection, and policy constraints. "
"Flag launch blockers and required mitigations."
),
}
FINAL_SYSTEM_PROMPT = """
You are a launch readiness editor.
Write a short operations brief in English.
Include:
- final decision (go/go_with_caution/no_go)
- why the team agreed
- top 2 immediate actions
Use only evidence from collaboration history.
""".strip()
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def _round_summaries(history: list[dict[str, Any]], limit: int = 2) -> list[dict[str, Any]]:
summaries: list[dict[str, Any]] = []
for row in history[-limit:]:
summaries.append(
{
"round": row.get("round"),
"decision": row.get("decision"),
"conflicts": row.get("conflicts", []),
"stances": [
{
"agent": item.get("agent"),
"stance": item.get("stance"),
"confidence": item.get("confidence"),
}
for item in row.get("contributions", [])
],
}
)
return summaries
def propose_contribution(
*,
role: str,
goal: str,
shared_context: dict[str, Any],
history: list[dict[str, Any]],
open_conflicts: list[str],
) -> dict[str, Any]:
system = AGENT_PROMPTS.get(role)
if not system:
raise ValueError(f"unknown_role:{role}")
payload = {
"goal": goal,
"role": role,
"shared_context": shared_context,
"recent_rounds": _round_summaries(history, limit=2),
"open_conflicts": open_conflicts,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": f"{system}\n\n{COMMON_RULES}"},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"invalid": True, "raw": text}
def compose_final_answer(
*,
goal: str,
final_decision: str,
history: list[dict[str, Any]],
) -> str:
payload = {
"goal": goal,
"final_decision": final_decision,
"history": history,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = (completion.choices[0].message.content or "").strip()
if not text:
raise LLMEmpty("llm_empty")
return text
What matters most here (in plain terms)
- Each role has its own system focus but the same structured response contract.
- The history of previous rounds becomes context for the next contribution.
- The final brief is produced in a separate step after alignment, not as a side effect.
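The bounded-history idea can be shown in isolation. This standalone sketch mirrors `_round_summaries`: no matter how long the run gets, only the last two round summaries flow back into the prompt payload.

```python
# Standalone sketch mirroring _round_summaries from llm.py: keep the prompt
# bounded by sending only the last `limit` round summaries to the model.
from typing import Any

def round_summaries(history: list[dict[str, Any]], limit: int = 2) -> list[dict[str, Any]]:
    return [
        {"round": row["round"], "decision": row["decision"], "conflicts": row["conflicts"]}
        for row in history[-limit:]
    ]

history = [
    {"round": 1, "decision": None, "conflicts": ["go_vs_caution"]},
    {"round": 2, "decision": None, "conflicts": ["high_divergence"]},
    {"round": 3, "decision": "go_with_caution", "conflicts": []},
]
print([row["round"] for row in round_summaries(history)])  # [2, 3]
```

Trimming here is a cost and context-window control: each round adds contributions, so sending the full history would grow the prompt linearly with round count.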
main.py — Team Loop -> Resolve -> Finalize
from __future__ import annotations
import json
import time
from typing import Any
from gateway import (
Budget,
CollaborationGateway,
StopRun,
decide_round_outcome,
detect_conflicts,
)
from llm import LLMEmpty, LLMTimeout, compose_final_answer, propose_contribution
from signals import build_shared_context
REPORT_DATE = "2026-03-02"
REGION = "US"
GOAL = (
"Prepare a go/no-go launch brief for Checkout v2 campaign in US on 2026-03-02. "
"Use collaboration across demand, finance, and risk analysts; return one aligned decision."
)
BUDGET = Budget(max_rounds=3, max_messages=12, max_seconds=40, min_go_votes=2)
TEAM_ROLES_POLICY = {
"demand_analyst",
"finance_analyst",
"risk_analyst",
"legal_analyst",
}
LEGAL_ANALYST_ENABLED = False
TEAM_ROLES_EXECUTION = (
TEAM_ROLES_POLICY
if LEGAL_ANALYST_ENABLED
else {"demand_analyst", "finance_analyst", "risk_analyst"}
)
TEAM_SEQUENCE = ["demand_analyst", "finance_analyst", "risk_analyst"]
# Set LEGAL_ANALYST_ENABLED=True and append "legal_analyst" to TEAM_SEQUENCE to test runtime denial paths.
def _latest_stances(contributions: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [
{
"agent": item["agent"],
"stance": item["stance"],
"confidence": item["confidence"],
}
for item in contributions
]
def run_collaboration(goal: str) -> dict[str, Any]:
started = time.monotonic()
shared_context = build_shared_context(report_date=REPORT_DATE, region=REGION)
history: list[dict[str, Any]] = []
trace: list[dict[str, Any]] = []
open_conflicts: list[str] = []
final_decision: str | None = None
gateway = CollaborationGateway(allow=TEAM_ROLES_EXECUTION, budget=BUDGET)
for round_no in range(1, BUDGET.max_rounds + 1):
if (time.monotonic() - started) > BUDGET.max_seconds:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"trace": trace,
"history": history,
}
round_contributions: list[dict[str, Any]] = []
for role in TEAM_SEQUENCE:
try:
raw = propose_contribution(
role=role,
goal=goal,
shared_context=shared_context,
history=history,
open_conflicts=open_conflicts,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": f"round_{round_no}:{role}",
"trace": trace,
"history": history,
}
try:
contribution = gateway.accept(raw, expected_agent=role)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": f"round_{round_no}:{role}",
"raw_contribution": raw,
"trace": trace,
"history": history,
}
round_contributions.append(contribution)
trace.append(
{
"round": round_no,
"agent": role,
"stance": contribution["stance"],
"confidence": contribution["confidence"],
"accepted": True,
}
)
conflicts = detect_conflicts(round_contributions)
round_decision = decide_round_outcome(
round_contributions,
min_go_votes=BUDGET.min_go_votes,
)
history_entry = {
"round": round_no,
"contributions": round_contributions,
"conflicts": conflicts,
"decision": round_decision,
}
history.append(history_entry)
trace.append(
{
"round": round_no,
"conflicts": conflicts,
"decision": round_decision or "next_round",
}
)
if round_decision:
final_decision = round_decision
break
open_conflicts = conflicts
if not final_decision:
return {
"status": "stopped",
"stop_reason": "max_rounds_reached",
"trace": trace,
"history": history,
}
try:
answer = compose_final_answer(
goal=goal,
final_decision=final_decision,
history=history,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "finalize",
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"phase": "finalize",
"trace": trace,
"history": history,
}
last_round = history[-1]
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"final_decision": final_decision,
"rounds_used": len(history),
"team_summary": {
"report_date": REPORT_DATE,
"region": REGION,
"stances": _latest_stances(last_round["contributions"]),
"conflicts": last_round["conflicts"],
},
"trace": trace,
"history": history,
}
def main() -> None:
result = run_collaboration(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
What matters most here (in plain terms)
- `history` is shared memory across rounds.
- The completion criterion (`go`/`go_with_caution`/`no_go`) is centralized in a single policy function.
- The policy-vs-execution role split is defined in `main.py` via `TEAM_ROLES_POLICY` vs `TEAM_ROLES_EXECUTION`, and the gateway enforces only the execution allowlist.
requirements.txt
openai==2.21.0
Example Output
Below is an example of a valid run in which the team aligns in round 1 and returns `final_decision="go"`.
{
"status": "ok",
"stop_reason": "success",
"answer": "Operations Brief: Checkout v2 Campaign Launch — US, 2026-03-02. Final Decision: GO. Demand, finance, and risk agreed the launch is feasible and within policy thresholds.",
"final_decision": "go",
"rounds_used": 1,
"team_summary": {
"report_date": "2026-03-02",
"region": "US",
"stances": [
{"agent": "demand_analyst", "stance": "go", "confidence": 0.9},
{"agent": "finance_analyst", "stance": "go", "confidence": 0.9},
{"agent": "risk_analyst", "stance": "go", "confidence": 0.9}
],
"conflicts": []
},
"trace": [
{"round": 1, "agent": "demand_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "agent": "finance_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "agent": "risk_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "conflicts": [], "decision": "go"}
],
"history": [
{
"round": 1,
"contributions": [
{"agent": "demand_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["...", "..."]},
{"agent": "finance_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["..."]},
{"agent": "risk_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["..."]}
],
"conflicts": [],
"decision": "go"
}
]
}
This is an abbreviated example: the long `answer` text and the `summary`/`actions` entries in `history` are intentionally shortened for readability.
Typical stop_reason values
- `success` — the run completed correctly; `final_decision` can be `go`, `go_with_caution`, or `no_go`
- `invalid_contribution:*` — an agent contribution failed contract validation
- `invalid_contribution:actions` — a role returned empty or invalid `actions` (this example requires 1-3 actions)
- `agent_denied:<role>` — the role is not allowed by the execution allowlist
- `llm_timeout` — the LLM did not respond within `OPENAI_TIMEOUT_SECONDS`
- `llm_empty` — the finalize step returned empty text
- `max_messages` — the message budget across roles was exceeded
- `max_rounds_reached` — the team did not reach a decision (`final_decision`) within `max_rounds`
- `max_seconds` — the run's total time budget was exceeded
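For production monitoring, these values group naturally by severity. The hook below is hypothetical and not part of the example; the `severity` helper and the grouping are assumptions you would adapt to your own alerting setup.

```python
# Hypothetical monitoring hook (not in the example): map stop_reason
# values to alert severities for a production dashboard.
def severity(stop_reason: str) -> str:
    if stop_reason == "success":
        return "info"
    if stop_reason.startswith("invalid_contribution:") or stop_reason.startswith("agent_denied:"):
        return "warning"   # contract/policy violations: inspect prompts and allowlists
    return "critical"      # budget exhaustion and LLM failures: page on repeated hits

print(severity("success"))                      # info
print(severity("invalid_contribution:stance"))  # warning
print(severity("max_rounds_reached"))           # critical
```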
What is NOT shown here
- No real domain APIs or live data.
- No external conflict resolver (for example, a human reviewer).
- No multi-tenant auth/ACL for roles and data sources.
- No adaptive team sizing (dynamically adding/removing roles).
What to try next
- Add `legal_analyst` to `TEAM_SEQUENCE` with `LEGAL_ANALYST_ENABLED=False` and watch `agent_denied:legal_analyst` appear in `stop_reason`.
- Set `min_go_votes=3` to require full consensus before finalizing.
- Add an escalation rule to a human reviewer when `block` repeats for 2 rounds in a row.
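The escalation idea can be sketched as a small predicate over `history`. The `should_escalate` helper below is hypothetical and not part of the example code:

```python
# Hypothetical escalation predicate (not in the example): escalate to a
# human reviewer when some role answered "block" in each of the last
# two consecutive rounds.
from typing import Any

def should_escalate(history: list[dict[str, Any]]) -> bool:
    if len(history) < 2:
        return False
    return all(
        any(c["stance"] == "block" for c in row["contributions"])
        for row in history[-2:]
    )

history = [
    {"round": 1, "contributions": [{"agent": "risk_analyst", "stance": "block"}]},
    {"round": 2, "contributions": [{"agent": "risk_analyst", "stance": "block"}]},
]
print(should_escalate(history))      # True
print(should_escalate(history[:1]))  # False: one round is not a pattern yet
```

In `run_collaboration` you would check this after each `history.append(...)` and return a dedicated stop_reason (for example `escalate_to_human`) instead of continuing the loop.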