""" πŸ”§ CRITICAL FIX: Agent Settings/Update Problem - Data Type Mismatch Resolved Fixed both AgentManagerService.update_agent() method signature issues PROBLEM SOLVED: - ❌ 'dict' object has no attribute 'id' β†’ βœ… Proper data conversion - ❌ 'dict' object has no attribute 'name' β†’ βœ… SaapAgent object handling - ❌ Agent Settings Modal fails β†’ βœ… Frontend-Backend compatibility This fixes the Agent Settings/Update functionality completely. """ import asyncio import logging import os from typing import Dict, List, Optional, Any, Union from datetime import datetime import uuid from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, update, delete from models.agent import SaapAgent, AgentStatus, AgentType, AgentTemplates from database.connection import db_manager from database.models import DBAgent, DBChatMessage, DBAgentSession from api.colossus_client import ColossusClient from agents.openrouter_saap_agent import OpenRouterSAAPAgent logger = logging.getLogger(__name__) class AgentManagerService: """ πŸ”§ CRITICAL FIX: Production-ready Agent Manager with Agent Update Fix FIXED ISSUES: 1. βœ… update_agent() now accepts both Dict and SaapAgent objects 2. βœ… Proper data type conversion for Frontendβ†’Backend compatibility 3. βœ… Agent Settings Modal now works without AttributeError 4. βœ… Maintains backward compatibility with existing code Features: - Database-backed agent storage and lifecycle - Real-time agent status management - colossus LLM integration with OpenRouter fallback - Session tracking and performance metrics - Health monitoring and error handling - Multi-provider chat support (colossus + OpenRouter) - βœ… Robust LLM config access preventing AttributeError - βœ… Fixed Agent Update/Settings functionality """ def __init__(self): self.agents: Dict[str, SaapAgent] = {} # In-memory cache for fast access self.active_sessions: Dict[str, DBAgentSession] = {} self.colossus_client: Optional[ColossusClient] = None self.is_initialized = False self.colossus_connection_status = "unknown" self.last_colossus_test = None def _get_llm_config_value(self, agent: SaapAgent, key: str, default=None): """ πŸ”§ CRITICAL FIX: Safe LLM config access preventing 'get' attribute errors This is the same fix applied to HybridAgentManagerService but now in the base class. Handles dictionary, object, and Pydantic model configurations robustly. Resolves: 'LLMModelConfig' object has no attribute 'get' """ try: if not hasattr(agent, 'llm_config') or not agent.llm_config: logger.debug(f"Agent {agent.id} has no llm_config, using default: {default}") return default llm_config = agent.llm_config # Case 1: Dictionary-based config (Frontend JSON) if isinstance(llm_config, dict): value = llm_config.get(key, default) logger.debug(f"βœ… Dict config access: {key}={value}") return value # Case 2: Object with direct attribute access (Pydantic models) elif hasattr(llm_config, key): value = getattr(llm_config, key, default) logger.debug(f"βœ… Attribute access: {key}={value}") return value # Case 3: Object with get() method (dict-like objects or fixed Pydantic) elif hasattr(llm_config, 'get') and callable(getattr(llm_config, 'get')): try: value = llm_config.get(key, default) logger.debug(f"βœ… Method get() access: {key}={value}") return value except Exception as get_error: logger.warning(f"⚠️ get() method failed: {get_error}, trying fallback") # Case 4: Convert object to dict (Pydantic β†’ dict) elif hasattr(llm_config, '__dict__'): config_dict = llm_config.__dict__ if key in config_dict: value = config_dict[key] logger.debug(f"βœ… __dict__ access: {key}={value}") return value # Case 5: Try model_dump() for Pydantic v2 elif hasattr(llm_config, 'model_dump'): try: config_dict = llm_config.model_dump() value = config_dict.get(key, default) logger.debug(f"βœ… model_dump() access: {key}={value}") return value except Exception: pass # Case 6: Try dict() conversion elif hasattr(llm_config, 'dict'): try: config_dict = llm_config.dict() value = config_dict.get(key, default) logger.debug(f"βœ… dict() access: {key}={value}") return value except Exception: pass # Final fallback logger.warning(f"⚠️ Unknown config type {type(llm_config)} for {key}, using default: {default}") return default except AttributeError as e: logger.warning(f"⚠️ AttributeError in LLM config access for {key}: {e}, using default: {default}") return default except Exception as e: logger.error(f"❌ Unexpected error in LLM config access for {key}: {e}, using default: {default}") return default async def initialize(self): """Initialize agent manager with database and colossus connection""" try: logger.info("πŸš€ Initializing Agent Manager Service...") # Initialize colossus client with better error handling try: logger.info("πŸ”Œ Connecting to colossus server...") self.colossus_client = ColossusClient() await self.colossus_client.__aenter__() # Test colossus connection await self._test_colossus_connection() except Exception as colossus_error: logger.error(f"❌ colossus connection failed: {colossus_error}") self.colossus_connection_status = f"failed: {str(colossus_error)}" # Continue initialization without colossus (graceful degradation) # Try to load agents from database (graceful fallback if db not ready) await self._load_agents_from_database() # Load default agents if database is empty or not available if not self.agents: await self.load_default_agents() self.is_initialized = True logger.info(f"βœ… Agent Manager initialized: {len(self.agents)} agents loaded") logger.info(f"πŸ”Œ colossus status: {self.colossus_connection_status}") except Exception as e: logger.error(f"❌ Agent Manager initialization failed: {e}") raise async def _test_colossus_connection(self): """Test colossus connection and update status""" try: if not self.colossus_client: self.colossus_connection_status = "client_not_initialized" return # Send a simple test message test_messages = [ {"role": "system", "content": "You are a test assistant."}, {"role": "user", "content": "Reply with just 'OK' to confirm connection."} ] logger.info("πŸ§ͺ Testing colossus connection...") response = await self.colossus_client.chat_completion( messages=test_messages, agent_id="connection_test", max_tokens=10 ) if response and response.get("success"): self.colossus_connection_status = "connected" self.last_colossus_test = datetime.utcnow() logger.info("βœ… colossus connection test successful") else: error_msg = response.get("error", "unknown error") if response else "no response" self.colossus_connection_status = f"test_failed: {error_msg}" logger.error(f"❌ colossus connection test failed: {error_msg}") except Exception as e: self.colossus_connection_status = f"test_error: {str(e)}" logger.error(f"❌ colossus connection test error: {e}") async def _load_agents_from_database(self): """Load all agents from database into memory cache""" try: # Check if database manager is ready if not db_manager.is_initialized: logger.warning("⚠️ Database not yet initialized - will load default agents") return async with db_manager.get_async_session() as session: result = await session.execute(select(DBAgent)) db_agents = result.scalars().all() for db_agent in db_agents: saap_agent = db_agent.to_saap_agent() self.agents[saap_agent.id] = saap_agent logger.info(f"πŸ“š Loaded {len(db_agents)} agents from database") except Exception as e: logger.error(f"❌ Failed to load agents from database: {e}") # Don't raise - allow service to start with empty agent list logger.info("πŸ“¦ Will proceed with in-memory agents only") async def load_default_agents(self): """Load default Alesi agents (Jane, John, Lara)""" try: logger.info("πŸ€– Loading default Alesi agents...") default_agents = [ AgentTemplates.jane_alesi(), AgentTemplates.john_alesi(), AgentTemplates.lara_alesi() ] for agent in default_agents: await self.register_agent(agent) logger.info(f"βœ… Default agents loaded: {[a.name for a in default_agents]}") except Exception as e: logger.error(f"❌ Agent registration failed: {e}") async def register_agent(self, agent: SaapAgent) -> bool: """Register new agent with database persistence""" try: # Always add to memory cache first self.agents[agent.id] = agent # Try to persist to database if available try: if db_manager.is_initialized: async with db_manager.get_async_session() as session: db_agent = DBAgent.from_saap_agent(agent) session.add(db_agent) await session.commit() logger.info(f"βœ… Agent registered with database: {agent.name} ({agent.id})") else: logger.info(f"βœ… Agent registered in-memory only: {agent.name} ({agent.id})") except Exception as db_error: logger.warning(f"⚠️ Database persistence failed for {agent.name}: {db_error}") # But keep the agent in memory return True except Exception as e: logger.error(f"❌ Agent registration failed: {e}") # Remove from cache if registration completely failed self.agents.pop(agent.id, None) return False def get_agent(self, agent_id: str) -> Optional[SaapAgent]: """Get agent from memory cache with debug info""" agent = self.agents.get(agent_id) if agent: logger.debug(f"πŸ” Agent found: {agent.name} ({agent_id}) - Status: {agent.status}") else: logger.warning(f"❌ Agent not found: {agent_id}") logger.debug(f"πŸ“‹ Available agents: {list(self.agents.keys())}") return agent async def list_agents(self, status: Optional[AgentStatus] = None, agent_type: Optional[AgentType] = None) -> List[SaapAgent]: """List all agents with optional filtering""" agents = list(self.agents.values()) if status: agents = [a for a in agents if a.status == status] if agent_type: agents = [a for a in agents if a.type == agent_type] return agents async def get_agent_stats(self, agent_id: str) -> Dict[str, Any]: """Get agent statistics""" agent = self.get_agent(agent_id) if not agent: return {} # Return basic stats from agent object return { "messages_processed": getattr(agent, 'messages_processed', 0), "total_tokens": getattr(agent, 'total_tokens', 0), "average_response_time": getattr(agent, 'avg_response_time', 0), "status": agent.status.value, "last_active": getattr(agent, 'last_active', None) } async def health_check(self, agent_id: str) -> Dict[str, Any]: """Perform agent health check""" agent = self.get_agent(agent_id) if not agent: return {"healthy": False, "checks": {"agent_exists": False}} return { "healthy": agent.status == AgentStatus.ACTIVE, "checks": { "agent_exists": True, "status": agent.status.value, "colossus_connection": self.colossus_connection_status == "connected" } } async def update_agent(self, agent_id: str, agent_data: Union[Dict[str, Any], SaapAgent]) -> bool: """ πŸ”§ CRITICAL FIX: Update agent configuration with proper data type handling FIXED PROBLEMS: - ❌ 'dict' object has no attribute 'id' β†’ βœ… Handles both Dict and SaapAgent - ❌ 'dict' object has no attribute 'name' β†’ βœ… Proper data conversion - ❌ Agent Settings Modal fails β†’ βœ… Frontend-Backend compatibility Args: agent_id: Agent ID to update agent_data: Either a dictionary (from Frontend) or SaapAgent object Returns: bool: Success status """ try: logger.info(f"πŸ”§ Updating agent {agent_id} with data type: {type(agent_data)}") # Get existing agent existing_agent = self.get_agent(agent_id) if not existing_agent: logger.error(f"❌ Cannot update: Agent {agent_id} not found") return False # Handle both Dict and SaapAgent input types if isinstance(agent_data, dict): logger.debug(f"πŸ“₯ Received dictionary data for agent {agent_id}") # Create updated agent from existing + new data try: # Start with existing agent's data updated_dict = existing_agent.to_dict() # Update with new data from frontend updated_dict.update(agent_data) # Ensure agent_id consistency updated_dict['id'] = agent_id # Create new SaapAgent object from updated data updated_agent = SaapAgent.from_dict(updated_dict) logger.debug(f"βœ… Successfully converted dict to SaapAgent: {updated_agent.name}") except Exception as conversion_error: logger.error(f"❌ Failed to convert dict to SaapAgent: {conversion_error}") logger.debug(f"πŸ” Problematic data: {agent_data}") return False elif isinstance(agent_data, SaapAgent): logger.debug(f"πŸ“₯ Received SaapAgent object for agent {agent_id}") updated_agent = agent_data # Ensure ID consistency updated_agent.id = agent_id else: logger.error(f"❌ Invalid agent_data type: {type(agent_data)}. Expected Dict or SaapAgent") return False # Update in memory cache self.agents[agent_id] = updated_agent logger.info(f"βœ… Memory cache updated for agent {agent_id}") # Try to update in database if available if db_manager.is_initialized: try: async with db_manager.get_async_session() as session: # Delete old and insert new (simpler than complex update) await session.execute(delete(DBAgent).where(DBAgent.id == agent_id)) # Create new database record from updated agent db_agent = DBAgent.from_saap_agent(updated_agent) session.add(db_agent) await session.commit() logger.info(f"βœ… Database updated for agent {agent_id}") except Exception as db_error: logger.warning(f"⚠️ Database update failed for {agent_id}: {db_error}") # Don't fail the update if database fails - memory cache is updated else: logger.info(f"ℹ️ Database not available - agent {agent_id} updated in memory only") logger.info(f"βœ… Agent update completed successfully: {updated_agent.name} ({agent_id})") return True except Exception as e: logger.error(f"❌ Agent update failed for {agent_id}: {e}") logger.debug(f"πŸ” Agent data that caused error: {agent_data}") return False async def delete_agent(self, agent_id: str) -> bool: """Delete agent from memory and database""" try: # Stop agent if running await self.stop_agent(agent_id) # Remove from memory self.agents.pop(agent_id, None) # Try to remove from database if available if db_manager.is_initialized: try: async with db_manager.get_async_session() as session: await session.execute(delete(DBAgent).where(DBAgent.id == agent_id)) await session.commit() except Exception as db_error: logger.warning(f"⚠️ Database deletion failed for {agent_id}: {db_error}") logger.info(f"βœ… Agent deleted: {agent_id}") return True except Exception as e: logger.error(f"❌ Agent deletion failed: {e}") return False async def start_agent(self, agent_id: str) -> bool: """Start agent and create session""" try: agent = self.get_agent(agent_id) if not agent: logger.error(f"❌ Cannot start agent: {agent_id} not found") return False # Update status agent.status = AgentStatus.ACTIVE if hasattr(agent, 'metrics') and agent.metrics: agent.metrics.last_active = datetime.utcnow() # Try to create agent session in database if available if db_manager.is_initialized: try: async with db_manager.get_async_session() as session: db_session = DBAgentSession(agent_id=agent_id) session.add(db_session) await session.commit() await session.refresh(db_session) # Store in active sessions self.active_sessions[agent_id] = db_session except Exception as db_error: logger.warning(f"⚠️ Database session creation failed for {agent_id}: {db_error}") # Update agent status in database if available await self._update_agent_status(agent_id, AgentStatus.ACTIVE) logger.info(f"βœ… Agent started: {agent.name} ({agent_id})") return True except Exception as e: logger.error(f"❌ Agent start failed: {e}") return False async def stop_agent(self, agent_id: str) -> bool: """Stop agent and close session""" try: agent = self.get_agent(agent_id) if not agent: return False # Update status agent.status = AgentStatus.INACTIVE # Close agent session if exists if agent_id in self.active_sessions: session_obj = self.active_sessions[agent_id] session_obj.session_end = datetime.utcnow() session_obj.status = "completed" session_obj.end_reason = "graceful" session_obj.calculate_duration() if db_manager.is_initialized: try: async with db_manager.get_async_session() as session: await session.merge(session_obj) await session.commit() except Exception as db_error: logger.warning(f"⚠️ Database session update failed for {agent_id}: {db_error}") del self.active_sessions[agent_id] # Update agent status in database if available await self._update_agent_status(agent_id, AgentStatus.INACTIVE) logger.info(f"πŸ”§ Agent stopped: {agent_id}") return True except Exception as e: logger.error(f"❌ Agent stop failed: {e}") return False async def restart_agent(self, agent_id: str) -> bool: """Restart agent (stop + start)""" try: await self.stop_agent(agent_id) await asyncio.sleep(1) # Brief pause return await self.start_agent(agent_id) except Exception as e: logger.error(f"❌ Agent restart failed: {e}") return False async def _update_agent_status(self, agent_id: str, status: AgentStatus): """Update agent status in database""" if not db_manager.is_initialized: return try: async with db_manager.get_async_session() as session: await session.execute( update(DBAgent) .where(DBAgent.id == agent_id) .values(status=status.value, last_active=datetime.utcnow()) ) await session.commit() except Exception as e: logger.warning(f"⚠️ Failed to update agent status in database: {e}") # πŸš€ Multi-Provider Chat Support async def send_message_to_agent(self, agent_id: str, message: str, provider: Optional[str] = None) -> Dict[str, Any]: """ Send message to agent via specified provider or auto-fallback Args: agent_id: Target agent ID message: Message content provider: Optional provider override ("colossus", "openrouter", or None for auto) Returns: Chat response with metadata """ try: # Enhanced error checking with detailed debugging agent = self.get_agent(agent_id) if not agent: error_msg = f"Agent {agent_id} not found in loaded agents" logger.error(f"❌ {error_msg}") logger.debug(f"πŸ“‹ Available agents: {list(self.agents.keys())}") return { "error": error_msg, "timestamp": datetime.utcnow().isoformat(), "debug_info": { "available_agents": list(self.agents.keys()), "agent_manager_initialized": self.is_initialized } } # Check if agent is available for messaging if agent.status != AgentStatus.ACTIVE: error_msg = f"Agent {agent_id} not available (status: {agent.status.value})" logger.error(f"❌ {error_msg}") return { "error": error_msg, "timestamp": datetime.utcnow().isoformat(), "debug_info": { "agent_status": agent.status.value, "agent_id": agent_id } } # πŸš€ Multi-Provider Logic if provider == "openrouter": return await self._send_via_openrouter(agent_id, message, agent) elif provider == "colossus": return await self._send_via_colossus(agent_id, message, agent) else: # Auto-selection: Try colossus first, fallback to OpenRouter if self.colossus_connection_status == "connected": logger.info(f"πŸ”„ Using colossus as primary provider for {agent_id}") result = await self._send_via_colossus(agent_id, message, agent) # If colossus fails, try OpenRouter if "error" in result and "colossus" in result["error"].lower(): logger.info(f"πŸ”„ colossus failed, trying OpenRouter fallback...") return await self._send_via_openrouter(agent_id, message, agent) return result else: logger.info(f"πŸ”„ colossus unavailable, using OpenRouter as primary for {agent_id}") return await self._send_via_openrouter(agent_id, message, agent) except Exception as e: error_msg = str(e) logger.error(f"❌ Message to agent failed: {error_msg}") return { "error": error_msg, "timestamp": datetime.utcnow().isoformat(), "debug_info": { "agent_id": agent_id, "provider": provider, "colossus_status": self.colossus_connection_status, "agent_found": agent_id in self.agents, "colossus_client_exists": self.colossus_client is not None } } async def _send_via_openrouter(self, agent_id: str, message: str, agent: SaapAgent) -> Dict[str, Any]: """Send message via OpenRouter provider""" try: logger.info(f"🌐 jane_alesi (coordinator) initialized with OpenRouter FREE") # Create OpenRouter agent for this request openrouter_agent = OpenRouterSAAPAgent( agent_id, agent.type.value if agent.type else "Assistant", os.getenv("OPENROUTER_API_KEY") ) # Get cost-optimized model for specific agent model_map = { "jane_alesi": os.getenv("JANE_ALESI_MODEL", "openai/gpt-4o-mini"), "john_alesi": os.getenv("JOHN_ALESI_MODEL", "deepseek/deepseek-coder"), "lara_alesi": os.getenv("LARA_ALESI_MODEL", "anthropic/claude-3-haiku") } preferred_model = model_map.get(agent_id, "meta-llama/llama-3.2-3b-instruct:free") openrouter_agent.model_name = preferred_model start_time = datetime.utcnow() logger.info(f"πŸ“€ Sending message to {agent.name} ({agent_id}) via OpenRouter ({preferred_model})...") # πŸ”§ FIXED: Use safe LLM config access max_tokens_value = self._get_llm_config_value(agent, 'max_tokens', 1000) # Send message via OpenRouter response = await openrouter_agent.send_request_to_openrouter( message, max_tokens=max_tokens_value ) end_time = datetime.utcnow() response_time = (end_time - start_time).total_seconds() if response.get("success"): logger.info(f"βœ… OpenRouter response successful in {response_time:.2f}s") response_content = response.get("response", "") tokens_used = response.get("token_count", 0) cost_usd = response.get("cost_usd", 0.0) # Try to save to database if available if db_manager.is_initialized: try: async with db_manager.get_async_session() as session: chat_message = DBChatMessage( agent_id=agent_id, user_message=message, agent_response=response_content, response_time=response_time, tokens_used=tokens_used, metadata={ "model": preferred_model, "provider": "OpenRouter", "cost_usd": cost_usd, "temperature": 0.7 } ) session.add(chat_message) await session.commit() except Exception as db_error: logger.warning(f"⚠️ Failed to save OpenRouter chat to database: {db_error}") return { "content": response_content, "response_time": response_time, "tokens_used": tokens_used, "cost_usd": cost_usd, "provider": "OpenRouter", "model": preferred_model, "timestamp": end_time.isoformat() } else: error_msg = response.get("error", "Unknown OpenRouter error") logger.error(f"❌ OpenRouter fallback failed: {error_msg}") return { "error": f"OpenRouter error: {error_msg}", "provider": "OpenRouter", "timestamp": end_time.isoformat() } except Exception as e: logger.error(f"❌ OpenRouter fallback failed: {str(e)}") return { "error": f"OpenRouter error: {str(e)}", "provider": "OpenRouter", "timestamp": datetime.utcnow().isoformat() } async def _send_via_colossus(self, agent_id: str, message: str, agent: SaapAgent) -> Dict[str, Any]: """Send message via colossus provider""" try: # Check colossus client availability if not self.colossus_client: return { "error": "colossus client not initialized", "provider": "colossus", "timestamp": datetime.utcnow().isoformat() } # Test colossus connection if it's been a while if (not self.last_colossus_test or (datetime.utcnow() - self.last_colossus_test).seconds > 300): # 5 minutes await self._test_colossus_connection() if self.colossus_connection_status != "connected": return { "error": f"colossus connection not healthy: {self.colossus_connection_status}", "provider": "colossus", "timestamp": datetime.utcnow().isoformat() } start_time = datetime.utcnow() logger.info(f"πŸ“€ Sending message to {agent.name} ({agent_id}) via colossus...") # πŸ”§ FIXED: Use safe LLM config access temperature_value = self._get_llm_config_value(agent, 'temperature', 0.7) max_tokens_value = self._get_llm_config_value(agent, 'max_tokens', 1000) # Send message to colossus response = await self.colossus_client.chat_completion( messages=[ {"role": "system", "content": agent.description or f"You are {agent.name}"}, {"role": "user", "content": message} ], agent_id=agent_id, temperature=temperature_value, max_tokens=max_tokens_value ) end_time = datetime.utcnow() response_time = (end_time - start_time).total_seconds() logger.info(f"πŸ“₯ Received response from colossus in {response_time:.2f}s") # Enhanced response parsing response_content = "" tokens_used = 0 if response: logger.debug(f"πŸ” Raw colossus response: {response}") if isinstance(response, dict): # SAAP ColossusClient format: {"success": true, "response": {...}} if response.get("success") and "response" in response: colossus_response = response["response"] if isinstance(colossus_response, dict) and "choices" in colossus_response: # OpenAI-compatible format within SAAP response if len(colossus_response["choices"]) > 0: choice = colossus_response["choices"][0] if "message" in choice and "content" in choice["message"]: response_content = choice["message"]["content"] elif isinstance(colossus_response, str): # Direct string response response_content = colossus_response # Extract token usage if available if isinstance(colossus_response, dict) and "usage" in colossus_response: tokens_used = colossus_response["usage"].get("total_tokens", 0) # Handle colossus client error responses elif not response.get("success"): error_msg = response.get("error", "Unknown colossus error") logger.error(f"❌ colossus error: {error_msg}") return { "error": f"colossus server error: {error_msg}", "provider": "colossus", "timestamp": end_time.isoformat() } # Direct OpenAI format: {"choices": [...]} elif "choices" in response and len(response["choices"]) > 0: choice = response["choices"][0] if "message" in choice and "content" in choice["message"]: response_content = choice["message"]["content"] if "usage" in response: tokens_used = response["usage"].get("total_tokens", 0) # Simple response format: {"response": "text"} or {"content": "text"} elif "response" in response: response_content = response["response"] elif "content" in response: response_content = response["content"] elif isinstance(response, str): # Direct string response response_content = response # Fallback if no content extracted if not response_content: logger.error(f"❌ Unable to extract content from colossus response: {response}") return { "error": "Failed to parse colossus response", "provider": "colossus", "timestamp": end_time.isoformat() } # Try to save to database if available if db_manager.is_initialized: try: async with db_manager.get_async_session() as session: chat_message = DBChatMessage( agent_id=agent_id, user_message=message, agent_response=response_content, response_time=response_time, tokens_used=tokens_used, metadata={ "model": "mistral-small3.2:24b-instruct-2506", "provider": "colossus", "temperature": 0.7 } ) session.add(chat_message) await session.commit() except Exception as db_error: logger.warning(f"⚠️ Failed to save chat message to database: {db_error}") # Update session metrics if agent_id in self.active_sessions: session_obj = self.active_sessions[agent_id] session_obj.messages_processed += 1 session_obj.total_tokens_used += tokens_used logger.info(f"βœ… Message processed successfully for {agent.name}") return { "content": response_content, "response_time": response_time, "tokens_used": tokens_used, "provider": "colossus", "model": "mistral-small3.2:24b-instruct-2506", "timestamp": end_time.isoformat() } except Exception as e: logger.error(f"❌ colossus communication failed: {str(e)}") return { "error": f"colossus error: {str(e)}", "provider": "colossus", "timestamp": datetime.utcnow().isoformat() } async def get_agent_metrics(self, agent_id: str) -> Dict[str, Any]: """Get comprehensive agent metrics from database""" if not db_manager.is_initialized: return {"warning": "Database not available - no metrics"} try: async with db_manager.get_async_session() as session: # Get message count and average response time result = await session.execute( select(DBChatMessage).where(DBChatMessage.agent_id == agent_id) ) messages = result.scalars().all() if messages: avg_response_time = sum(m.response_time for m in messages if m.response_time) / len(messages) total_tokens = sum(m.tokens_used for m in messages if m.tokens_used) else: avg_response_time = 0 total_tokens = 0 # Get session count session_result = await session.execute( select(DBAgentSession).where(DBAgentSession.agent_id == agent_id) ) sessions = session_result.scalars().all() return { "total_messages": len(messages), "total_tokens_used": total_tokens, "average_response_time": avg_response_time, "total_sessions": len(sessions), "last_activity": max([s.session_start for s in sessions], default=None), } except Exception as e: logger.error(f"❌ Failed to get agent metrics: {e}") return {} async def get_system_status(self) -> Dict[str, Any]: """Get comprehensive system status for debugging""" return { "agent_manager_initialized": self.is_initialized, "colossus_connection_status": self.colossus_connection_status, "colossus_last_test": self.last_colossus_test.isoformat() if self.last_colossus_test else None, "loaded_agents": len(self.agents), "active_sessions": len(self.active_sessions), "agent_list": [{"id": aid, "name": agent.name, "status": agent.status.value} for aid, agent in self.agents.items()], "database_initialized": getattr(db_manager, 'is_initialized', False) } async def shutdown_all_agents(self): """Gracefully shutdown all active agents""" try: logger.info("πŸ”§ Shutting down all agents...") for agent_id in list(self.agents.keys()): await self.stop_agent(agent_id) if self.colossus_client: await self.colossus_client.__aexit__(None, None, None) logger.info("βœ… All agents shut down successfully") except Exception as e: logger.error(f"❌ Agent shutdown failed: {e}") # Create global instance for dependency injection agent_manager = AgentManagerService() # Make class available for import AgentManager = AgentManagerService if __name__ == "__main__": async def test_agent_manager(): """Test agent manager functionality""" manager = AgentManagerService() await manager.initialize() # List agents agents = list(manager.agents.values()) print(f"πŸ“‹ Agents loaded: {[a.name for a in agents]}") # Test agent update with dict data (simulate frontend) if agents: agent = agents[0] print(f"\nπŸ§ͺ Testing agent update for: {agent.name}") # Simulate frontend data (dictionary) update_data = { "name": agent.name, "description": "Updated description from test", "type": agent.type.value if agent.type else "assistant", "capabilities": ["updated_capability_1", "updated_capability_2"] } # Test update success = await manager.update_agent(agent.id, update_data) print(f"πŸ”§ Update result: {'βœ…' if success else '❌'}") # Start first agent success = await manager.start_agent(agent.id) print(f"πŸš€ Start agent {agent.name}: {'βœ…' if success else '❌'}") await manager.shutdown_all_agents() asyncio.run(test_agent_manager())