Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Multi-Agent Communication API Endpoints for SAAP Platform | |
| Provides REST API interface for multi-agent coordination and task delegation | |
| """ | |
| from fastapi import APIRouter, HTTPException, Depends | |
| from typing import Dict, Any, Optional, List | |
| import logging | |
| from datetime import datetime | |
| from pydantic import BaseModel | |
| from services.multi_agent_coordinator import MultiAgentCoordinator, TaskPriority, get_coordinator | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Create router for multi-agent endpoints | |
| multi_agent_router = APIRouter(prefix="/api/v1/multi-agent", tags=["Multi-Agent Communication"]) | |
| # 🔒 PRIVACY DETECTION HELPER | |
| async def _determine_provider(user_message: str, provider_preference: Optional[str] = None) -> Dict[str, Any]: | |
| """ | |
| 🔒 Autonomous Privacy-First Provider Selection | |
| Analyzes user message for sensitive data and automatically selects appropriate provider: | |
| - Colossus (internal): For sensitive/personal data (GDPR-protected) | |
| - OpenRouter (external): For general queries (faster, cost-efficient) | |
| Detection Categories: | |
| - Medical: Patient data, diagnoses, symptoms, medications | |
| - Financial: Account numbers, IBANs, credit cards, financial details | |
| - Personal: Birthdates, addresses, phone numbers, ID numbers | |
| - Legal: Legal cases, confidential information | |
| Returns: | |
| Dict with 'provider', 'reason', 'detected_categories', 'confidence' | |
| """ | |
| # Force provider if explicitly requested | |
| if provider_preference and provider_preference != "auto": | |
| logger.info(f"🔒 Provider forced by user: {provider_preference}") | |
| return { | |
| "provider": provider_preference, | |
| "reason": "User preference", | |
| "selected_provider": provider_preference, | |
| "detected_categories": [], | |
| "confidence": "explicit", | |
| "auto_detected": False | |
| } | |
| # Sensitive keyword detection | |
| message_lower = user_message.lower() | |
| detected_categories = [] | |
| # Medical keywords (German + English) | |
| medical_keywords = [ | |
| 'patient', 'diagnose', 'diagnosis', 'krankheit', 'disease', 'symptom', | |
| 'arzt', 'doctor', 'medikament', 'medication', 'therapie', 'therapy', | |
| 'behandlung', 'treatment', 'gesundheit', 'health', 'krankenhaus', 'hospital', | |
| 'medizin', 'medicine', 'blut', 'blood', 'labor', 'test' | |
| ] | |
| # Financial keywords | |
| financial_keywords = [ | |
| 'konto', 'account', 'iban', 'bic', 'kreditkarte', 'credit card', | |
| 'gehalt', 'salary', 'steuer', 'tax', 'bank', 'überweisung', 'transfer', | |
| 'rechnung', 'invoice', 'zahlung', 'payment', 'finanz', 'financial' | |
| ] | |
| # Personal data keywords | |
| personal_keywords = [ | |
| 'geburtsdatum', 'birthdate', 'adresse', 'address', 'telefon', 'phone', | |
| 'ausweis', 'id', 'pass', 'passport', 'sozialversicherung', 'social security', | |
| 'privat', 'private', 'persönlich', 'personal' | |
| ] | |
| # Legal keywords | |
| legal_keywords = [ | |
| 'vertrag', 'contract', 'klage', 'lawsuit', 'anwalt', 'lawyer', | |
| 'gericht', 'court', 'urteil', 'verdict', 'rechtlich', 'legal', | |
| 'compliance', 'datenschutz', 'gdpr', 'dsgvo' | |
| ] | |
| # Check for sensitive keywords | |
| if any(keyword in message_lower for keyword in medical_keywords): | |
| detected_categories.append('medical') | |
| if any(keyword in message_lower for keyword in financial_keywords): | |
| detected_categories.append('financial') | |
| if any(keyword in message_lower for keyword in personal_keywords): | |
| detected_categories.append('personal') | |
| if any(keyword in message_lower for keyword in legal_keywords): | |
| detected_categories.append('legal') | |
| # Pattern-based detection (PII) | |
| import re | |
| # Email pattern | |
| if re.search(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', user_message): | |
| detected_categories.append('email') | |
| # Phone number pattern (German + International) | |
| if re.search(r'\b(?:\+49|0)\s?\d{3,4}\s?\d{6,8}\b', user_message): | |
| detected_categories.append('phone') | |
| # IBAN pattern | |
| if re.search(r'\b[A-Z]{2}\d{2}[A-Z0-9]{13,29}\b', user_message): | |
| detected_categories.append('iban') | |
| # Credit card pattern | |
| if re.search(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', user_message): | |
| detected_categories.append('credit_card') | |
| # Decision logic | |
| if detected_categories: | |
| # Sensitive data detected → Force Colossus | |
| logger.warning(f"🔒 SENSITIVE DATA DETECTED: {detected_categories} → Colossus enforced") | |
| return { | |
| "provider": "colossus", | |
| "reason": f"Sensitive data protection ({', '.join(detected_categories)})", | |
| "selected_provider": "colossus", | |
| "detected_categories": detected_categories, | |
| "confidence": "high", | |
| "auto_detected": True, | |
| "privacy_level": "high" | |
| } | |
| else: | |
| # No sensitive data → OpenRouter for speed/efficiency | |
| logger.info("🌐 No sensitive data → OpenRouter selected") | |
| return { | |
| "provider": "openrouter", | |
| "reason": "General query - optimized for speed", | |
| "selected_provider": "openrouter", | |
| "detected_categories": [], | |
| "confidence": "high", | |
| "auto_detected": True, | |
| "privacy_level": "standard" | |
| } | |
| class MultiAgentChatRequest(BaseModel): | |
| user_message: str | |
| user_context: Optional[Dict[str, Any]] = None | |
| preferred_agent: Optional[str] = None | |
| task_priority: TaskPriority = TaskPriority.NORMAL | |
| provider: Optional[str] = None # "auto", "colossus", "openrouter" | |
| privacy_mode: Optional[str] = None # Alias for provider | |
| class MultiAgentChatResponse(BaseModel): | |
| success: bool | |
| coordinator_response: str | |
| delegated_agent: Optional[str] = None | |
| specialist_response: Optional[str] = None | |
| coordination_chain: List[str] = [] | |
| processing_time: float = 0.0 | |
| workflow_type: str = "single_agent" | |
| task_id: Optional[str] = None | |
| cost_info: Optional[Dict[str, Any]] = None | |
| privacy_protection: Optional[Dict[str, Any]] = None # Privacy info | |
| error: Optional[str] = None | |
| async def multi_agent_chat( | |
| request: MultiAgentChatRequest, | |
| coordinator: MultiAgentCoordinator = Depends(get_coordinator) | |
| ): | |
| """ | |
| 🤖 Multi-Agent Chat Endpoint - Jane Alesi Master Coordinator | |
| Automatically analyzes user intent and either: | |
| 1. Handles request directly (Jane as coordinator) | |
| 2. Delegates to appropriate specialist agent | |
| 3. Orchestrates multi-agent workflow for complex tasks | |
| Examples: | |
| - "Entwickle eine Python App" → Jane delegates to John Alesi (Development) | |
| - "Medizinische Beratung für Diabetes" → Jane delegates to Lara Alesi (Medical) | |
| - "Legal Compliance Check" → Jane delegates to Justus Alesi (Legal) | |
| - "SAAP Platform Status" → Jane handles directly as Coordinator | |
| """ | |
| start_time = datetime.now() | |
| try: | |
| logger.info(f"🤖 Multi-Agent Chat Request: {request.user_message[:100]}...") | |
| # 🔒 PRIVACY-FIRST: Determine provider based on sensitivity | |
| selected_provider = await _determine_provider( | |
| user_message=request.user_message, | |
| provider_preference=request.provider or request.privacy_mode | |
| ) | |
| logger.info(f"🔒 Provider selection: {selected_provider['provider']} (reason: {selected_provider['reason']})") | |
| # Execute multi-agent coordination with provider selection | |
| coordination_result = await coordinator.coordinate_multi_agent_task( | |
| user_message=request.user_message, | |
| user_context=request.user_context or {}, | |
| provider=selected_provider['provider'] # Pass provider to coordinator | |
| ) | |
| # Add privacy protection info to result | |
| coordination_result['privacy_protection'] = selected_provider | |
| processing_time = (datetime.now() - start_time).total_seconds() | |
| if coordination_result.get("success", False): | |
| # Successful coordination | |
| workflow_type = coordination_result.get("workflow_type", "single_agent") | |
| if workflow_type == "multi_agent": | |
| # Complex multi-agent workflow | |
| specialists = coordination_result.get("specialists", []) | |
| workflow_steps = coordination_result.get("workflow_steps", []) | |
| # Build coordination chain | |
| coordination_chain = ["jane_alesi"] # Jane always starts | |
| coordination_chain.extend(specialists) | |
| # Get final response from synthesis step | |
| coordinator_response = coordination_result.get("final_response", "Multi-agent workflow completed successfully.") | |
| # Get specialist response (first specialist for simplicity) | |
| specialist_response = None | |
| delegated_agent = None | |
| if workflow_steps: | |
| for step in workflow_steps: | |
| if step.get("step") == "specialist_analysis": | |
| delegated_agent = step.get("agent") | |
| specialist_response = step.get("result", {}).get("response", "Specialist analysis completed.") | |
| break | |
| logger.info(f"✅ Multi-Agent Workflow: {len(workflow_steps)} steps, {len(specialists)} specialists") | |
| return MultiAgentChatResponse( | |
| success=True, | |
| coordinator_response=coordinator_response, | |
| delegated_agent=delegated_agent, | |
| specialist_response=specialist_response, | |
| coordination_chain=coordination_chain, | |
| processing_time=processing_time, | |
| workflow_type="multi_agent", | |
| task_id=coordination_result.get("task_id"), | |
| cost_info={ | |
| "total_cost": 0.0, # Multi-agent coordination is free | |
| "task_count": coordination_result.get("task_count", 1), | |
| "agents_involved": len(coordination_chain) | |
| }, | |
| privacy_protection=coordination_result.get('privacy_protection') | |
| ) | |
| else: | |
| # Single agent delegation | |
| primary_agent = coordination_result.get("primary_agent", "jane_alesi") | |
| response_text = coordination_result.get("response", "Task completed successfully.") | |
| # Determine coordination chain | |
| coordination_chain = ["jane_alesi"] # Jane analyzes intent | |
| if primary_agent != "jane_alesi": | |
| coordination_chain.append(primary_agent) # Delegate to specialist | |
| coordination_chain.append("jane_alesi") # Jane provides final coordination | |
| logger.info(f"✅ Single Agent Delegation: jane_alesi → {primary_agent}") | |
| return MultiAgentChatResponse( | |
| success=True, | |
| coordinator_response=f"Als Master Coordinatorin habe ich deinen Request analysiert und {'direkt bearbeitet' if primary_agent == 'jane_alesi' else f'an {primary_agent} delegiert'}.", | |
| delegated_agent=primary_agent if primary_agent != "jane_alesi" else None, | |
| specialist_response=response_text if primary_agent != "jane_alesi" else None, | |
| coordination_chain=coordination_chain, | |
| processing_time=processing_time, | |
| workflow_type="single_agent", | |
| task_id=coordination_result.get("task_id"), | |
| cost_info={ | |
| "total_cost": 0.0, | |
| "agents_involved": len(coordination_chain) | |
| }, | |
| privacy_protection=coordination_result.get('privacy_protection') | |
| ) | |
| else: | |
| # Coordination failed | |
| error_msg = coordination_result.get("error", "Unknown coordination error") | |
| logger.error(f"❌ Multi-Agent Coordination failed: {error_msg}") | |
| return MultiAgentChatResponse( | |
| success=False, | |
| coordinator_response="Als Master Coordinatorin konnte ich deinen Request leider nicht erfolgreich bearbeiten.", | |
| processing_time=processing_time, | |
| error=error_msg | |
| ) | |
| except Exception as e: | |
| processing_time = (datetime.now() - start_time).total_seconds() | |
| logger.error(f"❌ Multi-Agent Chat API Error: {e}") | |
| return MultiAgentChatResponse( | |
| success=False, | |
| coordinator_response="Entschuldigung, es ist ein technischer Fehler im Multi-Agent System aufgetreten.", | |
| processing_time=processing_time, | |
| error=str(e) | |
| ) | |
| async def get_multi_agent_status( | |
| coordinator: MultiAgentCoordinator = Depends(get_coordinator) | |
| ): | |
| """ | |
| Get current multi-agent coordination status and statistics | |
| """ | |
| try: | |
| stats = await coordinator.get_coordination_stats() | |
| return { | |
| "status": "active", | |
| "coordinator": "jane_alesi", | |
| "available_specialists": [ | |
| {"id": "john_alesi", "name": "John Alesi", "specialization": "Development"}, | |
| {"id": "lara_alesi", "name": "Lara Alesi", "specialization": "Medical"}, | |
| {"id": "justus_alesi", "name": "Justus Alesi", "specialization": "Legal"}, | |
| {"id": "theo_alesi", "name": "Theo Alesi", "specialization": "Finance"}, | |
| {"id": "leon_alesi", "name": "Leon Alesi", "specialization": "System"}, | |
| {"id": "luna_alesi", "name": "Luna Alesi", "specialization": "Coaching"} | |
| ], | |
| "coordination_stats": stats, | |
| "features": { | |
| "intent_analysis": True, | |
| "automatic_delegation": True, | |
| "multi_agent_workflows": True, | |
| "real_time_coordination": True, | |
| "task_orchestration": True | |
| }, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Multi-Agent Status Error: {e}") | |
| raise HTTPException(status_code=500, detail=f"Status check failed: {str(e)}") | |
| async def get_agent_capabilities( | |
| coordinator: MultiAgentCoordinator = Depends(get_coordinator) | |
| ): | |
| """ | |
| Get detailed agent capabilities for intelligent task delegation | |
| """ | |
| try: | |
| capabilities = {} | |
| for agent_id, agent_caps in coordinator.agent_capabilities.items(): | |
| capabilities[agent_id] = { | |
| "agent_name": { | |
| "jane_alesi": "Jane Alesi - Master Coordinator", | |
| "john_alesi": "John Alesi - Software Developer", | |
| "lara_alesi": "Lara Alesi - Medical Expert", | |
| "justus_alesi": "Justus Alesi - Legal Expert", | |
| "theo_alesi": "Theo Alesi - Financial Analyst", | |
| "leon_alesi": "Leon Alesi - System Administrator", | |
| "luna_alesi": "Luna Alesi - Coaching Specialist" | |
| }.get(agent_id, agent_id), | |
| "capabilities": [ | |
| { | |
| "name": cap.name, | |
| "description": cap.description, | |
| "keywords": cap.keywords, | |
| "complexity_level": cap.complexity_level | |
| } | |
| for cap in agent_caps | |
| ], | |
| "specialization": { | |
| "jane_alesi": "Coordination & Architecture", | |
| "john_alesi": "Software Development", | |
| "lara_alesi": "Medical Analysis", | |
| "justus_alesi": "Legal Compliance", | |
| "theo_alesi": "Financial Analysis", | |
| "leon_alesi": "System Administration", | |
| "luna_alesi": "Coaching & Process" | |
| }.get(agent_id, "General") | |
| } | |
| return { | |
| "total_agents": len(capabilities), | |
| "coordinator": "jane_alesi", | |
| "specialists_count": len(capabilities) - 1, | |
| "capabilities": capabilities, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Agent Capabilities Error: {e}") | |
| raise HTTPException(status_code=500, detail=f"Capabilities retrieval failed: {str(e)}") | |
| async def get_agent_workload( | |
| agent_id: str, | |
| coordinator: MultiAgentCoordinator = Depends(get_coordinator) | |
| ): | |
| """ | |
| Get current workload and task statistics for a specific agent | |
| """ | |
| try: | |
| workload = await coordinator.get_agent_workload(agent_id) | |
| return workload | |
| except Exception as e: | |
| logger.error(f"❌ Agent Workload Error for {agent_id}: {e}") | |
| raise HTTPException(status_code=500, detail=f"Workload check failed: {str(e)}") | |