Spaces:
Sleeping
Sleeping
| """ | |
| π§ CRITICAL FIX: Agent Settings/Update Problem - Data Type Mismatch Resolved | |
| Fixed both AgentManagerService.update_agent() method signature issues | |
| PROBLEM SOLVED: | |
| - β 'dict' object has no attribute 'id' β β Proper data conversion | |
| - β 'dict' object has no attribute 'name' β β SaapAgent object handling | |
| - β Agent Settings Modal fails β β Frontend-Backend compatibility | |
| This fixes the Agent Settings/Update functionality completely. | |
| """ | |
| import asyncio | |
| import logging | |
| import os | |
| from typing import Dict, List, Optional, Any, Union | |
| from datetime import datetime | |
| import uuid | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select, update, delete | |
| from models.agent import SaapAgent, AgentStatus, AgentType, AgentTemplates | |
| from database.connection import db_manager | |
| from database.models import DBAgent, DBChatMessage, DBAgentSession | |
| from api.colossus_client import ColossusClient | |
| from agents.openrouter_saap_agent import OpenRouterSAAPAgent | |
| logger = logging.getLogger(__name__) | |
| class AgentManagerService: | |
| """ | |
| π§ CRITICAL FIX: Production-ready Agent Manager with Agent Update Fix | |
| FIXED ISSUES: | |
| 1. β update_agent() now accepts both Dict and SaapAgent objects | |
| 2. β Proper data type conversion for FrontendβBackend compatibility | |
| 3. β Agent Settings Modal now works without AttributeError | |
| 4. β Maintains backward compatibility with existing code | |
| Features: | |
| - Database-backed agent storage and lifecycle | |
| - Real-time agent status management | |
| - colossus LLM integration with OpenRouter fallback | |
| - Session tracking and performance metrics | |
| - Health monitoring and error handling | |
| - Multi-provider chat support (colossus + OpenRouter) | |
| - β Robust LLM config access preventing AttributeError | |
| - β Fixed Agent Update/Settings functionality | |
| """ | |
| def __init__(self): | |
| self.agents: Dict[str, SaapAgent] = {} # In-memory cache for fast access | |
| self.active_sessions: Dict[str, DBAgentSession] = {} | |
| self.colossus_client: Optional[ColossusClient] = None | |
| self.is_initialized = False | |
| self.colossus_connection_status = "unknown" | |
| self.last_colossus_test = None | |
| def _get_llm_config_value(self, agent: SaapAgent, key: str, default=None): | |
| """ | |
| π§ CRITICAL FIX: Safe LLM config access preventing 'get' attribute errors | |
| This is the same fix applied to HybridAgentManagerService but now in the base class. | |
| Handles dictionary, object, and Pydantic model configurations robustly. | |
| Resolves: 'LLMModelConfig' object has no attribute 'get' | |
| """ | |
| try: | |
| if not hasattr(agent, 'llm_config') or not agent.llm_config: | |
| logger.debug(f"Agent {agent.id} has no llm_config, using default: {default}") | |
| return default | |
| llm_config = agent.llm_config | |
| # Case 1: Dictionary-based config (Frontend JSON) | |
| if isinstance(llm_config, dict): | |
| value = llm_config.get(key, default) | |
| logger.debug(f"β Dict config access: {key}={value}") | |
| return value | |
| # Case 2: Object with direct attribute access (Pydantic models) | |
| elif hasattr(llm_config, key): | |
| value = getattr(llm_config, key, default) | |
| logger.debug(f"β Attribute access: {key}={value}") | |
| return value | |
| # Case 3: Object with get() method (dict-like objects or fixed Pydantic) | |
| elif hasattr(llm_config, 'get') and callable(getattr(llm_config, 'get')): | |
| try: | |
| value = llm_config.get(key, default) | |
| logger.debug(f"β Method get() access: {key}={value}") | |
| return value | |
| except Exception as get_error: | |
| logger.warning(f"β οΈ get() method failed: {get_error}, trying fallback") | |
| # Case 4: Convert object to dict (Pydantic β dict) | |
| elif hasattr(llm_config, '__dict__'): | |
| config_dict = llm_config.__dict__ | |
| if key in config_dict: | |
| value = config_dict[key] | |
| logger.debug(f"β __dict__ access: {key}={value}") | |
| return value | |
| # Case 5: Try model_dump() for Pydantic v2 | |
| elif hasattr(llm_config, 'model_dump'): | |
| try: | |
| config_dict = llm_config.model_dump() | |
| value = config_dict.get(key, default) | |
| logger.debug(f"β model_dump() access: {key}={value}") | |
| return value | |
| except Exception: | |
| pass | |
| # Case 6: Try dict() conversion | |
| elif hasattr(llm_config, 'dict'): | |
| try: | |
| config_dict = llm_config.dict() | |
| value = config_dict.get(key, default) | |
| logger.debug(f"β dict() access: {key}={value}") | |
| return value | |
| except Exception: | |
| pass | |
| # Final fallback | |
| logger.warning(f"β οΈ Unknown config type {type(llm_config)} for {key}, using default: {default}") | |
| return default | |
| except AttributeError as e: | |
| logger.warning(f"β οΈ AttributeError in LLM config access for {key}: {e}, using default: {default}") | |
| return default | |
| except Exception as e: | |
| logger.error(f"β Unexpected error in LLM config access for {key}: {e}, using default: {default}") | |
| return default | |
| async def initialize(self): | |
| """Initialize agent manager with database and colossus connection""" | |
| try: | |
| logger.info("π Initializing Agent Manager Service...") | |
| # Initialize colossus client with better error handling | |
| try: | |
| logger.info("π Connecting to colossus server...") | |
| self.colossus_client = ColossusClient() | |
| await self.colossus_client.__aenter__() | |
| # Test colossus connection | |
| await self._test_colossus_connection() | |
| except Exception as colossus_error: | |
| logger.error(f"β colossus connection failed: {colossus_error}") | |
| self.colossus_connection_status = f"failed: {str(colossus_error)}" | |
| # Continue initialization without colossus (graceful degradation) | |
| # Try to load agents from database (graceful fallback if db not ready) | |
| await self._load_agents_from_database() | |
| # Load default agents if database is empty or not available | |
| if not self.agents: | |
| await self.load_default_agents() | |
| self.is_initialized = True | |
| logger.info(f"β Agent Manager initialized: {len(self.agents)} agents loaded") | |
| logger.info(f"π colossus status: {self.colossus_connection_status}") | |
| except Exception as e: | |
| logger.error(f"β Agent Manager initialization failed: {e}") | |
| raise | |
| async def _test_colossus_connection(self): | |
| """Test colossus connection and update status""" | |
| try: | |
| if not self.colossus_client: | |
| self.colossus_connection_status = "client_not_initialized" | |
| return | |
| # Send a simple test message | |
| test_messages = [ | |
| {"role": "system", "content": "You are a test assistant."}, | |
| {"role": "user", "content": "Reply with just 'OK' to confirm connection."} | |
| ] | |
| logger.info("π§ͺ Testing colossus connection...") | |
| response = await self.colossus_client.chat_completion( | |
| messages=test_messages, | |
| agent_id="connection_test", | |
| max_tokens=10 | |
| ) | |
| if response and response.get("success"): | |
| self.colossus_connection_status = "connected" | |
| self.last_colossus_test = datetime.utcnow() | |
| logger.info("β colossus connection test successful") | |
| else: | |
| error_msg = response.get("error", "unknown error") if response else "no response" | |
| self.colossus_connection_status = f"test_failed: {error_msg}" | |
| logger.error(f"β colossus connection test failed: {error_msg}") | |
| except Exception as e: | |
| self.colossus_connection_status = f"test_error: {str(e)}" | |
| logger.error(f"β colossus connection test error: {e}") | |
| async def _load_agents_from_database(self): | |
| """Load all agents from database into memory cache""" | |
| try: | |
| # Check if database manager is ready | |
| if not db_manager.is_initialized: | |
| logger.warning("β οΈ Database not yet initialized - will load default agents") | |
| return | |
| async with db_manager.get_async_session() as session: | |
| result = await session.execute(select(DBAgent)) | |
| db_agents = result.scalars().all() | |
| for db_agent in db_agents: | |
| saap_agent = db_agent.to_saap_agent() | |
| self.agents[saap_agent.id] = saap_agent | |
| logger.info(f"π Loaded {len(db_agents)} agents from database") | |
| except Exception as e: | |
| logger.error(f"β Failed to load agents from database: {e}") | |
| # Don't raise - allow service to start with empty agent list | |
| logger.info("π¦ Will proceed with in-memory agents only") | |
| async def load_default_agents(self): | |
| """Load default Alesi agents (Jane, John, Lara)""" | |
| try: | |
| logger.info("π€ Loading default Alesi agents...") | |
| default_agents = [ | |
| AgentTemplates.jane_alesi(), | |
| AgentTemplates.john_alesi(), | |
| AgentTemplates.lara_alesi() | |
| ] | |
| for agent in default_agents: | |
| await self.register_agent(agent) | |
| logger.info(f"β Default agents loaded: {[a.name for a in default_agents]}") | |
| except Exception as e: | |
| logger.error(f"β Agent registration failed: {e}") | |
| async def register_agent(self, agent: SaapAgent) -> bool: | |
| """Register new agent with database persistence""" | |
| try: | |
| # Always add to memory cache first | |
| self.agents[agent.id] = agent | |
| # Try to persist to database if available | |
| try: | |
| if db_manager.is_initialized: | |
| async with db_manager.get_async_session() as session: | |
| db_agent = DBAgent.from_saap_agent(agent) | |
| session.add(db_agent) | |
| await session.commit() | |
| logger.info(f"β Agent registered with database: {agent.name} ({agent.id})") | |
| else: | |
| logger.info(f"β Agent registered in-memory only: {agent.name} ({agent.id})") | |
| except Exception as db_error: | |
| logger.warning(f"β οΈ Database persistence failed for {agent.name}: {db_error}") | |
| # But keep the agent in memory | |
| return True | |
| except Exception as e: | |
| logger.error(f"β Agent registration failed: {e}") | |
| # Remove from cache if registration completely failed | |
| self.agents.pop(agent.id, None) | |
| return False | |
| def get_agent(self, agent_id: str) -> Optional[SaapAgent]: | |
| """Get agent from memory cache with debug info""" | |
| agent = self.agents.get(agent_id) | |
| if agent: | |
| logger.debug(f"π Agent found: {agent.name} ({agent_id}) - Status: {agent.status}") | |
| else: | |
| logger.warning(f"β Agent not found: {agent_id}") | |
| logger.debug(f"π Available agents: {list(self.agents.keys())}") | |
| return agent | |
| async def list_agents(self, status: Optional[AgentStatus] = None, | |
| agent_type: Optional[AgentType] = None) -> List[SaapAgent]: | |
| """List all agents with optional filtering""" | |
| agents = list(self.agents.values()) | |
| if status: | |
| agents = [a for a in agents if a.status == status] | |
| if agent_type: | |
| agents = [a for a in agents if a.type == agent_type] | |
| return agents | |
| async def get_agent_stats(self, agent_id: str) -> Dict[str, Any]: | |
| """Get agent statistics""" | |
| agent = self.get_agent(agent_id) | |
| if not agent: | |
| return {} | |
| # Return basic stats from agent object | |
| return { | |
| "messages_processed": getattr(agent, 'messages_processed', 0), | |
| "total_tokens": getattr(agent, 'total_tokens', 0), | |
| "average_response_time": getattr(agent, 'avg_response_time', 0), | |
| "status": agent.status.value, | |
| "last_active": getattr(agent, 'last_active', None) | |
| } | |
| async def health_check(self, agent_id: str) -> Dict[str, Any]: | |
| """Perform agent health check""" | |
| agent = self.get_agent(agent_id) | |
| if not agent: | |
| return {"healthy": False, "checks": {"agent_exists": False}} | |
| return { | |
| "healthy": agent.status == AgentStatus.ACTIVE, | |
| "checks": { | |
| "agent_exists": True, | |
| "status": agent.status.value, | |
| "colossus_connection": self.colossus_connection_status == "connected" | |
| } | |
| } | |
| async def update_agent(self, agent_id: str, agent_data: Union[Dict[str, Any], SaapAgent]) -> bool: | |
| """ | |
| π§ CRITICAL FIX: Update agent configuration with proper data type handling | |
| FIXED PROBLEMS: | |
| - β 'dict' object has no attribute 'id' β β Handles both Dict and SaapAgent | |
| - β 'dict' object has no attribute 'name' β β Proper data conversion | |
| - β Agent Settings Modal fails β β Frontend-Backend compatibility | |
| Args: | |
| agent_id: Agent ID to update | |
| agent_data: Either a dictionary (from Frontend) or SaapAgent object | |
| Returns: | |
| bool: Success status | |
| """ | |
| try: | |
| logger.info(f"π§ Updating agent {agent_id} with data type: {type(agent_data)}") | |
| # Get existing agent | |
| existing_agent = self.get_agent(agent_id) | |
| if not existing_agent: | |
| logger.error(f"β Cannot update: Agent {agent_id} not found") | |
| return False | |
| # Handle both Dict and SaapAgent input types | |
| if isinstance(agent_data, dict): | |
| logger.debug(f"π₯ Received dictionary data for agent {agent_id}") | |
| # Create updated agent from existing + new data | |
| try: | |
| # Start with existing agent's data | |
| updated_dict = existing_agent.to_dict() | |
| # Update with new data from frontend | |
| updated_dict.update(agent_data) | |
| # Ensure agent_id consistency | |
| updated_dict['id'] = agent_id | |
| # Create new SaapAgent object from updated data | |
| updated_agent = SaapAgent.from_dict(updated_dict) | |
| logger.debug(f"β Successfully converted dict to SaapAgent: {updated_agent.name}") | |
| except Exception as conversion_error: | |
| logger.error(f"β Failed to convert dict to SaapAgent: {conversion_error}") | |
| logger.debug(f"π Problematic data: {agent_data}") | |
| return False | |
| elif isinstance(agent_data, SaapAgent): | |
| logger.debug(f"π₯ Received SaapAgent object for agent {agent_id}") | |
| updated_agent = agent_data | |
| # Ensure ID consistency | |
| updated_agent.id = agent_id | |
| else: | |
| logger.error(f"β Invalid agent_data type: {type(agent_data)}. Expected Dict or SaapAgent") | |
| return False | |
| # Update in memory cache | |
| self.agents[agent_id] = updated_agent | |
| logger.info(f"β Memory cache updated for agent {agent_id}") | |
| # Try to update in database if available | |
| if db_manager.is_initialized: | |
| try: | |
| async with db_manager.get_async_session() as session: | |
| # Delete old and insert new (simpler than complex update) | |
| await session.execute(delete(DBAgent).where(DBAgent.id == agent_id)) | |
| # Create new database record from updated agent | |
| db_agent = DBAgent.from_saap_agent(updated_agent) | |
| session.add(db_agent) | |
| await session.commit() | |
| logger.info(f"β Database updated for agent {agent_id}") | |
| except Exception as db_error: | |
| logger.warning(f"β οΈ Database update failed for {agent_id}: {db_error}") | |
| # Don't fail the update if database fails - memory cache is updated | |
| else: | |
| logger.info(f"βΉοΈ Database not available - agent {agent_id} updated in memory only") | |
| logger.info(f"β Agent update completed successfully: {updated_agent.name} ({agent_id})") | |
| return True | |
| except Exception as e: | |
| logger.error(f"β Agent update failed for {agent_id}: {e}") | |
| logger.debug(f"π Agent data that caused error: {agent_data}") | |
| return False | |
| async def delete_agent(self, agent_id: str) -> bool: | |
| """Delete agent from memory and database""" | |
| try: | |
| # Stop agent if running | |
| await self.stop_agent(agent_id) | |
| # Remove from memory | |
| self.agents.pop(agent_id, None) | |
| # Try to remove from database if available | |
| if db_manager.is_initialized: | |
| try: | |
| async with db_manager.get_async_session() as session: | |
| await session.execute(delete(DBAgent).where(DBAgent.id == agent_id)) | |
| await session.commit() | |
| except Exception as db_error: | |
| logger.warning(f"β οΈ Database deletion failed for {agent_id}: {db_error}") | |
| logger.info(f"β Agent deleted: {agent_id}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"β Agent deletion failed: {e}") | |
| return False | |
| async def start_agent(self, agent_id: str) -> bool: | |
| """Start agent and create session""" | |
| try: | |
| agent = self.get_agent(agent_id) | |
| if not agent: | |
| logger.error(f"β Cannot start agent: {agent_id} not found") | |
| return False | |
| # Update status | |
| agent.status = AgentStatus.ACTIVE | |
| if hasattr(agent, 'metrics') and agent.metrics: | |
| agent.metrics.last_active = datetime.utcnow() | |
| # Try to create agent session in database if available | |
| if db_manager.is_initialized: | |
| try: | |
| async with db_manager.get_async_session() as session: | |
| db_session = DBAgentSession(agent_id=agent_id) | |
| session.add(db_session) | |
| await session.commit() | |
| await session.refresh(db_session) | |
| # Store in active sessions | |
| self.active_sessions[agent_id] = db_session | |
| except Exception as db_error: | |
| logger.warning(f"β οΈ Database session creation failed for {agent_id}: {db_error}") | |
| # Update agent status in database if available | |
| await self._update_agent_status(agent_id, AgentStatus.ACTIVE) | |
| logger.info(f"β Agent started: {agent.name} ({agent_id})") | |
| return True | |
| except Exception as e: | |
| logger.error(f"β Agent start failed: {e}") | |
| return False | |
| async def stop_agent(self, agent_id: str) -> bool: | |
| """Stop agent and close session""" | |
| try: | |
| agent = self.get_agent(agent_id) | |
| if not agent: | |
| return False | |
| # Update status | |
| agent.status = AgentStatus.INACTIVE | |
| # Close agent session if exists | |
| if agent_id in self.active_sessions: | |
| session_obj = self.active_sessions[agent_id] | |
| session_obj.session_end = datetime.utcnow() | |
| session_obj.status = "completed" | |
| session_obj.end_reason = "graceful" | |
| session_obj.calculate_duration() | |
| if db_manager.is_initialized: | |
| try: | |
| async with db_manager.get_async_session() as session: | |
| await session.merge(session_obj) | |
| await session.commit() | |
| except Exception as db_error: | |
| logger.warning(f"β οΈ Database session update failed for {agent_id}: {db_error}") | |
| del self.active_sessions[agent_id] | |
| # Update agent status in database if available | |
| await self._update_agent_status(agent_id, AgentStatus.INACTIVE) | |
| logger.info(f"π§ Agent stopped: {agent_id}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"β Agent stop failed: {e}") | |
| return False | |
| async def restart_agent(self, agent_id: str) -> bool: | |
| """Restart agent (stop + start)""" | |
| try: | |
| await self.stop_agent(agent_id) | |
| await asyncio.sleep(1) # Brief pause | |
| return await self.start_agent(agent_id) | |
| except Exception as e: | |
| logger.error(f"β Agent restart failed: {e}") | |
| return False | |
| async def _update_agent_status(self, agent_id: str, status: AgentStatus): | |
| """Update agent status in database""" | |
| if not db_manager.is_initialized: | |
| return | |
| try: | |
| async with db_manager.get_async_session() as session: | |
| await session.execute( | |
| update(DBAgent) | |
| .where(DBAgent.id == agent_id) | |
| .values(status=status.value, last_active=datetime.utcnow()) | |
| ) | |
| await session.commit() | |
| except Exception as e: | |
| logger.warning(f"β οΈ Failed to update agent status in database: {e}") | |
| # π Multi-Provider Chat Support | |
| async def send_message_to_agent(self, agent_id: str, message: str, | |
| provider: Optional[str] = None) -> Dict[str, Any]: | |
| """ | |
| Send message to agent via specified provider or auto-fallback | |
| Args: | |
| agent_id: Target agent ID | |
| message: Message content | |
| provider: Optional provider override ("colossus", "openrouter", or None for auto) | |
| Returns: | |
| Chat response with metadata | |
| """ | |
| try: | |
| # Enhanced error checking with detailed debugging | |
| agent = self.get_agent(agent_id) | |
| if not agent: | |
| error_msg = f"Agent {agent_id} not found in loaded agents" | |
| logger.error(f"β {error_msg}") | |
| logger.debug(f"π Available agents: {list(self.agents.keys())}") | |
| return { | |
| "error": error_msg, | |
| "timestamp": datetime.utcnow().isoformat(), | |
| "debug_info": { | |
| "available_agents": list(self.agents.keys()), | |
| "agent_manager_initialized": self.is_initialized | |
| } | |
| } | |
| # Check if agent is available for messaging | |
| if agent.status != AgentStatus.ACTIVE: | |
| error_msg = f"Agent {agent_id} not available (status: {agent.status.value})" | |
| logger.error(f"β {error_msg}") | |
| return { | |
| "error": error_msg, | |
| "timestamp": datetime.utcnow().isoformat(), | |
| "debug_info": { | |
| "agent_status": agent.status.value, | |
| "agent_id": agent_id | |
| } | |
| } | |
| # π Multi-Provider Logic | |
| if provider == "openrouter": | |
| return await self._send_via_openrouter(agent_id, message, agent) | |
| elif provider == "colossus": | |
| return await self._send_via_colossus(agent_id, message, agent) | |
| else: | |
| # Auto-selection: Try colossus first, fallback to OpenRouter | |
| if self.colossus_connection_status == "connected": | |
| logger.info(f"π Using colossus as primary provider for {agent_id}") | |
| result = await self._send_via_colossus(agent_id, message, agent) | |
| # If colossus fails, try OpenRouter | |
| if "error" in result and "colossus" in result["error"].lower(): | |
| logger.info(f"π colossus failed, trying OpenRouter fallback...") | |
| return await self._send_via_openrouter(agent_id, message, agent) | |
| return result | |
| else: | |
| logger.info(f"π colossus unavailable, using OpenRouter as primary for {agent_id}") | |
| return await self._send_via_openrouter(agent_id, message, agent) | |
| except Exception as e: | |
| error_msg = str(e) | |
| logger.error(f"β Message to agent failed: {error_msg}") | |
| return { | |
| "error": error_msg, | |
| "timestamp": datetime.utcnow().isoformat(), | |
| "debug_info": { | |
| "agent_id": agent_id, | |
| "provider": provider, | |
| "colossus_status": self.colossus_connection_status, | |
| "agent_found": agent_id in self.agents, | |
| "colossus_client_exists": self.colossus_client is not None | |
| } | |
| } | |
| async def _send_via_openrouter(self, agent_id: str, message: str, | |
| agent: SaapAgent) -> Dict[str, Any]: | |
| """Send message via OpenRouter provider""" | |
| try: | |
| logger.info(f"π jane_alesi (coordinator) initialized with OpenRouter FREE") | |
| # Create OpenRouter agent for this request | |
| openrouter_agent = OpenRouterSAAPAgent( | |
| agent_id, | |
| agent.type.value if agent.type else "Assistant", | |
| os.getenv("OPENROUTER_API_KEY") | |
| ) | |
| # Get cost-optimized model for specific agent | |
| model_map = { | |
| "jane_alesi": os.getenv("JANE_ALESI_MODEL", "openai/gpt-4o-mini"), | |
| "john_alesi": os.getenv("JOHN_ALESI_MODEL", "deepseek/deepseek-coder"), | |
| "lara_alesi": os.getenv("LARA_ALESI_MODEL", "anthropic/claude-3-haiku") | |
| } | |
| preferred_model = model_map.get(agent_id, "meta-llama/llama-3.2-3b-instruct:free") | |
| openrouter_agent.model_name = preferred_model | |
| start_time = datetime.utcnow() | |
| logger.info(f"π€ Sending message to {agent.name} ({agent_id}) via OpenRouter ({preferred_model})...") | |
| # π§ FIXED: Use safe LLM config access | |
| max_tokens_value = self._get_llm_config_value(agent, 'max_tokens', 1000) | |
| # Send message via OpenRouter | |
| response = await openrouter_agent.send_request_to_openrouter( | |
| message, | |
| max_tokens=max_tokens_value | |
| ) | |
| end_time = datetime.utcnow() | |
| response_time = (end_time - start_time).total_seconds() | |
| if response.get("success"): | |
| logger.info(f"β OpenRouter response successful in {response_time:.2f}s") | |
| response_content = response.get("response", "") | |
| tokens_used = response.get("token_count", 0) | |
| cost_usd = response.get("cost_usd", 0.0) | |
| # Try to save to database if available | |
| if db_manager.is_initialized: | |
| try: | |
| async with db_manager.get_async_session() as session: | |
| chat_message = DBChatMessage( | |
| agent_id=agent_id, | |
| user_message=message, | |
| agent_response=response_content, | |
| response_time=response_time, | |
| tokens_used=tokens_used, | |
| metadata={ | |
| "model": preferred_model, | |
| "provider": "OpenRouter", | |
| "cost_usd": cost_usd, | |
| "temperature": 0.7 | |
| } | |
| ) | |
| session.add(chat_message) | |
| await session.commit() | |
| except Exception as db_error: | |
| logger.warning(f"β οΈ Failed to save OpenRouter chat to database: {db_error}") | |
| return { | |
| "content": response_content, | |
| "response_time": response_time, | |
| "tokens_used": tokens_used, | |
| "cost_usd": cost_usd, | |
| "provider": "OpenRouter", | |
| "model": preferred_model, | |
| "timestamp": end_time.isoformat() | |
| } | |
| else: | |
| error_msg = response.get("error", "Unknown OpenRouter error") | |
| logger.error(f"β OpenRouter fallback failed: {error_msg}") | |
| return { | |
| "error": f"OpenRouter error: {error_msg}", | |
| "provider": "OpenRouter", | |
| "timestamp": end_time.isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"β OpenRouter fallback failed: {str(e)}") | |
| return { | |
| "error": f"OpenRouter error: {str(e)}", | |
| "provider": "OpenRouter", | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| async def _send_via_colossus(self, agent_id: str, message: str, | |
| agent: SaapAgent) -> Dict[str, Any]: | |
| """Send message via colossus provider""" | |
| try: | |
| # Check colossus client availability | |
| if not self.colossus_client: | |
| return { | |
| "error": "colossus client not initialized", | |
| "provider": "colossus", | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| # Test colossus connection if it's been a while | |
| if (not self.last_colossus_test or | |
| (datetime.utcnow() - self.last_colossus_test).seconds > 300): # 5 minutes | |
| await self._test_colossus_connection() | |
| if self.colossus_connection_status != "connected": | |
| return { | |
| "error": f"colossus connection not healthy: {self.colossus_connection_status}", | |
| "provider": "colossus", | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| start_time = datetime.utcnow() | |
| logger.info(f"π€ Sending message to {agent.name} ({agent_id}) via colossus...") | |
| # π§ FIXED: Use safe LLM config access | |
| temperature_value = self._get_llm_config_value(agent, 'temperature', 0.7) | |
| max_tokens_value = self._get_llm_config_value(agent, 'max_tokens', 1000) | |
| # Send message to colossus | |
| response = await self.colossus_client.chat_completion( | |
| messages=[ | |
| {"role": "system", "content": agent.description or f"You are {agent.name}"}, | |
| {"role": "user", "content": message} | |
| ], | |
| agent_id=agent_id, | |
| temperature=temperature_value, | |
| max_tokens=max_tokens_value | |
| ) | |
| end_time = datetime.utcnow() | |
| response_time = (end_time - start_time).total_seconds() | |
| logger.info(f"π₯ Received response from colossus in {response_time:.2f}s") | |
| # Enhanced response parsing | |
| response_content = "" | |
| tokens_used = 0 | |
| if response: | |
| logger.debug(f"π Raw colossus response: {response}") | |
| if isinstance(response, dict): | |
| # SAAP ColossusClient format: {"success": true, "response": {...}} | |
| if response.get("success") and "response" in response: | |
| colossus_response = response["response"] | |
| if isinstance(colossus_response, dict) and "choices" in colossus_response: | |
| # OpenAI-compatible format within SAAP response | |
| if len(colossus_response["choices"]) > 0: | |
| choice = colossus_response["choices"][0] | |
| if "message" in choice and "content" in choice["message"]: | |
| response_content = choice["message"]["content"] | |
| elif isinstance(colossus_response, str): | |
| # Direct string response | |
| response_content = colossus_response | |
| # Extract token usage if available | |
| if isinstance(colossus_response, dict) and "usage" in colossus_response: | |
| tokens_used = colossus_response["usage"].get("total_tokens", 0) | |
| # Handle colossus client error responses | |
| elif not response.get("success"): | |
| error_msg = response.get("error", "Unknown colossus error") | |
| logger.error(f"β colossus error: {error_msg}") | |
| return { | |
| "error": f"colossus server error: {error_msg}", | |
| "provider": "colossus", | |
| "timestamp": end_time.isoformat() | |
| } | |
| # Direct OpenAI format: {"choices": [...]} | |
| elif "choices" in response and len(response["choices"]) > 0: | |
| choice = response["choices"][0] | |
| if "message" in choice and "content" in choice["message"]: | |
| response_content = choice["message"]["content"] | |
| if "usage" in response: | |
| tokens_used = response["usage"].get("total_tokens", 0) | |
| # Simple response format: {"response": "text"} or {"content": "text"} | |
| elif "response" in response: | |
| response_content = response["response"] | |
| elif "content" in response: | |
| response_content = response["content"] | |
| elif isinstance(response, str): | |
| # Direct string response | |
| response_content = response | |
| # Fallback if no content extracted | |
| if not response_content: | |
| logger.error(f"β Unable to extract content from colossus response: {response}") | |
| return { | |
| "error": "Failed to parse colossus response", | |
| "provider": "colossus", | |
| "timestamp": end_time.isoformat() | |
| } | |
| # Try to save to database if available | |
| if db_manager.is_initialized: | |
| try: | |
| async with db_manager.get_async_session() as session: | |
| chat_message = DBChatMessage( | |
| agent_id=agent_id, | |
| user_message=message, | |
| agent_response=response_content, | |
| response_time=response_time, | |
| tokens_used=tokens_used, | |
| metadata={ | |
| "model": "mistral-small3.2:24b-instruct-2506", | |
| "provider": "colossus", | |
| "temperature": 0.7 | |
| } | |
| ) | |
| session.add(chat_message) | |
| await session.commit() | |
| except Exception as db_error: | |
| logger.warning(f"β οΈ Failed to save chat message to database: {db_error}") | |
| # Update session metrics | |
| if agent_id in self.active_sessions: | |
| session_obj = self.active_sessions[agent_id] | |
| session_obj.messages_processed += 1 | |
| session_obj.total_tokens_used += tokens_used | |
| logger.info(f"β Message processed successfully for {agent.name}") | |
| return { | |
| "content": response_content, | |
| "response_time": response_time, | |
| "tokens_used": tokens_used, | |
| "provider": "colossus", | |
| "model": "mistral-small3.2:24b-instruct-2506", | |
| "timestamp": end_time.isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"β colossus communication failed: {str(e)}") | |
| return { | |
| "error": f"colossus error: {str(e)}", | |
| "provider": "colossus", | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| async def get_agent_metrics(self, agent_id: str) -> Dict[str, Any]: | |
| """Get comprehensive agent metrics from database""" | |
| if not db_manager.is_initialized: | |
| return {"warning": "Database not available - no metrics"} | |
| try: | |
| async with db_manager.get_async_session() as session: | |
| # Get message count and average response time | |
| result = await session.execute( | |
| select(DBChatMessage).where(DBChatMessage.agent_id == agent_id) | |
| ) | |
| messages = result.scalars().all() | |
| if messages: | |
| avg_response_time = sum(m.response_time for m in messages if m.response_time) / len(messages) | |
| total_tokens = sum(m.tokens_used for m in messages if m.tokens_used) | |
| else: | |
| avg_response_time = 0 | |
| total_tokens = 0 | |
| # Get session count | |
| session_result = await session.execute( | |
| select(DBAgentSession).where(DBAgentSession.agent_id == agent_id) | |
| ) | |
| sessions = session_result.scalars().all() | |
| return { | |
| "total_messages": len(messages), | |
| "total_tokens_used": total_tokens, | |
| "average_response_time": avg_response_time, | |
| "total_sessions": len(sessions), | |
| "last_activity": max([s.session_start for s in sessions], default=None), | |
| } | |
| except Exception as e: | |
| logger.error(f"β Failed to get agent metrics: {e}") | |
| return {} | |
| async def get_system_status(self) -> Dict[str, Any]: | |
| """Get comprehensive system status for debugging""" | |
| return { | |
| "agent_manager_initialized": self.is_initialized, | |
| "colossus_connection_status": self.colossus_connection_status, | |
| "colossus_last_test": self.last_colossus_test.isoformat() if self.last_colossus_test else None, | |
| "loaded_agents": len(self.agents), | |
| "active_sessions": len(self.active_sessions), | |
| "agent_list": [{"id": aid, "name": agent.name, "status": agent.status.value} | |
| for aid, agent in self.agents.items()], | |
| "database_initialized": getattr(db_manager, 'is_initialized', False) | |
| } | |
| async def shutdown_all_agents(self): | |
| """Gracefully shutdown all active agents""" | |
| try: | |
| logger.info("π§ Shutting down all agents...") | |
| for agent_id in list(self.agents.keys()): | |
| await self.stop_agent(agent_id) | |
| if self.colossus_client: | |
| await self.colossus_client.__aexit__(None, None, None) | |
| logger.info("β All agents shut down successfully") | |
| except Exception as e: | |
| logger.error(f"β Agent shutdown failed: {e}") | |
| # Create global instance for dependency injection | |
| agent_manager = AgentManagerService() | |
| # Make class available for import | |
| AgentManager = AgentManagerService | |
| if __name__ == "__main__": | |
| async def test_agent_manager(): | |
| """Test agent manager functionality""" | |
| manager = AgentManagerService() | |
| await manager.initialize() | |
| # List agents | |
| agents = list(manager.agents.values()) | |
| print(f"π Agents loaded: {[a.name for a in agents]}") | |
| # Test agent update with dict data (simulate frontend) | |
| if agents: | |
| agent = agents[0] | |
| print(f"\nπ§ͺ Testing agent update for: {agent.name}") | |
| # Simulate frontend data (dictionary) | |
| update_data = { | |
| "name": agent.name, | |
| "description": "Updated description from test", | |
| "type": agent.type.value if agent.type else "assistant", | |
| "capabilities": ["updated_capability_1", "updated_capability_2"] | |
| } | |
| # Test update | |
| success = await manager.update_agent(agent.id, update_data) | |
| print(f"π§ Update result: {'β ' if success else 'β'}") | |
| # Start first agent | |
| success = await manager.start_agent(agent.id) | |
| print(f"π Start agent {agent.name}: {'β ' if success else 'β'}") | |
| await manager.shutdown_all_agents() | |
| asyncio.run(test_agent_manager()) |