#!/usr/bin/env python3 """ Multi-Agent Coordinator Service for SAAP Platform Enables autonomous agent-to-agent communication and task delegation """ import asyncio import json import logging import time import uuid from datetime import datetime from typing import Dict, List, Optional, Any, Tuple from enum import Enum from dataclasses import dataclass import redis.asyncio as aioredis from pydantic import BaseModel # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class TaskStatus(str, Enum): CREATED = "created" ASSIGNED = "assigned" IN_PROGRESS = "in_progress" COMPLETED = "completed" FAILED = "failed" class TaskPriority(str, Enum): LOW = "low" NORMAL = "normal" HIGH = "high" URGENT = "urgent" @dataclass class AgentCapability: """Agent capability definition for intelligent task matching""" name: str description: str keywords: List[str] complexity_level: int # 1-10 scale class TaskRequest(BaseModel): task_id: str task_type: str description: str input_data: Dict[str, Any] priority: TaskPriority = TaskPriority.NORMAL status: TaskStatus = TaskStatus.CREATED assigned_agent: Optional[str] = None parent_task_id: Optional[str] = None max_execution_time: int = 300 # seconds class TaskResult(BaseModel): task_id: str agent_id: str status: TaskStatus result: Dict[str, Any] execution_time: float timestamp: datetime error_message: Optional[str] = None class MultiAgentCoordinator: """ Multi-Agent Coordinator for autonomous task delegation and workflow orchestration Jane Alesi acts as the master coordinator with intelligent agent selection """ def __init__(self, redis_host: str = "localhost", redis_port: int = 6379): self.redis_host = redis_host self.redis_port = redis_port self.redis_client = None # Agent capabilities database self.agent_capabilities = { "jane_alesi": [ AgentCapability("coordination", "Master coordination and workflow management", ["coordinate", "manage", "orchestrate", "plan"], 9), AgentCapability("architecture", "System architecture and design decisions", ["architecture", "design", "system", "structure"], 10), AgentCapability("integration", "Multi-agent integration and communication", ["integrate", "communication", "multi-agent"], 10) ], "john_alesi": [ AgentCapability("development", "Software development and coding", ["code", "develop", "program", "software", "implementation"], 9), AgentCapability("debugging", "Code debugging and troubleshooting", ["debug", "fix", "error", "troubleshoot"], 8), AgentCapability("optimization", "Performance optimization and refactoring", ["optimize", "performance", "refactor"], 7), AgentCapability("testing", "Unit testing and code quality assurance", ["test", "quality", "validation"], 8) ], "lara_alesi": [ AgentCapability("medical_analysis", "Medical data analysis and diagnosis", ["medical", "health", "diagnosis", "clinical"], 10), AgentCapability("data_analysis", "Statistical data analysis and interpretation", ["analysis", "statistics", "data", "interpret"], 9), AgentCapability("research", "Medical research and literature review", ["research", "study", "literature", "evidence"], 8) ], "justus_alesi": [ AgentCapability("legal_analysis", "Legal compliance and regulatory analysis", ["legal", "compliance", "regulation", "law"], 10), AgentCapability("documentation", "Legal documentation and contract review", ["document", "contract", "review", "legal"], 9), AgentCapability("risk_assessment", "Legal risk assessment and mitigation", ["risk", "assessment", "legal", "mitigation"], 8) ], "theo_alesi": [ AgentCapability("financial_analysis", "Financial analysis and budgeting", ["finance", "budget", "cost", "investment"], 10), AgentCapability("market_analysis", "Market analysis and business intelligence", ["market", "business", "intelligence", "analysis"], 9), AgentCapability("reporting", "Financial reporting and KPI tracking", ["report", "kpi", "tracking", "metrics"], 8) ], "leon_alesi": [ AgentCapability("system_administration", "System administration and deployment", ["system", "admin", "deploy", "infrastructure"], 10), AgentCapability("monitoring", "System monitoring and performance tracking", ["monitor", "performance", "system", "tracking"], 9), AgentCapability("security", "System security and access control", ["security", "access", "control", "protect"], 9) ], "luna_alesi": [ AgentCapability("coaching", "Team coaching and development", ["coach", "team", "development", "training"], 10), AgentCapability("process_improvement", "Process optimization and workflow improvement", ["process", "improvement", "workflow", "optimize"], 8), AgentCapability("communication", "Team communication and collaboration", ["communication", "collaboration", "team"], 9) ] } # Active tasks tracking self.active_tasks: Dict[str, TaskRequest] = {} self.completed_tasks: Dict[str, TaskResult] = {} # Agent manager reference (will be injected) self.agent_manager = None async def initialize(self): """Initialize Redis connection and coordinator""" try: self.redis_client = aioredis.from_url(f"redis://{self.redis_host}:{self.redis_port}") await self.redis_client.ping() logger.info(f"✅ Multi-Agent Coordinator initialized with Redis at {self.redis_host}:{self.redis_port}") return True except Exception as e: logger.error(f"❌ Failed to initialize Multi-Agent Coordinator: {e}") return False def set_agent_manager(self, agent_manager): """Inject agent manager dependency""" self.agent_manager = agent_manager async def analyze_intent(self, user_message: str) -> Tuple[str, List[str], str]: """ Analyze user intent and determine if multi-agent coordination is needed Returns: (task_type, required_capabilities, primary_agent) """ message_lower = user_message.lower() # Intent analysis patterns - 🎯 ENHANCED FOR FINANCIAL intent_patterns = { "development": ["entwicke", "code", "programmier", "implementier", "software", "app"], "medical": ["medizinisch", "gesundheit", "diagnose", "patient", "clinical"], "legal": ["rechtlich", "legal", "compliance", "vertrag", "regulation"], "financial": ["finanzanwendung", "finanz", "finanziell", "budget", "kosten", "investment", "market", "banking", "payment"], # 🔧 FIXED "system": ["system", "deploy", "server", "infrastructure", "admin"], "coordination": ["koordinier", "manage", "plan", "orchestrat", "workflow"], "analysis": ["analysier", "untersuche", "bewerte", "statistik"] } # Multi-agent patterns (require coordination) multi_agent_patterns = [ "full stack", "complete solution", "end-to-end", "comprehensive", "multi", "various", "different aspects", "holistic approach" ] detected_intents = [] for intent, keywords in intent_patterns.items(): if any(keyword in message_lower for keyword in keywords): detected_intents.append(intent) # Check if multi-agent coordination is needed needs_coordination = any(pattern in message_lower for pattern in multi_agent_patterns) or len(detected_intents) > 1 if needs_coordination or "coordination" in detected_intents: return "multi_agent_task", detected_intents, "jane_alesi" elif "development" in detected_intents: return "development_task", ["development"], "john_alesi" elif "medical" in detected_intents: return "medical_task", ["medical_analysis"], "lara_alesi" elif "legal" in detected_intents: return "legal_task", ["legal_analysis"], "justus_alesi" elif "financial" in detected_intents: return "financial_task", ["financial_analysis"], "theo_alesi" # 🔧 THEO HANDLES FINANCE elif "system" in detected_intents: return "system_task", ["system_administration"], "leon_alesi" else: return "general_task", ["coordination"], "jane_alesi" def select_agents_for_capabilities(self, required_capabilities: List[str], exclude_agent: str = None) -> List[str]: """ Select best agents for required capabilities """ agent_scores = {} for agent_id, capabilities in self.agent_capabilities.items(): if exclude_agent and agent_id == exclude_agent: continue total_score = 0 matched_capabilities = 0 for required_cap in required_capabilities: best_match_score = 0 for capability in capabilities: # Check keyword matches keyword_matches = sum(1 for keyword in capability.keywords if keyword in required_cap.lower()) if keyword_matches > 0: match_score = keyword_matches * capability.complexity_level best_match_score = max(best_match_score, match_score) if best_match_score > 0: total_score += best_match_score matched_capabilities += 1 if matched_capabilities > 0: # Average score weighted by coverage agent_scores[agent_id] = (total_score / len(required_capabilities)) * (matched_capabilities / len(required_capabilities)) # Return top agents sorted by score sorted_agents = sorted(agent_scores.items(), key=lambda x: x[1], reverse=True) return [agent_id for agent_id, score in sorted_agents[:3]] # Top 3 agents async def create_task(self, task_type: str, description: str, input_data: Dict[str, Any], priority: TaskPriority = TaskPriority.NORMAL, parent_task_id: str = None) -> str: """Create a new task in the coordination system""" task_id = str(uuid.uuid4()) task = TaskRequest( task_id=task_id, task_type=task_type, description=description, input_data=input_data, priority=priority, status=TaskStatus.CREATED, parent_task_id=parent_task_id ) self.active_tasks[task_id] = task # Store in Redis for persistence if self.redis_client: try: await self.redis_client.hset("saap:tasks", task_id, task.json()) except Exception as e: logger.warning(f"⚠️ Failed to store task in Redis: {e}") logger.info(f"📋 Created task {task_id}: {task_type} - {description}") return task_id async def delegate_task(self, task_id: str, agent_id: str) -> bool: """Delegate a task to a specific agent""" if task_id not in self.active_tasks: logger.error(f"❌ Task {task_id} not found") return False task = self.active_tasks[task_id] task.assigned_agent = agent_id task.status = TaskStatus.ASSIGNED # Update Redis if self.redis_client: try: await self.redis_client.hset("saap:tasks", task_id, task.json()) except Exception as e: logger.warning(f"⚠️ Failed to update task in Redis: {e}") logger.info(f"👤 Delegated task {task_id} to {agent_id}") return True async def execute_task(self, task_id: str) -> TaskResult: """Execute a delegated task through the assigned agent""" if task_id not in self.active_tasks: raise ValueError(f"Task {task_id} not found") task = self.active_tasks[task_id] if not task.assigned_agent: raise ValueError(f"Task {task_id} has no assigned agent") start_time = time.time() task.status = TaskStatus.IN_PROGRESS try: # Get agent and execute task if not self.agent_manager: raise ValueError("Agent manager not available") # 🔧 FIX: Don't await synchronous methods agent = self.agent_manager.get_agent(task.assigned_agent) if not agent: raise ValueError(f"Agent {task.assigned_agent} not found") # Create task-specific prompt task_prompt = self._create_task_prompt(task) # 🔧 FIX: Check if send_message is async or sync if hasattr(self.agent_manager, 'send_message'): # Try to determine if it's async send_method = getattr(self.agent_manager, 'send_message') if asyncio.iscoroutinefunction(send_method): response = await self.agent_manager.send_message(task.assigned_agent, task_prompt) else: response = self.agent_manager.send_message(task.assigned_agent, task_prompt) else: # Fallback: Use a generic message response = f"Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert." execution_time = time.time() - start_time result = TaskResult( task_id=task_id, agent_id=task.assigned_agent, status=TaskStatus.COMPLETED, result={"response": response, "execution_time": execution_time}, execution_time=execution_time, timestamp=datetime.now() ) # Move to completed tasks self.completed_tasks[task_id] = result del self.active_tasks[task_id] # Update Redis if self.redis_client: try: await self.redis_client.hset("saap:completed_tasks", task_id, result.json()) await self.redis_client.hdel("saap:tasks", task_id) except Exception as e: logger.warning(f"⚠️ Failed to update Redis: {e}") logger.info(f"✅ Completed task {task_id} in {execution_time:.2f}s") return result except Exception as e: execution_time = time.time() - start_time error_result = TaskResult( task_id=task_id, agent_id=task.assigned_agent, status=TaskStatus.FAILED, result={"error": str(e)}, execution_time=execution_time, timestamp=datetime.now(), error_message=str(e) ) self.completed_tasks[task_id] = error_result del self.active_tasks[task_id] logger.error(f"❌ Task {task_id} failed: {e}") return error_result def _create_task_prompt(self, task: TaskRequest) -> str: """Create agent-specific prompt for task execution""" prompt = f""" Task ID: {task.task_id} Task Type: {task.task_type} Priority: {task.priority} Description: {task.description} Input Data: {json.dumps(task.input_data, indent=2)} Please process this task according to your role and capabilities. Provide a detailed response with actionable results. """ return prompt async def coordinate_multi_agent_task(self, user_message: str, user_context: Dict[str, Any] = None) -> Dict[str, Any]: """ Main coordination method for multi-agent tasks This is the entry point for complex multi-agent workflows """ start_time = time.time() try: # Step 1: Analyze intent task_type, required_capabilities, primary_agent = await self.analyze_intent(user_message) logger.info(f"🎯 Intent Analysis: {task_type} → {primary_agent} (capabilities: {required_capabilities})") # Step 2: Determine if multi-agent coordination is needed if task_type == "multi_agent_task" or len(required_capabilities) > 1: return await self._handle_multi_agent_workflow(user_message, required_capabilities, user_context) else: return await self._handle_single_agent_task(user_message, primary_agent, user_context) except Exception as e: logger.error(f"❌ Coordination error: {e}") return { "success": False, "error": str(e), "execution_time": time.time() - start_time } async def _handle_single_agent_task(self, message: str, agent_id: str, context: Dict[str, Any]) -> Dict[str, Any]: """Handle simple single-agent task""" start_time = time.time() try: # Create and execute task task_id = await self.create_task( task_type="single_agent", description=message, input_data={"message": message, "context": context or {}} ) await self.delegate_task(task_id, agent_id) result = await self.execute_task(task_id) return { "success": result.status == TaskStatus.COMPLETED, "task_id": task_id, "coordinator": agent_id, "coordinator_response": result.result.get("response", "Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert."), "specialist_response": result.result.get("response", ""), "execution_time": time.time() - start_time, "workflow_type": "single_agent", "coordination_chain": [agent_id], "processing_time": result.execution_time, "timestamp": result.timestamp.isoformat() } except Exception as e: logger.error(f"❌ Single agent task failed: {e}") return { "success": False, "coordinator": agent_id, "coordinator_response": "Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert.", "specialist_response": "", "error": str(e), "execution_time": time.time() - start_time, "workflow_type": "single_agent", "coordination_chain": [agent_id], "processing_time": 0, "timestamp": datetime.now().isoformat() } async def _handle_multi_agent_workflow(self, message: str, capabilities: List[str], context: Dict[str, Any]) -> Dict[str, Any]: """Handle complex multi-agent workflow with Jane as coordinator""" start_time = time.time() workflow_steps = [] try: # Step 1: Jane analyzes and creates coordination plan coordination_task_id = await self.create_task( task_type="coordination_analysis", description=f"Analyze the following request and create a multi-agent coordination plan: {message}", input_data={ "original_message": message, "detected_capabilities": capabilities, "context": context or {} }, priority=TaskPriority.HIGH ) await self.delegate_task(coordination_task_id, "jane_alesi") coordination_result = await self.execute_task(coordination_task_id) workflow_steps.append({ "step": "coordination_analysis", "agent": "jane_alesi", "result": coordination_result.result, "execution_time": coordination_result.execution_time }) # Step 2: Select and coordinate specialist agents selected_agents = self.select_agents_for_capabilities(capabilities, exclude_agent="jane_alesi") specialist_results = [] for agent_id in selected_agents[:2]: # Limit to 2 specialists for now specialist_task_id = await self.create_task( task_type="specialist_analysis", description=f"Provide specialist analysis for: {message}", input_data={ "original_message": message, "coordination_plan": coordination_result.result, "context": context or {} }, parent_task_id=coordination_task_id ) await self.delegate_task(specialist_task_id, agent_id) specialist_result = await self.execute_task(specialist_task_id) specialist_results.append(specialist_result) workflow_steps.append({ "step": "specialist_analysis", "agent": agent_id, "result": specialist_result.result, "execution_time": specialist_result.execution_time }) # Step 3: Jane synthesizes all results synthesis_task_id = await self.create_task( task_type="result_synthesis", description="Synthesize specialist results into comprehensive response", input_data={ "original_message": message, "coordination_result": coordination_result.result, "specialist_results": [r.result for r in specialist_results], "context": context or {} }, priority=TaskPriority.HIGH, parent_task_id=coordination_task_id ) await self.delegate_task(synthesis_task_id, "jane_alesi") synthesis_result = await self.execute_task(synthesis_task_id) workflow_steps.append({ "step": "result_synthesis", "agent": "jane_alesi", "result": synthesis_result.result, "execution_time": synthesis_result.execution_time }) total_execution_time = time.time() - start_time return { "success": True, "workflow_type": "multi_agent", "coordinator": "jane_alesi", "specialists": selected_agents[:2], "workflow_steps": workflow_steps, "coordinator_response": "Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert.", "specialist_response": synthesis_result.result.get("response", ""), "final_response": synthesis_result.result.get("response", ""), "coordination_chain": ["jane_alesi"] + selected_agents[:2], "processing_time": total_execution_time, "timestamp": datetime.now().isoformat(), "task_count": len(workflow_steps) } except Exception as e: logger.error(f"❌ Multi-agent workflow failed: {e}") return { "success": False, "workflow_type": "multi_agent", "coordinator": "jane_alesi", "coordinator_response": "Als Master Coordinator habe ich deine Anfrage analysiert und das beste Ergebnis koordiniert.", "specialist_response": "", "error": str(e), "coordination_chain": ["jane_alesi"], "processing_time": time.time() - start_time, "timestamp": datetime.now().isoformat() } async def get_agent_workload(self, agent_id: str) -> Dict[str, Any]: """Get current workload statistics for an agent""" active_count = sum(1 for task in self.active_tasks.values() if task.assigned_agent == agent_id) completed_count = sum(1 for result in self.completed_tasks.values() if result.agent_id == agent_id) return { "agent_id": agent_id, "active_tasks": active_count, "completed_tasks": completed_count, "capabilities": [cap.name for cap in self.agent_capabilities.get(agent_id, [])] } async def get_coordination_stats(self) -> Dict[str, Any]: """Get overall coordination statistics""" return { "active_tasks": len(self.active_tasks), "completed_tasks": len(self.completed_tasks), "available_agents": len(self.agent_capabilities), "agent_workloads": { agent_id: await self.get_agent_workload(agent_id) for agent_id in self.agent_capabilities.keys() } } async def cleanup(self): """Cleanup Redis connections""" if self.redis_client: await self.redis_client.close() # Global coordinator instance coordinator_instance = None async def get_coordinator() -> MultiAgentCoordinator: """Get global coordinator instance""" global coordinator_instance if coordinator_instance is None: coordinator_instance = MultiAgentCoordinator() await coordinator_instance.initialize() return coordinator_instance