Spaces:
Sleeping
Sleeping
| """ SAAP Agent Manager Service - LLMModelConfig.get() Error Resolution | |
| Database-integrated agent lifecycle management with colossus integration | |
| """ | |
| import asyncio | |
| import logging | |
| import os | |
| from typing import Dict, List, Optional, Any | |
| from datetime import datetime | |
| import uuid | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select, update, delete | |
| from models.agent_schema import SaapAgent, AgentStatus, AgentType, AgentTemplates | |
| from database.connection import db_manager | |
| from database.models import DBAgent, DBChatMessage, DBAgentSession | |
| from api.colossus_client import ColossusClient | |
| from agents.openrouter_saap_agent import OpenRouterSAAPAgent | |
| logger = logging.getLogger(__name__) | |
class AgentManagerService:
    """Manage SAAP agents: storage, lifecycle and multi-provider chat.

    Agents are kept in an in-memory cache and, whenever ``db_manager`` reports
    that it is initialized, mirrored to the database.  Messages are sent to an
    agent through the colossus client or through OpenRouter; in automatic mode
    colossus is tried first and OpenRouter is used as fallback.

    Database failures are logged and tolerated so the service keeps working
    from the in-memory cache alone.
    """

    # IDs of the built-in agent templates that must always be available.
    BASE_TEMPLATE_IDS = (
        'jane_alesi', 'john_alesi', 'lara_alesi', 'theo_alesi',
        'justus_alesi', 'leon_alesi', 'luna_alesi',
    )

    # Re-test the colossus connection when the last successful test is older
    # than this many seconds (5 minutes).
    COLOSSUS_RETEST_INTERVAL_SECONDS = 300

    def __init__(self):
        """Create an empty, not yet initialized manager (no I/O happens here)."""
        self.agents: Dict[str, SaapAgent] = {}  # In-memory cache for fast access
        self.active_sessions: Dict[str, DBAgentSession] = {}
        self.colossus_client: Optional[ColossusClient] = None
        self.is_initialized = False
        self.colossus_connection_status = "unknown"
        self.last_colossus_test = None

    def _get_llm_config_value(self, agent: SaapAgent, key: str, default=None):
        """Read ``key`` from ``agent.llm_config`` regardless of the config's type.

        The config may be a plain dict, an object exposing attributes, a
        dict-like object with ``get()``, or a model offering ``model_dump()``
        / ``dict()``.  Each access strategy is tried in that order; a strategy
        that fails falls through to the next one instead of aborting the
        lookup.

        Args:
            agent: Agent whose ``llm_config`` is inspected.
            key: Name of the configuration value.
            default: Value returned when the key cannot be resolved.

        Returns:
            The configured value, or ``default``.  This method never raises.
        """
        try:
            if not hasattr(agent, 'llm_config') or not agent.llm_config:
                logger.debug(f"Agent {agent.id} has no llm_config, using default: {default}")
                return default

            llm_config = agent.llm_config

            # Case 1: Dictionary-based config (frontend JSON)
            if isinstance(llm_config, dict):
                value = llm_config.get(key, default)
                logger.debug(f"β Dict config access: {key}={value}")
                return value

            # Case 2: Object with direct attribute access (e.g. Pydantic models)
            if hasattr(llm_config, key):
                value = getattr(llm_config, key, default)
                logger.debug(f"β Attribute access: {key}={value}")
                return value

            # Case 3: Object with a get() method (dict-like objects)
            getter = getattr(llm_config, 'get', None)
            if callable(getter):
                try:
                    value = getter(key, default)
                    logger.debug(f"β Method get() access: {key}={value}")
                    return value
                except Exception as get_error:
                    # Fall through to the remaining strategies below.
                    logger.warning(f"β οΈ get() method failed: {get_error}, trying fallback")

            # Case 4: Look the key up in the instance __dict__
            instance_dict = getattr(llm_config, '__dict__', None)
            if instance_dict is not None and key in instance_dict:
                value = instance_dict[key]
                logger.debug(f"β __dict__ access: {key}={value}")
                return value

            # Case 5: model_dump() conversion (Pydantic v2 style)
            if hasattr(llm_config, 'model_dump'):
                try:
                    value = llm_config.model_dump().get(key, default)
                    logger.debug(f"β model_dump() access: {key}={value}")
                    return value
                except Exception as dump_error:
                    logger.debug(f"model_dump() failed for {key}: {dump_error}")

            # Case 6: dict() conversion
            if hasattr(llm_config, 'dict'):
                try:
                    value = llm_config.dict().get(key, default)
                    logger.debug(f"β dict() access: {key}={value}")
                    return value
                except Exception as dict_error:
                    logger.debug(f"dict() failed for {key}: {dict_error}")

            # Final fallback
            logger.warning(f"β οΈ Unknown config type {type(llm_config)} for {key}, using default: {default}")
            return default

        except AttributeError as e:
            logger.warning(f"β οΈ AttributeError in LLM config access for {key}: {e}, using default: {default}")
            return default
        except Exception as e:
            logger.error(f"β Unexpected error in LLM config access for {key}: {e}, using default: {default}")
            return default

    async def initialize(self):
        """Connect to colossus, load agents and ensure the base templates exist.

        A colossus failure is recorded in ``colossus_connection_status`` and
        does not abort initialization.  Any other failure is logged and
        re-raised.
        """
        try:
            logger.info("π Initializing Agent Manager Service...")

            # Initialize colossus client; failures degrade gracefully.
            try:
                logger.info("π Connecting to colossus server...")
                self.colossus_client = ColossusClient()
                await self.colossus_client.__aenter__()

                # Test colossus connection
                await self._test_colossus_connection()
            except Exception as colossus_error:
                logger.error(f"β colossus connection failed: {colossus_error}")
                self.colossus_connection_status = f"failed: {str(colossus_error)}"
                # Continue initialization without colossus (graceful degradation)

            # Load DB agents, then register whichever base templates are missing.
            await self._load_agents_from_database()

            base_template_ids = list(self.BASE_TEMPLATE_IDS)
            missing_templates = [tid for tid in base_template_ids if tid not in self.agents]

            if missing_templates:
                logger.info(f"π¦ Loading missing base templates: {missing_templates}")
                await self._load_missing_templates(missing_templates)

            self.is_initialized = True

            base_count = sum(1 for aid in self.agents if aid in base_template_ids)
            custom_count = len(self.agents) - base_count
            logger.info(f"β Agent Manager initialized: {len(self.agents)} agents loaded")
            logger.info(f" Base templates: {base_count}/7")
            logger.info(f" Custom agents: {custom_count}")
            logger.info(f"π colossus status: {self.colossus_connection_status}")

        except Exception as e:
            logger.error(f"β Agent Manager initialization failed: {e}")
            raise

    async def _load_missing_templates(self, template_ids: List[str]):
        """Instantiate and register the base templates named in ``template_ids``.

        IDs without a matching template are skipped; a failure for one
        template is logged and does not stop the others.
        """
        template_map = {
            'jane_alesi': AgentTemplates.jane_alesi,
            'john_alesi': AgentTemplates.john_alesi,
            'lara_alesi': AgentTemplates.lara_alesi,
            'theo_alesi': AgentTemplates.theo_alesi,
            'justus_alesi': AgentTemplates.justus_alesi,
            'leon_alesi': AgentTemplates.leon_alesi,
            'luna_alesi': AgentTemplates.luna_alesi
        }

        for template_id in template_ids:
            try:
                template_method = template_map.get(template_id)
                if template_method:
                    agent = template_method()
                    await self.register_agent(agent)
                    logger.info(f"β Loaded missing template: {agent.name}")
            except Exception as e:
                logger.error(f"β Failed to load template {template_id}: {e}")

    async def _test_colossus_connection(self):
        """Send a tiny chat request to colossus and record the outcome.

        Updates ``colossus_connection_status`` and, on success only,
        ``last_colossus_test``.  Never raises.
        """
        try:
            if not self.colossus_client:
                self.colossus_connection_status = "client_not_initialized"
                return

            # Send a simple test message
            test_messages = [
                {"role": "system", "content": "You are a test assistant."},
                {"role": "user", "content": "Reply with just 'OK' to confirm connection."}
            ]

            logger.info("π§ͺ Testing colossus connection...")
            response = await self.colossus_client.chat_completion(
                messages=test_messages,
                agent_id="connection_test",
                max_tokens=10
            )

            if response and response.get("success"):
                self.colossus_connection_status = "connected"
                self.last_colossus_test = datetime.utcnow()
                logger.info("β colossus connection test successful")
            else:
                error_msg = response.get("error", "unknown error") if response else "no response"
                self.colossus_connection_status = f"test_failed: {error_msg}"
                logger.error(f"β colossus connection test failed: {error_msg}")

        except Exception as e:
            self.colossus_connection_status = f"test_error: {str(e)}"
            logger.error(f"β colossus connection test error: {e}")

    def _colossus_test_is_stale(self) -> bool:
        """Return True when the colossus connection should be tested again.

        Uses ``total_seconds()`` so that ages of a day or more are measured
        correctly (``timedelta.seconds`` ignores whole days).
        """
        if not self.last_colossus_test:
            return True
        age = datetime.utcnow() - self.last_colossus_test
        return age.total_seconds() > self.COLOSSUS_RETEST_INTERVAL_SECONDS

    async def _load_agents_from_database(self):
        """Fill the in-memory cache from the database.

        Does nothing when the database is not initialized.  Rows that cannot
        be converted are counted and skipped; a failing query is logged and
        leaves the cache as it was.
        """
        try:
            # Check if database manager is ready
            if not db_manager.is_initialized:
                logger.warning("β οΈ Database not yet initialized - will load default agents")
                return

            async with db_manager.get_async_session() as session:
                result = await session.execute(select(DBAgent))
                db_agents = result.scalars().all()

                loaded_count = 0
                failed_count = 0

                for db_agent in db_agents:
                    try:
                        saap_agent = db_agent.to_saap_agent()
                        self.agents[saap_agent.id] = saap_agent
                        loaded_count += 1
                        logger.info(f"β Loaded: {saap_agent.name} ({saap_agent.id})")
                    except Exception as agent_error:
                        failed_count += 1
                        logger.error(f"β Failed to load agent {db_agent.id}: {agent_error}")
                        continue

                logger.info(f"π Loaded {loaded_count} agents from database ({failed_count} failed)")

        except Exception as e:
            logger.error(f"β Failed to load agents from database: {e}")
            logger.info("π¦ Will proceed with in-memory agents only")

    async def load_default_agents(self):
        """Register every default Alesi template, tolerating individual failures."""
        try:
            logger.info("π€ Loading ALL default Alesi agents...")

            # (template method name on AgentTemplates, name used in log output)
            template_methods = [
                ('jane_alesi', 'Jane Alesi - Coordinator'),
                ('john_alesi', 'John Alesi - Developer'),
                ('lara_alesi', 'Lara Alesi - Medical Specialist'),
                ('theo_alesi', 'Theo Alesi - Financial Specialist'),
                ('justus_alesi', 'Justus Alesi - Legal Specialist'),
                ('leon_alesi', 'Leon Alesi - System Specialist'),
                ('luna_alesi', 'Luna Alesi - Coaching Specialist')
            ]

            loaded_agents = []

            for method_name, display_name in template_methods:
                try:
                    # Get template method
                    template_method = getattr(AgentTemplates, method_name, None)
                    if template_method is None:
                        logger.error(f"β Template method not found: AgentTemplates.{method_name}")
                        continue

                    # Create agent instance
                    agent = template_method()

                    # Register agent
                    success = await self.register_agent(agent)
                    if success:
                        loaded_agents.append(display_name)
                        logger.info(f"β Loaded: {display_name}")
                    else:
                        logger.error(f"β Failed to register: {display_name}")

                except Exception as template_error:
                    logger.error(f"β Error loading {display_name}: {template_error}")
                    continue

            if loaded_agents:
                logger.info(f"β Successfully loaded agents: {loaded_agents}")
            else:
                logger.error("β No agents could be loaded!")

        except Exception as e:
            logger.error(f"β Agent loading failed: {e}")

    async def register_agent(self, agent: SaapAgent) -> bool:
        """Add ``agent`` to the cache and try to persist it.

        Returns:
            True when the agent is in the cache afterwards (even if database
            persistence failed), False when registration failed completely.
        """
        try:
            # Always add to memory cache first
            self.agents[agent.id] = agent

            # Try to persist to database if available
            try:
                if db_manager.is_initialized:
                    async with db_manager.get_async_session() as session:
                        db_agent = DBAgent.from_saap_agent(agent)
                        session.add(db_agent)
                        await session.commit()
                    logger.info(f"β Agent registered with database: {agent.name} ({agent.id})")
                else:
                    logger.info(f"β Agent registered in-memory only: {agent.name} ({agent.id})")
            except Exception as db_error:
                logger.warning(f"β οΈ Database persistence failed for {agent.name}: {db_error}")
                # But keep the agent in memory

            return True

        except Exception as e:
            logger.error(f"β Agent registration failed: {e}")
            # Remove from cache if registration completely failed
            self.agents.pop(agent.id, None)
            return False

    def get_agent(self, agent_id: str) -> Optional[SaapAgent]:
        """Return the cached agent for ``agent_id`` or None, logging the outcome."""
        agent = self.agents.get(agent_id)
        if agent:
            logger.debug(f"π Agent found: {agent.name} ({agent_id}) - Status: {agent.status}")
        else:
            logger.warning(f"β Agent not found: {agent_id}")
            logger.debug(f"π Available agents: {list(self.agents.keys())}")
        return agent

    async def list_agents(self, status: Optional[AgentStatus] = None,
                          agent_type: Optional[AgentType] = None) -> List[SaapAgent]:
        """Return cached agents, optionally filtered by status and/or type."""
        agents = list(self.agents.values())

        # Compare against None so a falsy enum member would still filter.
        if status is not None:
            agents = [a for a in agents if a.status == status]
        if agent_type is not None:
            agents = [a for a in agents if a.type == agent_type]

        return agents

    async def get_agent_stats(self, agent_id: str) -> Dict[str, Any]:
        """Return basic statistics read from the cached agent object.

        Returns an empty dict when the agent is unknown.
        """
        agent = self.get_agent(agent_id)
        if not agent:
            return {}

        # Attributes missing on the agent object fall back to neutral values.
        return {
            "messages_processed": getattr(agent, 'messages_processed', 0),
            "total_tokens": getattr(agent, 'total_tokens', 0),
            "average_response_time": getattr(agent, 'avg_response_time', 0),
            "status": agent.status.value,
            "last_active": getattr(agent, 'last_active', None)
        }

    async def health_check(self, agent_id: str) -> Dict[str, Any]:
        """Report whether the agent exists, is ACTIVE and colossus is connected."""
        agent = self.get_agent(agent_id)
        if not agent:
            return {"healthy": False, "checks": {"agent_exists": False}}

        return {
            "healthy": agent.status == AgentStatus.ACTIVE,
            "checks": {
                "agent_exists": True,
                "status": agent.status.value,
                "colossus_connection": self.colossus_connection_status == "connected"
            }
        }

    async def update_agent(self, agent_id: str, updated_data) -> SaapAgent:
        """Update an agent from a dict of changes or a complete ``SaapAgent``.

        Dict updates are merged into the current agent data (nested dicts are
        merged one level deep).  Legacy top-level ``color`` / ``avatar`` keys
        are moved under ``appearance`` unless the update already carries an
        ``appearance`` entry.  The caller's dict is not modified.

        Raises:
            ValueError: If the agent is unknown or ``updated_data`` has an
                unsupported type.  Other errors are logged and re-raised.
        """
        try:
            # Get current agent
            current_agent = self.get_agent(agent_id)
            if not current_agent:
                raise ValueError(f"Agent {agent_id} not found")

            if isinstance(updated_data, dict):
                current_dict = current_agent.dict()
                # Work on a shallow copy: the legacy keys are popped below and
                # the caller's dict must stay untouched.
                updates = dict(updated_data)

                # Migrate old frontend schema: 'color'/'avatar' -> 'appearance.*'
                for legacy_key in ('color', 'avatar'):
                    if legacy_key in updates and 'appearance' not in updates:
                        if not isinstance(current_dict.get('appearance'), dict):
                            current_dict['appearance'] = {}
                        current_dict['appearance'][legacy_key] = updates.pop(legacy_key)

                # Merge updates (keys unknown to the current schema are ignored)
                for key, value in updates.items():
                    if key in current_dict:
                        if isinstance(value, dict) and isinstance(current_dict[key], dict):
                            # Merge nested dicts
                            current_dict[key].update(value)
                        else:
                            current_dict[key] = value

                updated_agent = SaapAgent(**current_dict)
            elif isinstance(updated_data, SaapAgent):
                updated_agent = updated_data
            else:
                raise ValueError(f"Invalid update data type: {type(updated_data)}")

            # Update in memory cache
            self.agents[agent_id] = updated_agent

            # Try to update in database if available
            if db_manager.is_initialized:
                try:
                    async with db_manager.get_async_session() as session:
                        # Delete old and insert new (simpler than complex update)
                        await session.execute(delete(DBAgent).where(DBAgent.id == agent_id))
                        db_agent = DBAgent.from_saap_agent(updated_agent)
                        session.add(db_agent)
                        await session.commit()
                except Exception as db_error:
                    logger.warning(f"β οΈ Database update failed for {agent_id}: {db_error}")

            logger.info(f"β Agent updated: {agent_id}")
            return updated_agent

        except Exception as e:
            logger.error(f"β Agent update failed: {e}")
            raise

    async def delete_agent(self, agent_id: str) -> bool:
        """Stop the agent, then remove it from the cache and the database.

        Returns True unless an unexpected error occurred; a failing database
        delete is only logged.
        """
        try:
            # Stop agent if running
            await self.stop_agent(agent_id)

            # Remove from memory
            self.agents.pop(agent_id, None)

            # Try to remove from database if available
            if db_manager.is_initialized:
                try:
                    async with db_manager.get_async_session() as session:
                        await session.execute(delete(DBAgent).where(DBAgent.id == agent_id))
                        await session.commit()
                except Exception as db_error:
                    logger.warning(f"β οΈ Database deletion failed for {agent_id}: {db_error}")

            logger.info(f"β Agent deleted: {agent_id}")
            return True

        except Exception as e:
            logger.error(f"β Agent deletion failed: {e}")
            return False

    async def start_agent(self, agent_id: str) -> bool:
        """Mark the agent ACTIVE and open a tracking session for it.

        Returns False when the agent is unknown or an unexpected error occurs.
        """
        try:
            agent = self.get_agent(agent_id)
            if not agent:
                logger.error(f"β Cannot start agent: {agent_id} not found")
                return False

            # Update status
            agent.status = AgentStatus.ACTIVE
            if hasattr(agent, 'metrics') and agent.metrics:
                agent.metrics.last_active = datetime.utcnow()

            # Try to create agent session in database if available
            if db_manager.is_initialized:
                try:
                    async with db_manager.get_async_session() as session:
                        db_session = DBAgentSession(agent_id=agent_id)
                        session.add(db_session)
                        await session.commit()
                        await session.refresh(db_session)

                        # Store in active sessions
                        self.active_sessions[agent_id] = db_session
                except Exception as db_error:
                    logger.warning(f"β οΈ Database session creation failed for {agent_id}: {db_error}")

            # Update agent status in database if available
            await self._update_agent_status(agent_id, AgentStatus.ACTIVE)

            logger.info(f"β Agent started: {agent.name} ({agent_id})")
            return True

        except Exception as e:
            logger.error(f"β Agent start failed: {e}")
            return False

    async def stop_agent(self, agent_id: str) -> bool:
        """Mark the agent INACTIVE and close its tracking session, if any.

        Returns False when the agent is unknown or an unexpected error occurs.
        """
        try:
            agent = self.get_agent(agent_id)
            if not agent:
                return False

            # Update status
            agent.status = AgentStatus.INACTIVE

            # Close agent session if exists
            if agent_id in self.active_sessions:
                session_obj = self.active_sessions[agent_id]
                session_obj.session_end = datetime.utcnow()
                session_obj.status = "completed"
                session_obj.end_reason = "graceful"
                session_obj.calculate_duration()

                if db_manager.is_initialized:
                    try:
                        async with db_manager.get_async_session() as session:
                            await session.merge(session_obj)
                            await session.commit()
                    except Exception as db_error:
                        logger.warning(f"β οΈ Database session update failed for {agent_id}: {db_error}")

                del self.active_sessions[agent_id]

            # Update agent status in database if available
            await self._update_agent_status(agent_id, AgentStatus.INACTIVE)

            logger.info(f"π§ Agent stopped: {agent_id}")
            return True

        except Exception as e:
            logger.error(f"β Agent stop failed: {e}")
            return False

    async def restart_agent(self, agent_id: str) -> bool:
        """Stop the agent, wait one second, then start it again."""
        try:
            await self.stop_agent(agent_id)
            await asyncio.sleep(1)  # Brief pause
            return await self.start_agent(agent_id)
        except Exception as e:
            logger.error(f"β Agent restart failed: {e}")
            return False

    async def _update_agent_status(self, agent_id: str, status: AgentStatus):
        """Write status and ``last_active`` to the agent's database row.

        Does nothing when the database is not initialized; errors are logged.
        """
        if not db_manager.is_initialized:
            return

        try:
            async with db_manager.get_async_session() as session:
                await session.execute(
                    update(DBAgent)
                    .where(DBAgent.id == agent_id)
                    .values(status=status.value, last_active=datetime.utcnow())
                )
                await session.commit()
        except Exception as e:
            logger.warning(f"β οΈ Failed to update agent status in database: {e}")

    def _record_session_usage(self, agent_id: str, tokens_used) -> None:
        """Add one processed message and its tokens to the agent's open session.

        Does nothing when the agent has no active session.  ``None`` counters
        or token counts are treated as 0.
        """
        session_obj = self.active_sessions.get(agent_id)
        if session_obj is None:
            return
        session_obj.messages_processed = (session_obj.messages_processed or 0) + 1
        session_obj.total_tokens_used = (session_obj.total_tokens_used or 0) + (tokens_used or 0)

    # Multi-provider chat support
    async def send_message_to_agent(self, agent_id: str, message: str,
                                    provider: Optional[str] = None) -> Dict[str, Any]:
        """Send a message to an agent via the given provider or auto-fallback.

        Args:
            agent_id: Target agent ID.
            message: Message content.
            provider: "colossus", "openrouter", or None for automatic
                selection (colossus first when connected, then OpenRouter).

        Returns:
            The provider's response dict, or a dict with an ``"error"`` key.
            This method does not raise.
        """
        try:
            agent = self.get_agent(agent_id)
            if not agent:
                error_msg = f"Agent {agent_id} not found in loaded agents"
                logger.error(f"β {error_msg}")
                logger.debug(f"π Available agents: {list(self.agents.keys())}")
                return {
                    "error": error_msg,
                    "timestamp": datetime.utcnow().isoformat(),
                    "debug_info": {
                        "available_agents": list(self.agents.keys()),
                        "agent_manager_initialized": self.is_initialized
                    }
                }

            # Only ACTIVE agents accept messages
            if agent.status != AgentStatus.ACTIVE:
                error_msg = f"Agent {agent_id} not available (status: {agent.status.value})"
                logger.error(f"β {error_msg}")
                return {
                    "error": error_msg,
                    "timestamp": datetime.utcnow().isoformat(),
                    "debug_info": {
                        "agent_status": agent.status.value,
                        "agent_id": agent_id
                    }
                }

            # Explicit provider choice
            if provider == "openrouter":
                return await self._send_via_openrouter(agent_id, message, agent)
            elif provider == "colossus":
                return await self._send_via_colossus(agent_id, message, agent)
            else:
                # Auto-selection: Try colossus first, fallback to OpenRouter
                if self.colossus_connection_status == "connected":
                    logger.info(f"π Using colossus as primary provider for {agent_id}")
                    result = await self._send_via_colossus(agent_id, message, agent)

                    # If colossus fails, try OpenRouter
                    if "error" in result and "colossus" in result["error"].lower():
                        logger.info(f"π colossus failed, trying OpenRouter fallback...")
                        return await self._send_via_openrouter(agent_id, message, agent)

                    return result
                else:
                    logger.info(f"π colossus unavailable, using OpenRouter as primary for {agent_id}")
                    return await self._send_via_openrouter(agent_id, message, agent)

        except Exception as e:
            error_msg = str(e)
            logger.error(f"β Message to agent failed: {error_msg}")

            return {
                "error": error_msg,
                "timestamp": datetime.utcnow().isoformat(),
                "debug_info": {
                    "agent_id": agent_id,
                    "provider": provider,
                    "colossus_status": self.colossus_connection_status,
                    "agent_found": agent_id in self.agents,
                    "colossus_client_exists": self.colossus_client is not None
                }
            }

    async def _send_via_openrouter(self, agent_id: str, message: str,
                                   agent: SaapAgent) -> Dict[str, Any]:
        """Send ``message`` through OpenRouter and return a response dict.

        The model is chosen per agent ID (overridable through environment
        variables).  On success the exchange is saved to the database when
        available and the agent's active session counters are updated.
        Failures are returned as a dict with an ``"error"`` key.
        """
        try:
            agent_role = agent.type.value if agent.type else "Assistant"
            logger.info(f"π {agent_id} ({agent_role}) initialized with OpenRouter FREE")

            # Create OpenRouter agent for this request
            openrouter_agent = OpenRouterSAAPAgent(
                agent_id,
                agent_role,
                os.getenv("OPENROUTER_API_KEY")
            )

            # Per-agent model selection, overridable via environment variables
            model_map = {
                "jane_alesi": os.getenv("JANE_ALESI_MODEL", "openai/gpt-4o-mini"),
                "john_alesi": os.getenv("JOHN_ALESI_MODEL", "deepseek/deepseek-coder"),
                "lara_alesi": os.getenv("LARA_ALESI_MODEL", "anthropic/claude-3-haiku"),
                "theo_alesi": os.getenv("THEO_ALESI_MODEL", "openai/gpt-4o-mini"),  # Financial
                "justus_alesi": os.getenv("JUSTUS_ALESI_MODEL", "anthropic/claude-3-haiku"),  # Legal
                "leon_alesi": os.getenv("LEON_ALESI_MODEL", "deepseek/deepseek-coder"),  # System
                "luna_alesi": os.getenv("LUNA_ALESI_MODEL", "openai/gpt-4o-mini")  # Coaching
            }

            preferred_model = model_map.get(agent_id, "meta-llama/llama-3.2-3b-instruct:free")
            openrouter_agent.model_name = preferred_model

            start_time = datetime.utcnow()
            logger.info(f"π€ Sending message to {agent.name} ({agent_id}) via OpenRouter ({preferred_model})...")

            max_tokens_value = self._get_llm_config_value(agent, 'max_tokens', 1000)

            # Send message via OpenRouter
            response = await openrouter_agent.send_request_to_openrouter(
                message,
                max_tokens=max_tokens_value
            )

            end_time = datetime.utcnow()
            response_time = (end_time - start_time).total_seconds()

            if response.get("success"):
                logger.info(f"β OpenRouter response successful in {response_time:.2f}s")

                response_content = response.get("response", "")
                tokens_used = response.get("token_count", 0)
                cost_usd = response.get("cost_usd", 0.0)

                # Try to save to database if available
                if db_manager.is_initialized:
                    try:
                        async with db_manager.get_async_session() as session:
                            chat_message = DBChatMessage(
                                agent_id=agent_id,
                                user_message=message,
                                agent_response=response_content,
                                response_time=response_time,
                                tokens_used=tokens_used,
                                metadata={
                                    "model": preferred_model,
                                    "provider": "OpenRouter",
                                    "cost_usd": cost_usd,
                                    "temperature": 0.7
                                }
                            )
                            session.add(chat_message)
                            await session.commit()
                    except Exception as db_error:
                        logger.warning(f"β οΈ Failed to save OpenRouter chat to database: {db_error}")

                # Keep session metrics consistent with the colossus path
                self._record_session_usage(agent_id, tokens_used)

                return {
                    "content": response_content,
                    "response_time": response_time,
                    "tokens_used": tokens_used,
                    "cost_usd": cost_usd,
                    "provider": "OpenRouter",
                    "model": preferred_model,
                    "timestamp": end_time.isoformat()
                }
            else:
                error_msg = response.get("error", "Unknown OpenRouter error")
                logger.error(f"β OpenRouter fallback failed: {error_msg}")
                return {
                    "error": f"OpenRouter error: {error_msg}",
                    "provider": "OpenRouter",
                    "timestamp": end_time.isoformat()
                }

        except Exception as e:
            logger.error(f"β OpenRouter fallback failed: {str(e)}")
            return {
                "error": f"OpenRouter error: {str(e)}",
                "provider": "OpenRouter",
                "timestamp": datetime.utcnow().isoformat()
            }

    @staticmethod
    def _choice_content_and_tokens(payload: Dict[str, Any]) -> tuple:
        """Extract ``(content, total_tokens)`` from an OpenAI-style payload.

        Missing or malformed parts yield ``""`` and ``0`` respectively.
        """
        content = ""
        choices = payload.get("choices") or []
        if choices and isinstance(choices[0], dict):
            message = choices[0].get("message")
            if isinstance(message, dict):
                content = message.get("content") or ""

        usage = payload.get("usage")
        tokens = usage.get("total_tokens", 0) if isinstance(usage, dict) else 0
        return content, tokens or 0

    @staticmethod
    def _parse_colossus_response(response: Any) -> tuple:
        """Normalize a colossus reply into ``(content, tokens_used, error)``.

        Supported shapes:
            - ``{"success": True, "response": {...OpenAI payload...} | "text"}``
            - ``{"choices": [...], "usage": {...}}`` (direct OpenAI format)
            - ``{"response": "text"}`` / ``{"content": "text"}``
            - a plain string

        ``error`` is None unless the reply reports a failure
        (``"success"`` present and falsy) or carries no ``"success"`` key and
        no usable content.  An empty ``content`` with ``error`` None means
        the reply could not be parsed.
        """
        if not response:
            return "", 0, None
        if isinstance(response, str):
            return response, 0, None
        if not isinstance(response, dict):
            return "", 0, None

        # Explicit failure reported by the client wins over any payload.
        if "success" in response and not response["success"]:
            return "", 0, response.get("error", "Unknown colossus error")

        content, tokens = "", 0
        if "response" in response:
            inner = response["response"]
            if isinstance(inner, dict):
                content, tokens = AgentManagerService._choice_content_and_tokens(inner)
            elif isinstance(inner, str):
                content = inner
        elif response.get("choices"):
            content, tokens = AgentManagerService._choice_content_and_tokens(response)
        elif "content" in response:
            content = response["content"]

        # Without a success flag and without content, treat it as an error.
        if not content and "success" not in response:
            return "", 0, response.get("error", "Unknown colossus error")

        return content, tokens, None

    async def _send_via_colossus(self, agent_id: str, message: str,
                                 agent: SaapAgent) -> Dict[str, Any]:
        """Send ``message`` through the colossus client and return a response dict.

        The connection is re-tested when the last successful test is missing
        or stale.  On success the exchange is saved to the database when
        available and the agent's active session counters are updated.
        Failures are returned as a dict with an ``"error"`` key.
        """
        try:
            # Check colossus client availability
            if not self.colossus_client:
                return {
                    "error": "colossus client not initialized",
                    "provider": "colossus",
                    "timestamp": datetime.utcnow().isoformat()
                }

            # Test colossus connection if it's been a while
            if self._colossus_test_is_stale():
                await self._test_colossus_connection()

            if self.colossus_connection_status != "connected":
                return {
                    "error": f"colossus connection not healthy: {self.colossus_connection_status}",
                    "provider": "colossus",
                    "timestamp": datetime.utcnow().isoformat()
                }

            start_time = datetime.utcnow()
            logger.info(f"π€ Sending message to {agent.name} ({agent_id}) via colossus...")

            temperature_value = self._get_llm_config_value(agent, 'temperature', 0.7)
            max_tokens_value = self._get_llm_config_value(agent, 'max_tokens', 1000)

            # Send message to colossus
            response = await self.colossus_client.chat_completion(
                messages=[
                    {"role": "system", "content": agent.description or f"You are {agent.name}"},
                    {"role": "user", "content": message}
                ],
                agent_id=agent_id,
                temperature=temperature_value,
                max_tokens=max_tokens_value
            )

            end_time = datetime.utcnow()
            response_time = (end_time - start_time).total_seconds()

            logger.info(f"π₯ Received response from colossus in {response_time:.2f}s")

            if response:
                logger.debug(f"π Raw colossus response: {response}")

            response_content, tokens_used, reported_error = self._parse_colossus_response(response)

            # Handle colossus client error responses
            if reported_error is not None:
                logger.error(f"β colossus error: {reported_error}")
                return {
                    "error": f"colossus server error: {reported_error}",
                    "provider": "colossus",
                    "timestamp": end_time.isoformat()
                }

            # Fallback if no content extracted
            if not response_content:
                logger.error(f"β Unable to extract content from colossus response: {response}")
                return {
                    "error": "Failed to parse colossus response",
                    "provider": "colossus",
                    "timestamp": end_time.isoformat()
                }

            # Try to save to database if available
            if db_manager.is_initialized:
                try:
                    async with db_manager.get_async_session() as session:
                        chat_message = DBChatMessage(
                            agent_id=agent_id,
                            user_message=message,
                            agent_response=response_content,
                            response_time=response_time,
                            tokens_used=tokens_used,
                            metadata={
                                "model": "mistral-small3.2:24b-instruct-2506",
                                "provider": "colossus",
                                # Record the temperature actually sent.
                                "temperature": temperature_value
                            }
                        )
                        session.add(chat_message)
                        await session.commit()
                except Exception as db_error:
                    logger.warning(f"β οΈ Failed to save chat message to database: {db_error}")

            # Update session metrics
            self._record_session_usage(agent_id, tokens_used)

            logger.info(f"β Message processed successfully for {agent.name}")

            return {
                "content": response_content,
                "response_time": response_time,
                "tokens_used": tokens_used,
                "provider": "colossus",
                "model": "mistral-small3.2:24b-instruct-2506",
                "timestamp": end_time.isoformat()
            }

        except Exception as e:
            logger.error(f"β colossus communication failed: {str(e)}")
            return {
                "error": f"colossus error: {str(e)}",
                "provider": "colossus",
                "timestamp": datetime.utcnow().isoformat()
            }

    @staticmethod
    def _summarize_activity(messages, sessions) -> Dict[str, Any]:
        """Aggregate chat-message and session rows into a metrics dict.

        The average response time is taken over messages that have a
        response time; sessions without a start time are ignored when
        determining ``last_activity``.
        """
        response_times = [m.response_time for m in messages if m.response_time is not None]
        avg_response_time = sum(response_times) / len(response_times) if response_times else 0
        total_tokens = sum(m.tokens_used for m in messages if m.tokens_used)
        session_starts = [s.session_start for s in sessions if s.session_start is not None]

        return {
            "total_messages": len(messages),
            "total_tokens_used": total_tokens,
            "average_response_time": avg_response_time,
            "total_sessions": len(sessions),
            "last_activity": max(session_starts, default=None),
        }

    async def get_agent_metrics(self, agent_id: str) -> Dict[str, Any]:
        """Return message and session metrics for the agent from the database.

        Returns a dict with a ``"warning"`` key when the database is not
        available and an empty dict when the query fails.
        """
        if not db_manager.is_initialized:
            return {"warning": "Database not available - no metrics"}

        try:
            async with db_manager.get_async_session() as session:
                # All chat messages of this agent
                result = await session.execute(
                    select(DBChatMessage).where(DBChatMessage.agent_id == agent_id)
                )
                messages = result.scalars().all()

                # All sessions of this agent
                session_result = await session.execute(
                    select(DBAgentSession).where(DBAgentSession.agent_id == agent_id)
                )
                sessions = session_result.scalars().all()

                return self._summarize_activity(messages, sessions)

        except Exception as e:
            logger.error(f"β Failed to get agent metrics: {e}")
            return {}

    async def get_system_status(self) -> Dict[str, Any]:
        """Return a snapshot of manager, colossus and database state for debugging."""
        return {
            "agent_manager_initialized": self.is_initialized,
            "colossus_connection_status": self.colossus_connection_status,
            "colossus_last_test": self.last_colossus_test.isoformat() if self.last_colossus_test else None,
            "loaded_agents": len(self.agents),
            "active_sessions": len(self.active_sessions),
            "agent_list": [{"id": aid, "name": agent.name, "status": agent.status.value}
                           for aid, agent in self.agents.items()],
            "database_initialized": getattr(db_manager, 'is_initialized', False)
        }

    async def shutdown_all_agents(self):
        """Stop every cached agent and close the colossus client; errors are logged."""
        try:
            logger.info("π§ Shutting down all agents...")

            # Iterate over a copy of the keys; stopping must not disturb iteration.
            for agent_id in list(self.agents.keys()):
                await self.stop_agent(agent_id)

            if self.colossus_client:
                await self.colossus_client.__aexit__(None, None, None)

            logger.info("β All agents shut down successfully")

        except Exception as e:
            logger.error(f"β Agent shutdown failed: {e}")
# Short alias so callers can import the class as ``AgentManager``.
AgentManager = AgentManagerService

# Shared module-level service instance used for dependency injection.
agent_manager = AgentManager()
if __name__ == "__main__":

    async def test_agent_manager():
        """Manual smoke test: initialize, start the first agent, shut down."""
        manager = AgentManagerService()
        await manager.initialize()

        # Show which agents ended up in the cache
        loaded = list(manager.agents.values())
        names = [entry.name for entry in loaded]
        print(f"π Agents loaded: {names}")

        # Start the first agent, if there is one
        if loaded:
            first = loaded[0]
            started = await manager.start_agent(first.id)
            marker = 'β ' if started else 'β'
            print(f"π Start agent {first.name}: {marker}")

        await manager.shutdown_all_agents()

    asyncio.run(test_agent_manager())