Esencia del patrón (breve)
Multi-Agent Collaboration es un patrón en el que varios agentes con distintos roles trabajan sobre una misma tarea en rondas y llegan a una decisión alineada.
En cada rol, el LLM genera su contribución, y la collaboration policy controla el contrato de contribución, los límites de rondas y el criterio de preparación.
Qué demuestra este ejemplo
- un equipo con 3 roles:
demand_analyst,finance_analyst,risk_analyst - contexto compartido (
shared_context) para todos los agentes - un collaboration gateway que valida las contribuciones de agentes contra el contrato
- rondas de alineación: conflict detection + verificación de consenso
- allowlists separadas de roles para policy vs execution
- la decisión final (
final_decision) puede sergo,go_with_cautionono_go(determinada por policy) - presupuestos de runtime:
max_rounds,max_messages,max_seconds stop_reason,traceehistoryexplícitos para monitoreo en producción
Arquitectura
- El coordinador forma un tablero de estado compartido (
goal + shared_context). - Cada modelo LLM por rol devuelve una contribución estructurada (
agent/stance/summary/confidence/actions). - Gateway valida la contribución y bloquea roles inválidos o no permitidos.
- Después de cada ronda, el sistema detecta conflictos y verifica si se alcanzó consenso.
- Si no hay consenso, los resultados de la ronda se vuelven contexto para la siguiente ronda.
- Tras la alineación, el LLM sintetiza un operations brief final corto.
Contrato clave: el LLM propone una contribución, pero la aceptación de la contribución y las reglas de cierre las controla la policy layer.
Estructura del proyecto
examples/
└── agent-patterns/
└── multi-agent-collaboration/
└── python/
├── main.py # Team loop -> Validate -> Resolve -> Finalize
├── llm.py # role contributions + final synthesis
├── gateway.py # contribution contract + conflict/consensus policy
├── signals.py # deterministic shared context
└── requirements.txt
Cómo ejecutar
git clone https://github.com/AgentPatterns-tech/agentpatterns.git
cd agentpatterns
cd examples/agent-patterns/multi-agent-collaboration/python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Se requiere Python 3.11+.
Opción con export:
export OPENAI_API_KEY="sk-..."
# optional:
# export OPENAI_MODEL="gpt-4.1-mini"
# export OPENAI_TIMEOUT_SECONDS="60"
python main.py
Opción con .env (opcional)
cat > .env <<'EOF'
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4.1-mini
OPENAI_TIMEOUT_SECONDS=60
EOF
set -a
source .env
set +a
python main.py
Esta es la variante de shell (macOS/Linux). En Windows es más fácil usar variables con set o, si quieres, python-dotenv para cargar .env automáticamente.
Tarea
Imagina un caso de producción antes del lanzamiento de una campaña:
"Prepara un brief go/no-go para Checkout v2 en el mercado de US para 2026-03-02. Se necesita una decisión alineada entre growth, finance y risk."
Un solo agente aquí puede omitir fácilmente parte de los riesgos o de la economía. Por eso se ejecuta un ciclo de equipo:
- cada rol da una conclusión separada
- el sistema verifica discrepancias
- la siguiente ronda se inicia solo si hace falta
- la respuesta final aparece tras la alineación
Solución
En este ejemplo:
- los agentes por rol trabajan sobre un único contexto compartido
- gateway mantiene un contrato JSON estricto y límites de ejecución
detect_conflicts(...)ydecide_round_outcome(...)determinan si continuar- si el equipo no se alinea dentro de
max_rounds, el run se detiene de forma controlada - si se alinea, un paso de finalize separado compone un brief final corto
Código
signals.py — hechos compartidos para el equipo
from __future__ import annotations
from typing import Any
def build_shared_context(*, report_date: str, region: str) -> dict[str, Any]:
return {
"report_date": report_date,
"region": region,
"campaign": {
"name": "Checkout v2 Launch",
"window": "2026-03-02",
"channel": "US paid + lifecycle",
},
"demand_signals": {
"projected_orders": 15200,
"conversion_lift_pct": 12.4,
"traffic_risk": "medium",
},
"finance_signals": {
"projected_revenue_usd": 684000.0,
"expected_margin_pct": 19.2,
"promo_cost_usd": 94000.0,
},
"risk_signals": {
"failed_payment_rate": 0.028,
"chargeback_alerts": 4,
"critical_incidents": 0,
},
"policy_limits": {
"payment_failure_block_threshold": 0.03,
"max_chargeback_alerts_for_go": 5,
},
}
Lo más importante aquí (en simple)
- Todos los roles ven el mismo
shared_context. - Los hechos son deterministas: están separados de las decisiones del LLM.
gateway.py — policy boundary para colaboración
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
class StopRun(Exception):
def __init__(self, reason: str):
super().__init__(reason)
self.reason = reason
@dataclass(frozen=True)
class Budget:
max_rounds: int = 3
max_messages: int = 12
max_seconds: int = 40
min_go_votes: int = 2
ALLOWED_STANCES = {"go", "caution", "block"}
def _is_number(value: Any) -> bool:
return isinstance(value, (int, float)) and not isinstance(value, bool)
def validate_contribution(raw: Any, *, allowed_agents: set[str]) -> dict[str, Any]:
if not isinstance(raw, dict):
raise StopRun("invalid_contribution:not_object")
required = {"agent", "stance", "summary", "confidence", "actions"}
if not required.issubset(raw.keys()):
raise StopRun("invalid_contribution:missing_keys")
agent = raw["agent"]
stance = raw["stance"]
summary = raw["summary"]
confidence = raw["confidence"]
actions = raw["actions"]
if not isinstance(agent, str) or not agent.strip():
raise StopRun("invalid_contribution:agent")
agent = agent.strip()
if agent not in allowed_agents:
raise StopRun(f"invalid_contribution:agent_not_allowed:{agent}")
if not isinstance(stance, str) or stance.strip() not in ALLOWED_STANCES:
raise StopRun("invalid_contribution:stance")
stance = stance.strip()
if not isinstance(summary, str) or not summary.strip():
raise StopRun("invalid_contribution:summary")
if not _is_number(confidence):
raise StopRun("invalid_contribution:confidence_type")
confidence = float(confidence)
if not (0.0 <= confidence <= 1.0):
raise StopRun("invalid_contribution:confidence_range")
if not isinstance(actions, list) or not actions:
raise StopRun("invalid_contribution:actions")
normalized_actions: list[str] = []
for item in actions:
if not isinstance(item, str) or not item.strip():
raise StopRun("invalid_contribution:action_item")
normalized_actions.append(item.strip())
# Ignore unknown keys to tolerate extra LLM fields.
return {
"agent": agent,
"stance": stance,
"summary": summary.strip(),
"confidence": round(confidence, 3),
"actions": normalized_actions[:3],
}
def detect_conflicts(contributions: list[dict[str, Any]]) -> list[str]:
if not contributions:
return ["no_contributions"]
stances = {item["stance"] for item in contributions}
conflicts: list[str] = []
if "go" in stances and "caution" in stances and "block" not in stances:
conflicts.append("go_vs_caution")
if "block" in stances and len(stances) > 1:
conflicts.append("blocking_vs_non_block")
if len(stances) == 3:
conflicts.append("high_divergence")
return conflicts
def decide_round_outcome(
contributions: list[dict[str, Any]],
*,
min_go_votes: int,
) -> str | None:
go_votes = sum(1 for item in contributions if item["stance"] == "go")
caution_votes = sum(1 for item in contributions if item["stance"] == "caution")
block_votes = sum(1 for item in contributions if item["stance"] == "block")
if block_votes >= 2:
return "no_go"
if block_votes > 0:
return None
if go_votes >= min_go_votes and caution_votes == 0:
return "go"
if go_votes >= min_go_votes and caution_votes > 0:
return "go_with_caution"
return None
class CollaborationGateway:
def __init__(self, *, allow: set[str], budget: Budget):
self.allow = set(allow)
self.budget = budget
self.message_count = 0
def _consume_message_budget(self) -> None:
self.message_count += 1
if self.message_count > self.budget.max_messages:
raise StopRun("max_messages")
def accept(self, raw: Any, *, expected_agent: str) -> dict[str, Any]:
if expected_agent not in self.allow:
raise StopRun(f"agent_denied:{expected_agent}")
self._consume_message_budget()
contribution = validate_contribution(raw, allowed_agents=self.allow)
if contribution["agent"] != expected_agent:
raise StopRun(f"invalid_contribution:agent_mismatch:{expected_agent}")
return contribution
Lo más importante aquí (en simple)
- Gateway decide qué se considera exactamente una contribución válida del equipo.
- Pedimos a los roles que no añadan campos extra, pero gateway los tolera como protección frente a la "verbosidad" del LLM (contrato enforced por required keys).
detect_conflicts(...)ydecide_round_outcome(...)separan los conceptos de conflicto y preparación para finalizar.- Gateway solo enforce la execution allowlist que llega desde
main.py(la separación policy/execution vive enmain.py, no en gateway).
llm.py — contribuciones por rol y síntesis final
from __future__ import annotations
import json
import os
from typing import Any
from openai import APIConnectionError, APITimeoutError, OpenAI
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
class LLMTimeout(Exception):
pass
class LLMEmpty(Exception):
pass
COMMON_RULES = """
Return exactly one JSON object with this shape:
{
"agent": "<role_name>",
"stance": "go|caution|block",
"summary": "one short paragraph",
"confidence": 0.0,
"actions": ["action 1", "action 2"]
}
Rules:
- Use only the provided facts.
- Keep actions concrete and operational.
- Do not output markdown or extra keys.
""".strip()
AGENT_PROMPTS = {
"demand_analyst": (
"You are Demand Analyst. Focus on demand capacity, conversion, and traffic risks. "
"Decide whether launch is feasible from growth and operational demand perspective."
),
"finance_analyst": (
"You are Finance Analyst. Focus on revenue, margin, campaign cost, and downside exposure. "
"Decide if launch economics are acceptable."
),
"risk_analyst": (
"You are Risk Analyst. Focus on payment reliability, chargebacks, and incidents. "
"Prioritize safety and compliance risk containment."
),
"legal_analyst": (
"You are Legal Analyst. Focus on regulatory, compliance, consumer protection, and policy constraints. "
"Flag launch blockers and required mitigations."
),
}
FINAL_SYSTEM_PROMPT = """
You are a launch readiness editor.
Write a short operations brief in English.
Include:
- final decision (go/go_with_caution/no_go)
- why the team agreed
- top 2 immediate actions
Use only evidence from collaboration history.
""".strip()
def _get_client() -> OpenAI:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EnvironmentError(
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
)
return OpenAI(api_key=api_key)
def _round_summaries(history: list[dict[str, Any]], limit: int = 2) -> list[dict[str, Any]]:
summaries: list[dict[str, Any]] = []
for row in history[-limit:]:
summaries.append(
{
"round": row.get("round"),
"decision": row.get("decision"),
"conflicts": row.get("conflicts", []),
"stances": [
{
"agent": item.get("agent"),
"stance": item.get("stance"),
"confidence": item.get("confidence"),
}
for item in row.get("contributions", [])
],
}
)
return summaries
def propose_contribution(
*,
role: str,
goal: str,
shared_context: dict[str, Any],
history: list[dict[str, Any]],
open_conflicts: list[str],
) -> dict[str, Any]:
system = AGENT_PROMPTS.get(role)
if not system:
raise ValueError(f"unknown_role:{role}")
payload = {
"goal": goal,
"role": role,
"shared_context": shared_context,
"recent_rounds": _round_summaries(history, limit=2),
"open_conflicts": open_conflicts,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": f"{system}\n\n{COMMON_RULES}"},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = completion.choices[0].message.content or "{}"
try:
return json.loads(text)
except json.JSONDecodeError:
return {"invalid": True, "raw": text}
def compose_final_answer(
*,
goal: str,
final_decision: str,
history: list[dict[str, Any]],
) -> str:
payload = {
"goal": goal,
"final_decision": final_decision,
"history": history,
}
client = _get_client()
try:
completion = client.chat.completions.create(
model=MODEL,
temperature=0,
timeout=LLM_TIMEOUT_SECONDS,
messages=[
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
],
)
except (APITimeoutError, APIConnectionError) as exc:
raise LLMTimeout("llm_timeout") from exc
text = (completion.choices[0].message.content or "").strip()
if not text:
raise LLMEmpty("llm_empty")
return text
Lo más importante aquí (en simple)
- Cada rol tiene su propio foco de sistema, pero el mismo contrato estructurado de respuesta.
- El historial de rondas anteriores se convierte en contexto para la siguiente contribución.
- El brief final se genera en un paso separado después de la alineación, no "sobre la marcha".
main.py — Team Loop -> Resolve -> Finalize
from __future__ import annotations
import json
import time
from typing import Any
from gateway import (
Budget,
CollaborationGateway,
StopRun,
decide_round_outcome,
detect_conflicts,
)
from llm import LLMEmpty, LLMTimeout, compose_final_answer, propose_contribution
from signals import build_shared_context
REPORT_DATE = "2026-03-02"
REGION = "US"
GOAL = (
"Prepare a go/no-go launch brief for Checkout v2 campaign in US on 2026-03-02. "
"Use collaboration across demand, finance, and risk analysts; return one aligned decision."
)
BUDGET = Budget(max_rounds=3, max_messages=12, max_seconds=40, min_go_votes=2)
TEAM_ROLES_POLICY = {
"demand_analyst",
"finance_analyst",
"risk_analyst",
"legal_analyst",
}
LEGAL_ANALYST_ENABLED = False
TEAM_ROLES_EXECUTION = (
TEAM_ROLES_POLICY
if LEGAL_ANALYST_ENABLED
else {"demand_analyst", "finance_analyst", "risk_analyst"}
)
TEAM_SEQUENCE = ["demand_analyst", "finance_analyst", "risk_analyst"]
# Set LEGAL_ANALYST_ENABLED=True and append "legal_analyst" to TEAM_SEQUENCE to test runtime denial paths.
def _latest_stances(contributions: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [
{
"agent": item["agent"],
"stance": item["stance"],
"confidence": item["confidence"],
}
for item in contributions
]
def run_collaboration(goal: str) -> dict[str, Any]:
started = time.monotonic()
shared_context = build_shared_context(report_date=REPORT_DATE, region=REGION)
history: list[dict[str, Any]] = []
trace: list[dict[str, Any]] = []
open_conflicts: list[str] = []
final_decision: str | None = None
gateway = CollaborationGateway(allow=TEAM_ROLES_EXECUTION, budget=BUDGET)
for round_no in range(1, BUDGET.max_rounds + 1):
if (time.monotonic() - started) > BUDGET.max_seconds:
return {
"status": "stopped",
"stop_reason": "max_seconds",
"trace": trace,
"history": history,
}
round_contributions: list[dict[str, Any]] = []
for role in TEAM_SEQUENCE:
try:
raw = propose_contribution(
role=role,
goal=goal,
shared_context=shared_context,
history=history,
open_conflicts=open_conflicts,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": f"round_{round_no}:{role}",
"trace": trace,
"history": history,
}
try:
contribution = gateway.accept(raw, expected_agent=role)
except StopRun as exc:
return {
"status": "stopped",
"stop_reason": exc.reason,
"phase": f"round_{round_no}:{role}",
"raw_contribution": raw,
"trace": trace,
"history": history,
}
round_contributions.append(contribution)
trace.append(
{
"round": round_no,
"agent": role,
"stance": contribution["stance"],
"confidence": contribution["confidence"],
"accepted": True,
}
)
conflicts = detect_conflicts(round_contributions)
round_decision = decide_round_outcome(
round_contributions,
min_go_votes=BUDGET.min_go_votes,
)
history_entry = {
"round": round_no,
"contributions": round_contributions,
"conflicts": conflicts,
"decision": round_decision,
}
history.append(history_entry)
trace.append(
{
"round": round_no,
"conflicts": conflicts,
"decision": round_decision or "next_round",
}
)
if round_decision:
final_decision = round_decision
break
open_conflicts = conflicts
if not final_decision:
return {
"status": "stopped",
"stop_reason": "max_rounds_reached",
"trace": trace,
"history": history,
}
try:
answer = compose_final_answer(
goal=goal,
final_decision=final_decision,
history=history,
)
except LLMTimeout:
return {
"status": "stopped",
"stop_reason": "llm_timeout",
"phase": "finalize",
"trace": trace,
"history": history,
}
except LLMEmpty:
return {
"status": "stopped",
"stop_reason": "llm_empty",
"phase": "finalize",
"trace": trace,
"history": history,
}
last_round = history[-1]
return {
"status": "ok",
"stop_reason": "success",
"answer": answer,
"final_decision": final_decision,
"rounds_used": len(history),
"team_summary": {
"report_date": REPORT_DATE,
"region": REGION,
"stances": _latest_stances(last_round["contributions"]),
"conflicts": last_round["conflicts"],
},
"trace": trace,
"history": history,
}
def main() -> None:
result = run_collaboration(GOAL)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
Lo más importante aquí (en simple)
historyes memoria compartida entre rondas.- El criterio de cierre (
go/go_with_caution/no_go) está centralizado en una función de policy. - La diferencia entre roles de policy/execution se define en
main.pyvíaTEAM_ROLES_POLICYvsTEAM_ROLES_EXECUTION, y gateway solo enforce la execution allowlist.
requirements.txt
openai==2.21.0
Ejemplo de salida
Abajo hay un ejemplo de ejecución válida donde el equipo se alinea en la ronda 1 y devuelve final_decision="go".
{
"status": "ok",
"stop_reason": "success",
"answer": "Operations Brief: Checkout v2 Campaign Launch — US, 2026-03-02. Final Decision: GO. Demand, finance, and risk agreed the launch is feasible and within policy thresholds.",
"final_decision": "go",
"rounds_used": 1,
"team_summary": {
"report_date": "2026-03-02",
"region": "US",
"stances": [
{"agent": "demand_analyst", "stance": "go", "confidence": 0.9},
{"agent": "finance_analyst", "stance": "go", "confidence": 0.9},
{"agent": "risk_analyst", "stance": "go", "confidence": 0.9}
],
"conflicts": []
},
"trace": [
{"round": 1, "agent": "demand_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "agent": "finance_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "agent": "risk_analyst", "stance": "go", "confidence": 0.9, "accepted": true},
{"round": 1, "conflicts": [], "decision": "go"}
],
"history": [
{
"round": 1,
"contributions": [
{"agent": "demand_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["...", "..."]},
{"agent": "finance_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["..."]},
{"agent": "risk_analyst", "stance": "go", "summary": "...", "confidence": 0.9, "actions": ["..."]}
],
"conflicts": [],
"decision": "go"
}
]
}
Este es un ejemplo abreviado: el texto largo de answer y summary/actions en history se recorta intencionalmente para legibilidad.
Valores típicos de stop_reason
success— el run se completó correctamente;final_decisionpuede sergo,go_with_cautionono_goinvalid_contribution:*— la contribución de un agente no pasó la validación de contratoinvalid_contribution:actions— role devolvióactionsvacío o inválido (en este ejemplo se requieren 1-3 acciones)agent_denied:<role>— el rol no está permitido por la execution allowlistllm_timeout— el LLM no respondió dentro deOPENAI_TIMEOUT_SECONDSllm_empty— el paso de finalize devolvió texto vacíomax_messages— se excedió el presupuesto de mensajes entre rolesmax_rounds_reached— el equipo no llegó a una decisión (final_decision) enmax_roundsmax_seconds— se excedió el presupuesto total de tiempo del run
Qué NO se muestra aquí
- No hay APIs de dominio reales ni datos en vivo.
- No hay un conflict resolver externo (por ejemplo, un human reviewer).
- No hay auth/ACL multi-tenant para roles y fuentes de datos.
- No hay adaptive team sizing (agregar/quitar roles dinámicamente).
Qué probar después
- Agrega
legal_analystaTEAM_SEQUENCEconLEGAL_ANALYST_ENABLED=Falsey observaagent_denied:legal_analystenstop_reason. - Configura
min_go_votes=3para exigir consenso completo antes del final. - Añade una regla de escalación a humano si
blockse repite durante 2 rondas seguidas.