saap-plattform / backend /services /multi_agent_coordinator.py
Hwandji's picture
feat: initial HuggingFace Space deployment
4343907
raw
history blame
27.8 kB
#!/usr/bin/env python3
"""
Multi-Agent Coordinator Service for SAAP Platform
Enables autonomous agent-to-agent communication and task delegation
"""
import asyncio
import json
import logging
import time
import uuid
from datetime import datetime
from typing import Dict, List, Optional, Any, Tuple
from enum import Enum
from dataclasses import dataclass
import redis.asyncio as aioredis
from pydantic import BaseModel
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TaskStatus(str, Enum):
CREATED = "created"
ASSIGNED = "assigned"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
class TaskPriority(str, Enum):
LOW = "low"
NORMAL = "normal"
HIGH = "high"
URGENT = "urgent"
@dataclass
class AgentCapability:
"""Agent capability definition for intelligent task matching"""
name: str
description: str
keywords: List[str]
complexity_level: int # 1-10 scale
class TaskRequest(BaseModel):
task_id: str
task_type: str
description: str
input_data: Dict[str, Any]
priority: TaskPriority = TaskPriority.NORMAL
status: TaskStatus = TaskStatus.CREATED
assigned_agent: Optional[str] = None
parent_task_id: Optional[str] = None
max_execution_time: int = 300 # seconds
class TaskResult(BaseModel):
task_id: str
agent_id: str
status: TaskStatus
result: Dict[str, Any]
execution_time: float
timestamp: datetime
error_message: Optional[str] = None
class MultiAgentCoordinator:
"""
Multi-Agent Coordinator for autonomous task delegation and workflow orchestration
Jane Alesi acts as the master coordinator with intelligent agent selection
"""
def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
self.redis_host = redis_host
self.redis_port = redis_port
self.redis_client = None
# Agent capabilities database
self.agent_capabilities = {
"jane_alesi": [
AgentCapability("coordination", "Master coordination and workflow management",
["coordinate", "manage", "orchestrate", "plan"], 9),
AgentCapability("architecture", "System architecture and design decisions",
["architecture", "design", "system", "structure"], 10),
AgentCapability("integration", "Multi-agent integration and communication",
["integrate", "communication", "multi-agent"], 10)
],
"john_alesi": [
AgentCapability("development", "Software development and coding",
["code", "develop", "program", "software", "implementation"], 9),
AgentCapability("debugging", "Code debugging and troubleshooting",
["debug", "fix", "error", "troubleshoot"], 8),
AgentCapability("optimization", "Performance optimization and refactoring",
["optimize", "performance", "refactor"], 7),
AgentCapability("testing", "Unit testing and code quality assurance",
["test", "quality", "validation"], 8)
],
"lara_alesi": [
AgentCapability("medical_analysis", "Medical data analysis and diagnosis",
["medical", "health", "diagnosis", "clinical"], 10),
AgentCapability("data_analysis", "Statistical data analysis and interpretation",
["analysis", "statistics", "data", "interpret"], 9),
AgentCapability("research", "Medical research and literature review",
["research", "study", "literature", "evidence"], 8)
],
"justus_alesi": [
AgentCapability("legal_analysis", "Legal compliance and regulatory analysis",
["legal", "compliance", "regulation", "law"], 10),
AgentCapability("documentation", "Legal documentation and contract review",
["document", "contract", "review", "legal"], 9),
AgentCapability("risk_assessment", "Legal risk assessment and mitigation",
["risk", "assessment", "legal", "mitigation"], 8)
],
"theo_alesi": [
AgentCapability("financial_analysis", "Financial analysis and budgeting",
["finance", "budget", "cost", "investment"], 10),
AgentCapability("market_analysis", "Market analysis and business intelligence",
["market", "business", "intelligence", "analysis"], 9),
AgentCapability("reporting", "Financial reporting and KPI tracking",
["report", "kpi", "tracking", "metrics"], 8)
],
"leon_alesi": [
AgentCapability("system_administration", "System administration and deployment",
["system", "admin", "deploy", "infrastructure"], 10),
AgentCapability("monitoring", "System monitoring and performance tracking",
["monitor", "performance", "system", "tracking"], 9),
AgentCapability("security", "System security and access control",
["security", "access", "control", "protect"], 9)
],
"luna_alesi": [
AgentCapability("coaching", "Team coaching and development",
["coach", "team", "development", "training"], 10),
AgentCapability("process_improvement", "Process optimization and workflow improvement",
["process", "improvement", "workflow", "optimize"], 8),
AgentCapability("communication", "Team communication and collaboration",
["communication", "collaboration", "team"], 9)
]
}
# Active tasks tracking
self.active_tasks: Dict[str, TaskRequest] = {}
self.completed_tasks: Dict[str, TaskResult] = {}
# Agent manager reference (will be injected)
self.agent_manager = None
async def initialize(self):
"""Initialize Redis connection and coordinator"""
try:
self.redis_client = aioredis.from_url(f"redis://{self.redis_host}:{self.redis_port}")
await self.redis_client.ping()
logger.info(f"✅ Multi-Agent Coordinator initialized with Redis at {self.redis_host}:{self.redis_port}")
return True
except Exception as e:
logger.error(f"❌ Failed to initialize Multi-Agent Coordinator: {e}")
return False
def set_agent_manager(self, agent_manager):
"""Inject agent manager dependency"""
self.agent_manager = agent_manager
async def analyze_intent(self, user_message: str) -> Tuple[str, List[str], str]:
"""
Analyze user intent and determine if multi-agent coordination is needed
Returns: (task_type, required_capabilities, primary_agent)
"""
message_lower = user_message.lower()
# Intent analysis patterns - 🎯 ENHANCED FOR FINANCIAL
intent_patterns = {
"development": ["entwicke", "code", "programmier", "implementier", "software", "app"],
"medical": ["medizinisch", "gesundheit", "diagnose", "patient", "clinical"],
"legal": ["rechtlich", "legal", "compliance", "vertrag", "regulation"],
"financial": ["finanzanwendung", "finanz", "finanziell", "budget", "kosten", "investment", "market", "banking", "payment"], # 🔧 FIXED
"system": ["system", "deploy", "server", "infrastructure", "admin"],
"coordination": ["koordinier", "manage", "plan", "orchestrat", "workflow"],
"analysis": ["analysier", "untersuche", "bewerte", "statistik"]
}
# Multi-agent patterns (require coordination)
multi_agent_patterns = [
"full stack", "complete solution", "end-to-end", "comprehensive",
"multi", "various", "different aspects", "holistic approach"
]
detected_intents = []
for intent, keywords in intent_patterns.items():
if any(keyword in message_lower for keyword in keywords):
detected_intents.append(intent)
# Check if multi-agent coordination is needed
needs_coordination = any(pattern in message_lower for pattern in multi_agent_patterns) or len(detected_intents) > 1
if needs_coordination or "coordination" in detected_intents:
return "multi_agent_task", detected_intents, "jane_alesi"
elif "development" in detected_intents:
return "development_task", ["development"], "john_alesi"
elif "medical" in detected_intents:
return "medical_task", ["medical_analysis"], "lara_alesi"
elif "legal" in detected_intents:
return "legal_task", ["legal_analysis"], "justus_alesi"
elif "financial" in detected_intents:
return "financial_task", ["financial_analysis"], "theo_alesi" # 🔧 THEO HANDLES FINANCE
elif "system" in detected_intents:
return "system_task", ["system_administration"], "leon_alesi"
else:
return "general_task", ["coordination"], "jane_alesi"
def select_agents_for_capabilities(self, required_capabilities: List[str], exclude_agent: str = None) -> List[str]:
"""
Select best agents for required capabilities
"""
agent_scores = {}
for agent_id, capabilities in self.agent_capabilities.items():
if exclude_agent and agent_id == exclude_agent:
continue
total_score = 0
matched_capabilities = 0
for required_cap in required_capabilities:
best_match_score = 0
for capability in capabilities:
# Check keyword matches
keyword_matches = sum(1 for keyword in capability.keywords
if keyword in required_cap.lower())
if keyword_matches > 0:
match_score = keyword_matches * capability.complexity_level
best_match_score = max(best_match_score, match_score)
if best_match_score > 0:
total_score += best_match_score
matched_capabilities += 1
if matched_capabilities > 0:
# Average score weighted by coverage
agent_scores[agent_id] = (total_score / len(required_capabilities)) * (matched_capabilities / len(required_capabilities))
# Return top agents sorted by score
sorted_agents = sorted(agent_scores.items(), key=lambda x: x[1], reverse=True)
return [agent_id for agent_id, score in sorted_agents[:3]] # Top 3 agents
async def create_task(self, task_type: str, description: str, input_data: Dict[str, Any],
priority: TaskPriority = TaskPriority.NORMAL,
parent_task_id: str = None) -> str:
"""Create a new task in the coordination system"""
task_id = str(uuid.uuid4())
task = TaskRequest(
task_id=task_id,
task_type=task_type,
description=description,
input_data=input_data,
priority=priority,
status=TaskStatus.CREATED,
parent_task_id=parent_task_id
)
self.active_tasks[task_id] = task
# Store in Redis for persistence
if self.redis_client:
try:
await self.redis_client.hset("saap:tasks", task_id, task.json())
except Exception as e:
logger.warning(f"⚠️ Failed to store task in Redis: {e}")
logger.info(f"📋 Created task {task_id}: {task_type} - {description}")
return task_id
async def delegate_task(self, task_id: str, agent_id: str) -> bool:
"""Delegate a task to a specific agent"""
if task_id not in self.active_tasks:
logger.error(f"❌ Task {task_id} not found")
return False
task = self.active_tasks[task_id]
task.assigned_agent = agent_id
task.status = TaskStatus.ASSIGNED
# Update Redis
if self.redis_client:
try:
await self.redis_client.hset("saap:tasks", task_id, task.json())
except Exception as e:
logger.warning(f"⚠️ Failed to update task in Redis: {e}")
logger.info(f"👤 Delegated task {task_id} to {agent_id}")
return True
async def execute_task(self, task_id: str) -> TaskResult:
"""Execute a delegated task through the assigned agent"""
if task_id not in self.active_tasks:
raise ValueError(f"Task {task_id} not found")
task = self.active_tasks[task_id]
if not task.assigned_agent:
raise ValueError(f"Task {task_id} has no assigned agent")
start_time = time.time()
task.status = TaskStatus.IN_PROGRESS
try:
# Get agent and execute task
if not self.agent_manager:
raise ValueError("Agent manager not available")
# 🔧 FIX: Don't await synchronous methods
agent = self.agent_manager.get_agent(task.assigned_agent)
if not agent:
raise ValueError(f"Agent {task.assigned_agent} not found")
# Create task-specific prompt
task_prompt = self._create_task_prompt(task)
# 🔧 FIX: Check if send_message is async or sync
if hasattr(self.agent_manager, 'send_message'):
# Try to determine if it's async
send_method = getattr(self.agent_manager, 'send_message')
if asyncio.iscoroutinefunction(send_method):
response = await self.agent_manager.send_message(task.assigned_agent, task_prompt)
else:
response = self.agent_manager.send_message(task.assigned_agent, task_prompt)
else:
# Fallback: Use a generic message
response = f"Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert."
execution_time = time.time() - start_time
result = TaskResult(
task_id=task_id,
agent_id=task.assigned_agent,
status=TaskStatus.COMPLETED,
result={"response": response, "execution_time": execution_time},
execution_time=execution_time,
timestamp=datetime.now()
)
# Move to completed tasks
self.completed_tasks[task_id] = result
del self.active_tasks[task_id]
# Update Redis
if self.redis_client:
try:
await self.redis_client.hset("saap:completed_tasks", task_id, result.json())
await self.redis_client.hdel("saap:tasks", task_id)
except Exception as e:
logger.warning(f"⚠️ Failed to update Redis: {e}")
logger.info(f"✅ Completed task {task_id} in {execution_time:.2f}s")
return result
except Exception as e:
execution_time = time.time() - start_time
error_result = TaskResult(
task_id=task_id,
agent_id=task.assigned_agent,
status=TaskStatus.FAILED,
result={"error": str(e)},
execution_time=execution_time,
timestamp=datetime.now(),
error_message=str(e)
)
self.completed_tasks[task_id] = error_result
del self.active_tasks[task_id]
logger.error(f"❌ Task {task_id} failed: {e}")
return error_result
def _create_task_prompt(self, task: TaskRequest) -> str:
"""Create agent-specific prompt for task execution"""
prompt = f"""
Task ID: {task.task_id}
Task Type: {task.task_type}
Priority: {task.priority}
Description: {task.description}
Input Data: {json.dumps(task.input_data, indent=2)}
Please process this task according to your role and capabilities.
Provide a detailed response with actionable results.
"""
return prompt
async def coordinate_multi_agent_task(self, user_message: str, user_context: Dict[str, Any] = None, provider: str = "auto") -> Dict[str, Any]:
"""
Main coordination method for multi-agent tasks
This is the entry point for complex multi-agent workflows
Args:
user_message: User's message/request
user_context: Additional context data
provider: LLM provider selection ("auto", "colossus", "openrouter")
"""
start_time = time.time()
try:
# Step 1: Analyze intent
task_type, required_capabilities, primary_agent = await self.analyze_intent(user_message)
logger.info(f"🎯 Intent Analysis: {task_type}{primary_agent} (capabilities: {required_capabilities})")
logger.info(f"🔒 Provider: {provider}")
# Step 2: Determine if multi-agent coordination is needed
if task_type == "multi_agent_task" or len(required_capabilities) > 1:
return await self._handle_multi_agent_workflow(user_message, required_capabilities, user_context, provider)
else:
return await self._handle_single_agent_task(user_message, primary_agent, user_context, provider)
except Exception as e:
logger.error(f"❌ Coordination error: {e}")
return {
"success": False,
"error": str(e),
"execution_time": time.time() - start_time
}
async def _handle_single_agent_task(self, message: str, agent_id: str, context: Dict[str, Any], provider: str = "auto") -> Dict[str, Any]:
"""Handle simple single-agent task with provider selection"""
start_time = time.time()
try:
# Create and execute task
task_id = await self.create_task(
task_type="single_agent",
description=message,
input_data={"message": message, "context": context or {}, "provider": provider}
)
await self.delegate_task(task_id, agent_id)
result = await self.execute_task(task_id)
return {
"success": result.status == TaskStatus.COMPLETED,
"task_id": task_id,
"coordinator": agent_id,
"coordinator_response": result.result.get("response", "Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert."),
"specialist_response": result.result.get("response", ""),
"execution_time": time.time() - start_time,
"workflow_type": "single_agent",
"coordination_chain": [agent_id],
"processing_time": result.execution_time,
"timestamp": result.timestamp.isoformat()
}
except Exception as e:
logger.error(f"❌ Single agent task failed: {e}")
return {
"success": False,
"coordinator": agent_id,
"coordinator_response": "Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert.",
"specialist_response": "",
"error": str(e),
"execution_time": time.time() - start_time,
"workflow_type": "single_agent",
"coordination_chain": [agent_id],
"processing_time": 0,
"timestamp": datetime.now().isoformat()
}
async def _handle_multi_agent_workflow(self, message: str, capabilities: List[str], context: Dict[str, Any], provider: str = "auto") -> Dict[str, Any]:
"""Handle complex multi-agent workflow with Jane as coordinator and provider selection"""
start_time = time.time()
workflow_steps = []
try:
# Step 1: Jane analyzes and creates coordination plan
coordination_task_id = await self.create_task(
task_type="coordination_analysis",
description=f"Analyze the following request and create a multi-agent coordination plan: {message}",
input_data={
"original_message": message,
"detected_capabilities": capabilities,
"context": context or {},
"provider": provider
},
priority=TaskPriority.HIGH
)
await self.delegate_task(coordination_task_id, "jane_alesi")
coordination_result = await self.execute_task(coordination_task_id)
workflow_steps.append({
"step": "coordination_analysis",
"agent": "jane_alesi",
"result": coordination_result.result,
"execution_time": coordination_result.execution_time
})
# Step 2: Select and coordinate specialist agents
selected_agents = self.select_agents_for_capabilities(capabilities, exclude_agent="jane_alesi")
specialist_results = []
for agent_id in selected_agents[:2]: # Limit to 2 specialists for now
specialist_task_id = await self.create_task(
task_type="specialist_analysis",
description=f"Provide specialist analysis for: {message}",
input_data={
"original_message": message,
"coordination_plan": coordination_result.result,
"context": context or {},
"provider": provider
},
parent_task_id=coordination_task_id
)
await self.delegate_task(specialist_task_id, agent_id)
specialist_result = await self.execute_task(specialist_task_id)
specialist_results.append(specialist_result)
workflow_steps.append({
"step": "specialist_analysis",
"agent": agent_id,
"result": specialist_result.result,
"execution_time": specialist_result.execution_time
})
# Step 3: Jane synthesizes all results
synthesis_task_id = await self.create_task(
task_type="result_synthesis",
description="Synthesize specialist results into comprehensive response",
input_data={
"original_message": message,
"coordination_result": coordination_result.result,
"specialist_results": [r.result for r in specialist_results],
"context": context or {},
"provider": provider
},
priority=TaskPriority.HIGH,
parent_task_id=coordination_task_id
)
await self.delegate_task(synthesis_task_id, "jane_alesi")
synthesis_result = await self.execute_task(synthesis_task_id)
workflow_steps.append({
"step": "result_synthesis",
"agent": "jane_alesi",
"result": synthesis_result.result,
"execution_time": synthesis_result.execution_time
})
total_execution_time = time.time() - start_time
return {
"success": True,
"workflow_type": "multi_agent",
"coordinator": "jane_alesi",
"specialists": selected_agents[:2],
"workflow_steps": workflow_steps,
"coordinator_response": "Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert.",
"specialist_response": synthesis_result.result.get("response", ""),
"final_response": synthesis_result.result.get("response", ""),
"coordination_chain": ["jane_alesi"] + selected_agents[:2],
"processing_time": total_execution_time,
"timestamp": datetime.now().isoformat(),
"task_count": len(workflow_steps)
}
except Exception as e:
logger.error(f"❌ Multi-agent workflow failed: {e}")
return {
"success": False,
"workflow_type": "multi_agent",
"coordinator": "jane_alesi",
"coordinator_response": "Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert.",
"specialist_response": "",
"error": str(e),
"coordination_chain": ["jane_alesi"],
"processing_time": time.time() - start_time,
"timestamp": datetime.now().isoformat()
}
async def get_agent_workload(self, agent_id: str) -> Dict[str, Any]:
"""Get current workload statistics for an agent"""
active_count = sum(1 for task in self.active_tasks.values()
if task.assigned_agent == agent_id)
completed_count = sum(1 for result in self.completed_tasks.values()
if result.agent_id == agent_id)
return {
"agent_id": agent_id,
"active_tasks": active_count,
"completed_tasks": completed_count,
"capabilities": [cap.name for cap in self.agent_capabilities.get(agent_id, [])]
}
async def get_coordination_stats(self) -> Dict[str, Any]:
"""Get overall coordination statistics"""
return {
"active_tasks": len(self.active_tasks),
"completed_tasks": len(self.completed_tasks),
"available_agents": len(self.agent_capabilities),
"agent_workloads": {
agent_id: await self.get_agent_workload(agent_id)
for agent_id in self.agent_capabilities.keys()
}
}
async def cleanup(self):
"""Cleanup Redis connections"""
if self.redis_client:
await self.redis_client.close()
# Global coordinator instance
coordinator_instance = None
async def get_coordinator() -> MultiAgentCoordinator:
"""Get global coordinator instance"""
global coordinator_instance
if coordinator_instance is None:
coordinator_instance = MultiAgentCoordinator()
await coordinator_instance.initialize()
return coordinator_instance