Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Multi-Agent Coordinator Service for SAAP Platform | |
| Enables autonomous agent-to-agent communication and task delegation | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import time | |
| import uuid | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Any, Tuple | |
| from enum import Enum | |
| from dataclasses import dataclass | |
| import redis.asyncio as aioredis | |
| from pydantic import BaseModel | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| class TaskStatus(str, Enum): | |
| CREATED = "created" | |
| ASSIGNED = "assigned" | |
| IN_PROGRESS = "in_progress" | |
| COMPLETED = "completed" | |
| FAILED = "failed" | |
| class TaskPriority(str, Enum): | |
| LOW = "low" | |
| NORMAL = "normal" | |
| HIGH = "high" | |
| URGENT = "urgent" | |
| class AgentCapability: | |
| """Agent capability definition for intelligent task matching""" | |
| name: str | |
| description: str | |
| keywords: List[str] | |
| complexity_level: int # 1-10 scale | |
| class TaskRequest(BaseModel): | |
| task_id: str | |
| task_type: str | |
| description: str | |
| input_data: Dict[str, Any] | |
| priority: TaskPriority = TaskPriority.NORMAL | |
| status: TaskStatus = TaskStatus.CREATED | |
| assigned_agent: Optional[str] = None | |
| parent_task_id: Optional[str] = None | |
| max_execution_time: int = 300 # seconds | |
| class TaskResult(BaseModel): | |
| task_id: str | |
| agent_id: str | |
| status: TaskStatus | |
| result: Dict[str, Any] | |
| execution_time: float | |
| timestamp: datetime | |
| error_message: Optional[str] = None | |
| class MultiAgentCoordinator: | |
| """ | |
| Multi-Agent Coordinator for autonomous task delegation and workflow orchestration | |
| Jane Alesi acts as the master coordinator with intelligent agent selection | |
| """ | |
| def __init__(self, redis_host: str = "localhost", redis_port: int = 6379): | |
| self.redis_host = redis_host | |
| self.redis_port = redis_port | |
| self.redis_client = None | |
| # Agent capabilities database | |
| self.agent_capabilities = { | |
| "jane_alesi": [ | |
| AgentCapability("coordination", "Master coordination and workflow management", | |
| ["coordinate", "manage", "orchestrate", "plan"], 9), | |
| AgentCapability("architecture", "System architecture and design decisions", | |
| ["architecture", "design", "system", "structure"], 10), | |
| AgentCapability("integration", "Multi-agent integration and communication", | |
| ["integrate", "communication", "multi-agent"], 10) | |
| ], | |
| "john_alesi": [ | |
| AgentCapability("development", "Software development and coding", | |
| ["code", "develop", "program", "software", "implementation"], 9), | |
| AgentCapability("debugging", "Code debugging and troubleshooting", | |
| ["debug", "fix", "error", "troubleshoot"], 8), | |
| AgentCapability("optimization", "Performance optimization and refactoring", | |
| ["optimize", "performance", "refactor"], 7), | |
| AgentCapability("testing", "Unit testing and code quality assurance", | |
| ["test", "quality", "validation"], 8) | |
| ], | |
| "lara_alesi": [ | |
| AgentCapability("medical_analysis", "Medical data analysis and diagnosis", | |
| ["medical", "health", "diagnosis", "clinical"], 10), | |
| AgentCapability("data_analysis", "Statistical data analysis and interpretation", | |
| ["analysis", "statistics", "data", "interpret"], 9), | |
| AgentCapability("research", "Medical research and literature review", | |
| ["research", "study", "literature", "evidence"], 8) | |
| ], | |
| "justus_alesi": [ | |
| AgentCapability("legal_analysis", "Legal compliance and regulatory analysis", | |
| ["legal", "compliance", "regulation", "law"], 10), | |
| AgentCapability("documentation", "Legal documentation and contract review", | |
| ["document", "contract", "review", "legal"], 9), | |
| AgentCapability("risk_assessment", "Legal risk assessment and mitigation", | |
| ["risk", "assessment", "legal", "mitigation"], 8) | |
| ], | |
| "theo_alesi": [ | |
| AgentCapability("financial_analysis", "Financial analysis and budgeting", | |
| ["finance", "budget", "cost", "investment"], 10), | |
| AgentCapability("market_analysis", "Market analysis and business intelligence", | |
| ["market", "business", "intelligence", "analysis"], 9), | |
| AgentCapability("reporting", "Financial reporting and KPI tracking", | |
| ["report", "kpi", "tracking", "metrics"], 8) | |
| ], | |
| "leon_alesi": [ | |
| AgentCapability("system_administration", "System administration and deployment", | |
| ["system", "admin", "deploy", "infrastructure"], 10), | |
| AgentCapability("monitoring", "System monitoring and performance tracking", | |
| ["monitor", "performance", "system", "tracking"], 9), | |
| AgentCapability("security", "System security and access control", | |
| ["security", "access", "control", "protect"], 9) | |
| ], | |
| "luna_alesi": [ | |
| AgentCapability("coaching", "Team coaching and development", | |
| ["coach", "team", "development", "training"], 10), | |
| AgentCapability("process_improvement", "Process optimization and workflow improvement", | |
| ["process", "improvement", "workflow", "optimize"], 8), | |
| AgentCapability("communication", "Team communication and collaboration", | |
| ["communication", "collaboration", "team"], 9) | |
| ] | |
| } | |
| # Active tasks tracking | |
| self.active_tasks: Dict[str, TaskRequest] = {} | |
| self.completed_tasks: Dict[str, TaskResult] = {} | |
| # Agent manager reference (will be injected) | |
| self.agent_manager = None | |
| async def initialize(self): | |
| """Initialize Redis connection and coordinator""" | |
| try: | |
| self.redis_client = aioredis.from_url(f"redis://{self.redis_host}:{self.redis_port}") | |
| await self.redis_client.ping() | |
| logger.info(f"✅ Multi-Agent Coordinator initialized with Redis at {self.redis_host}:{self.redis_port}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"❌ Failed to initialize Multi-Agent Coordinator: {e}") | |
| return False | |
| def set_agent_manager(self, agent_manager): | |
| """Inject agent manager dependency""" | |
| self.agent_manager = agent_manager | |
| async def analyze_intent(self, user_message: str) -> Tuple[str, List[str], str]: | |
| """ | |
| Analyze user intent and determine if multi-agent coordination is needed | |
| Returns: (task_type, required_capabilities, primary_agent) | |
| """ | |
| message_lower = user_message.lower() | |
| # Intent analysis patterns - 🎯 ENHANCED FOR FINANCIAL | |
| intent_patterns = { | |
| "development": ["entwicke", "code", "programmier", "implementier", "software", "app"], | |
| "medical": ["medizinisch", "gesundheit", "diagnose", "patient", "clinical"], | |
| "legal": ["rechtlich", "legal", "compliance", "vertrag", "regulation"], | |
| "financial": ["finanzanwendung", "finanz", "finanziell", "budget", "kosten", "investment", "market", "banking", "payment"], # 🔧 FIXED | |
| "system": ["system", "deploy", "server", "infrastructure", "admin"], | |
| "coordination": ["koordinier", "manage", "plan", "orchestrat", "workflow"], | |
| "analysis": ["analysier", "untersuche", "bewerte", "statistik"] | |
| } | |
| # Multi-agent patterns (require coordination) | |
| multi_agent_patterns = [ | |
| "full stack", "complete solution", "end-to-end", "comprehensive", | |
| "multi", "various", "different aspects", "holistic approach" | |
| ] | |
| detected_intents = [] | |
| for intent, keywords in intent_patterns.items(): | |
| if any(keyword in message_lower for keyword in keywords): | |
| detected_intents.append(intent) | |
| # Check if multi-agent coordination is needed | |
| needs_coordination = any(pattern in message_lower for pattern in multi_agent_patterns) or len(detected_intents) > 1 | |
| if needs_coordination or "coordination" in detected_intents: | |
| return "multi_agent_task", detected_intents, "jane_alesi" | |
| elif "development" in detected_intents: | |
| return "development_task", ["development"], "john_alesi" | |
| elif "medical" in detected_intents: | |
| return "medical_task", ["medical_analysis"], "lara_alesi" | |
| elif "legal" in detected_intents: | |
| return "legal_task", ["legal_analysis"], "justus_alesi" | |
| elif "financial" in detected_intents: | |
| return "financial_task", ["financial_analysis"], "theo_alesi" # 🔧 THEO HANDLES FINANCE | |
| elif "system" in detected_intents: | |
| return "system_task", ["system_administration"], "leon_alesi" | |
| else: | |
| return "general_task", ["coordination"], "jane_alesi" | |
| def select_agents_for_capabilities(self, required_capabilities: List[str], exclude_agent: str = None) -> List[str]: | |
| """ | |
| Select best agents for required capabilities | |
| """ | |
| agent_scores = {} | |
| for agent_id, capabilities in self.agent_capabilities.items(): | |
| if exclude_agent and agent_id == exclude_agent: | |
| continue | |
| total_score = 0 | |
| matched_capabilities = 0 | |
| for required_cap in required_capabilities: | |
| best_match_score = 0 | |
| for capability in capabilities: | |
| # Check keyword matches | |
| keyword_matches = sum(1 for keyword in capability.keywords | |
| if keyword in required_cap.lower()) | |
| if keyword_matches > 0: | |
| match_score = keyword_matches * capability.complexity_level | |
| best_match_score = max(best_match_score, match_score) | |
| if best_match_score > 0: | |
| total_score += best_match_score | |
| matched_capabilities += 1 | |
| if matched_capabilities > 0: | |
| # Average score weighted by coverage | |
| agent_scores[agent_id] = (total_score / len(required_capabilities)) * (matched_capabilities / len(required_capabilities)) | |
| # Return top agents sorted by score | |
| sorted_agents = sorted(agent_scores.items(), key=lambda x: x[1], reverse=True) | |
| return [agent_id for agent_id, score in sorted_agents[:3]] # Top 3 agents | |
| async def create_task(self, task_type: str, description: str, input_data: Dict[str, Any], | |
| priority: TaskPriority = TaskPriority.NORMAL, | |
| parent_task_id: str = None) -> str: | |
| """Create a new task in the coordination system""" | |
| task_id = str(uuid.uuid4()) | |
| task = TaskRequest( | |
| task_id=task_id, | |
| task_type=task_type, | |
| description=description, | |
| input_data=input_data, | |
| priority=priority, | |
| status=TaskStatus.CREATED, | |
| parent_task_id=parent_task_id | |
| ) | |
| self.active_tasks[task_id] = task | |
| # Store in Redis for persistence | |
| if self.redis_client: | |
| try: | |
| await self.redis_client.hset("saap:tasks", task_id, task.json()) | |
| except Exception as e: | |
| logger.warning(f"⚠️ Failed to store task in Redis: {e}") | |
| logger.info(f"📋 Created task {task_id}: {task_type} - {description}") | |
| return task_id | |
| async def delegate_task(self, task_id: str, agent_id: str) -> bool: | |
| """Delegate a task to a specific agent""" | |
| if task_id not in self.active_tasks: | |
| logger.error(f"❌ Task {task_id} not found") | |
| return False | |
| task = self.active_tasks[task_id] | |
| task.assigned_agent = agent_id | |
| task.status = TaskStatus.ASSIGNED | |
| # Update Redis | |
| if self.redis_client: | |
| try: | |
| await self.redis_client.hset("saap:tasks", task_id, task.json()) | |
| except Exception as e: | |
| logger.warning(f"⚠️ Failed to update task in Redis: {e}") | |
| logger.info(f"👤 Delegated task {task_id} to {agent_id}") | |
| return True | |
| async def execute_task(self, task_id: str) -> TaskResult: | |
| """Execute a delegated task through the assigned agent""" | |
| if task_id not in self.active_tasks: | |
| raise ValueError(f"Task {task_id} not found") | |
| task = self.active_tasks[task_id] | |
| if not task.assigned_agent: | |
| raise ValueError(f"Task {task_id} has no assigned agent") | |
| start_time = time.time() | |
| task.status = TaskStatus.IN_PROGRESS | |
| try: | |
| # Get agent and execute task | |
| if not self.agent_manager: | |
| raise ValueError("Agent manager not available") | |
| # 🔧 FIX: Don't await synchronous methods | |
| agent = self.agent_manager.get_agent(task.assigned_agent) | |
| if not agent: | |
| raise ValueError(f"Agent {task.assigned_agent} not found") | |
| # Create task-specific prompt | |
| task_prompt = self._create_task_prompt(task) | |
| # 🔧 FIX: Check if send_message is async or sync | |
| if hasattr(self.agent_manager, 'send_message'): | |
| # Try to determine if it's async | |
| send_method = getattr(self.agent_manager, 'send_message') | |
| if asyncio.iscoroutinefunction(send_method): | |
| response = await self.agent_manager.send_message(task.assigned_agent, task_prompt) | |
| else: | |
| response = self.agent_manager.send_message(task.assigned_agent, task_prompt) | |
| else: | |
| # Fallback: Use a generic message | |
| response = f"Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert." | |
| execution_time = time.time() - start_time | |
| result = TaskResult( | |
| task_id=task_id, | |
| agent_id=task.assigned_agent, | |
| status=TaskStatus.COMPLETED, | |
| result={"response": response, "execution_time": execution_time}, | |
| execution_time=execution_time, | |
| timestamp=datetime.now() | |
| ) | |
| # Move to completed tasks | |
| self.completed_tasks[task_id] = result | |
| del self.active_tasks[task_id] | |
| # Update Redis | |
| if self.redis_client: | |
| try: | |
| await self.redis_client.hset("saap:completed_tasks", task_id, result.json()) | |
| await self.redis_client.hdel("saap:tasks", task_id) | |
| except Exception as e: | |
| logger.warning(f"⚠️ Failed to update Redis: {e}") | |
| logger.info(f"✅ Completed task {task_id} in {execution_time:.2f}s") | |
| return result | |
| except Exception as e: | |
| execution_time = time.time() - start_time | |
| error_result = TaskResult( | |
| task_id=task_id, | |
| agent_id=task.assigned_agent, | |
| status=TaskStatus.FAILED, | |
| result={"error": str(e)}, | |
| execution_time=execution_time, | |
| timestamp=datetime.now(), | |
| error_message=str(e) | |
| ) | |
| self.completed_tasks[task_id] = error_result | |
| del self.active_tasks[task_id] | |
| logger.error(f"❌ Task {task_id} failed: {e}") | |
| return error_result | |
| def _create_task_prompt(self, task: TaskRequest) -> str: | |
| """Create agent-specific prompt for task execution""" | |
| prompt = f""" | |
| Task ID: {task.task_id} | |
| Task Type: {task.task_type} | |
| Priority: {task.priority} | |
| Description: {task.description} | |
| Input Data: {json.dumps(task.input_data, indent=2)} | |
| Please process this task according to your role and capabilities. | |
| Provide a detailed response with actionable results. | |
| """ | |
| return prompt | |
| async def coordinate_multi_agent_task(self, user_message: str, user_context: Dict[str, Any] = None, provider: str = "auto") -> Dict[str, Any]: | |
| """ | |
| Main coordination method for multi-agent tasks | |
| This is the entry point for complex multi-agent workflows | |
| Args: | |
| user_message: User's message/request | |
| user_context: Additional context data | |
| provider: LLM provider selection ("auto", "colossus", "openrouter") | |
| """ | |
| start_time = time.time() | |
| try: | |
| # Step 1: Analyze intent | |
| task_type, required_capabilities, primary_agent = await self.analyze_intent(user_message) | |
| logger.info(f"🎯 Intent Analysis: {task_type} → {primary_agent} (capabilities: {required_capabilities})") | |
| logger.info(f"🔒 Provider: {provider}") | |
| # Step 2: Determine if multi-agent coordination is needed | |
| if task_type == "multi_agent_task" or len(required_capabilities) > 1: | |
| return await self._handle_multi_agent_workflow(user_message, required_capabilities, user_context, provider) | |
| else: | |
| return await self._handle_single_agent_task(user_message, primary_agent, user_context, provider) | |
| except Exception as e: | |
| logger.error(f"❌ Coordination error: {e}") | |
| return { | |
| "success": False, | |
| "error": str(e), | |
| "execution_time": time.time() - start_time | |
| } | |
| async def _handle_single_agent_task(self, message: str, agent_id: str, context: Dict[str, Any], provider: str = "auto") -> Dict[str, Any]: | |
| """Handle simple single-agent task with provider selection""" | |
| start_time = time.time() | |
| try: | |
| # Create and execute task | |
| task_id = await self.create_task( | |
| task_type="single_agent", | |
| description=message, | |
| input_data={"message": message, "context": context or {}, "provider": provider} | |
| ) | |
| await self.delegate_task(task_id, agent_id) | |
| result = await self.execute_task(task_id) | |
| return { | |
| "success": result.status == TaskStatus.COMPLETED, | |
| "task_id": task_id, | |
| "coordinator": agent_id, | |
| "coordinator_response": result.result.get("response", "Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert."), | |
| "specialist_response": result.result.get("response", ""), | |
| "execution_time": time.time() - start_time, | |
| "workflow_type": "single_agent", | |
| "coordination_chain": [agent_id], | |
| "processing_time": result.execution_time, | |
| "timestamp": result.timestamp.isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Single agent task failed: {e}") | |
| return { | |
| "success": False, | |
| "coordinator": agent_id, | |
| "coordinator_response": "Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert.", | |
| "specialist_response": "", | |
| "error": str(e), | |
| "execution_time": time.time() - start_time, | |
| "workflow_type": "single_agent", | |
| "coordination_chain": [agent_id], | |
| "processing_time": 0, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| async def _handle_multi_agent_workflow(self, message: str, capabilities: List[str], context: Dict[str, Any], provider: str = "auto") -> Dict[str, Any]: | |
| """Handle complex multi-agent workflow with Jane as coordinator and provider selection""" | |
| start_time = time.time() | |
| workflow_steps = [] | |
| try: | |
| # Step 1: Jane analyzes and creates coordination plan | |
| coordination_task_id = await self.create_task( | |
| task_type="coordination_analysis", | |
| description=f"Analyze the following request and create a multi-agent coordination plan: {message}", | |
| input_data={ | |
| "original_message": message, | |
| "detected_capabilities": capabilities, | |
| "context": context or {}, | |
| "provider": provider | |
| }, | |
| priority=TaskPriority.HIGH | |
| ) | |
| await self.delegate_task(coordination_task_id, "jane_alesi") | |
| coordination_result = await self.execute_task(coordination_task_id) | |
| workflow_steps.append({ | |
| "step": "coordination_analysis", | |
| "agent": "jane_alesi", | |
| "result": coordination_result.result, | |
| "execution_time": coordination_result.execution_time | |
| }) | |
| # Step 2: Select and coordinate specialist agents | |
| selected_agents = self.select_agents_for_capabilities(capabilities, exclude_agent="jane_alesi") | |
| specialist_results = [] | |
| for agent_id in selected_agents[:2]: # Limit to 2 specialists for now | |
| specialist_task_id = await self.create_task( | |
| task_type="specialist_analysis", | |
| description=f"Provide specialist analysis for: {message}", | |
| input_data={ | |
| "original_message": message, | |
| "coordination_plan": coordination_result.result, | |
| "context": context or {}, | |
| "provider": provider | |
| }, | |
| parent_task_id=coordination_task_id | |
| ) | |
| await self.delegate_task(specialist_task_id, agent_id) | |
| specialist_result = await self.execute_task(specialist_task_id) | |
| specialist_results.append(specialist_result) | |
| workflow_steps.append({ | |
| "step": "specialist_analysis", | |
| "agent": agent_id, | |
| "result": specialist_result.result, | |
| "execution_time": specialist_result.execution_time | |
| }) | |
| # Step 3: Jane synthesizes all results | |
| synthesis_task_id = await self.create_task( | |
| task_type="result_synthesis", | |
| description="Synthesize specialist results into comprehensive response", | |
| input_data={ | |
| "original_message": message, | |
| "coordination_result": coordination_result.result, | |
| "specialist_results": [r.result for r in specialist_results], | |
| "context": context or {}, | |
| "provider": provider | |
| }, | |
| priority=TaskPriority.HIGH, | |
| parent_task_id=coordination_task_id | |
| ) | |
| await self.delegate_task(synthesis_task_id, "jane_alesi") | |
| synthesis_result = await self.execute_task(synthesis_task_id) | |
| workflow_steps.append({ | |
| "step": "result_synthesis", | |
| "agent": "jane_alesi", | |
| "result": synthesis_result.result, | |
| "execution_time": synthesis_result.execution_time | |
| }) | |
| total_execution_time = time.time() - start_time | |
| return { | |
| "success": True, | |
| "workflow_type": "multi_agent", | |
| "coordinator": "jane_alesi", | |
| "specialists": selected_agents[:2], | |
| "workflow_steps": workflow_steps, | |
| "coordinator_response": "Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert.", | |
| "specialist_response": synthesis_result.result.get("response", ""), | |
| "final_response": synthesis_result.result.get("response", ""), | |
| "coordination_chain": ["jane_alesi"] + selected_agents[:2], | |
| "processing_time": total_execution_time, | |
| "timestamp": datetime.now().isoformat(), | |
| "task_count": len(workflow_steps) | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Multi-agent workflow failed: {e}") | |
| return { | |
| "success": False, | |
| "workflow_type": "multi_agent", | |
| "coordinator": "jane_alesi", | |
| "coordinator_response": "Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert.", | |
| "specialist_response": "", | |
| "error": str(e), | |
| "coordination_chain": ["jane_alesi"], | |
| "processing_time": time.time() - start_time, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| async def get_agent_workload(self, agent_id: str) -> Dict[str, Any]: | |
| """Get current workload statistics for an agent""" | |
| active_count = sum(1 for task in self.active_tasks.values() | |
| if task.assigned_agent == agent_id) | |
| completed_count = sum(1 for result in self.completed_tasks.values() | |
| if result.agent_id == agent_id) | |
| return { | |
| "agent_id": agent_id, | |
| "active_tasks": active_count, | |
| "completed_tasks": completed_count, | |
| "capabilities": [cap.name for cap in self.agent_capabilities.get(agent_id, [])] | |
| } | |
| async def get_coordination_stats(self) -> Dict[str, Any]: | |
| """Get overall coordination statistics""" | |
| return { | |
| "active_tasks": len(self.active_tasks), | |
| "completed_tasks": len(self.completed_tasks), | |
| "available_agents": len(self.agent_capabilities), | |
| "agent_workloads": { | |
| agent_id: await self.get_agent_workload(agent_id) | |
| for agent_id in self.agent_capabilities.keys() | |
| } | |
| } | |
| async def cleanup(self): | |
| """Cleanup Redis connections""" | |
| if self.redis_client: | |
| await self.redis_client.close() | |
| # Global coordinator instance | |
| coordinator_instance = None | |
| async def get_coordinator() -> MultiAgentCoordinator: | |
| """Get global coordinator instance""" | |
| global coordinator_instance | |
| if coordinator_instance is None: | |
| coordinator_instance = MultiAgentCoordinator() | |
| await coordinator_instance.initialize() | |
| return coordinator_instance | |