""" 🔧 Enhanced Agent Manager Service - Database Loading Fix Verbesserte Version mit robuster Database-Memory Integration Behebt kritische Probleme: - Agents werden nicht aus Database geladen beim Backend-Start - Memory vs Database Mismatch zwischen Services - Neue Agents verschwinden nach Server-Restart - "No description available" Problem """ import asyncio import logging from typing import Dict, List, Optional, Any from datetime import datetime from services.agent_manager import AgentManagerService from database.connection import db_manager from database.models import DBAgent from models.agent import SaapAgent, AgentStatus, AgentType from sqlalchemy import select logger = logging.getLogger(__name__) class EnhancedAgentManagerService(AgentManagerService): """ Enhanced Agent Manager mit verbesserter Database Integration Neue Features: - Force Database Loading beim Startup - Enhanced Database-Memory Bridge - Guaranteed Agent Persistence - Improved Error Handling - Better Service Integration """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.database_loading_enabled = True self.force_database_sync = True async def initialize(self): """Enhanced initialization with guaranteed Database Agent Loading""" try: logger.info("🚀 Initializing Enhanced Agent Manager Service...") # Initialize colossus client (from parent) await self._initialize_colossus_client() # Enhanced Database Agent Loading - GUARANTEED await self.force_load_agents_from_database() self.is_initialized = True logger.info(f"✅ Enhanced Agent Manager initialized: {len(self.agents)} agents loaded") # Log all loaded agents for debugging for agent_id, agent in self.agents.items(): logger.info(f"📋 Loaded: {agent.name} ({agent_id}) - {agent.status.value}") except Exception as e: logger.error(f"❌ Enhanced Agent Manager initialization failed: {e}") raise async def _initialize_colossus_client(self): """Initialize colossus client with error handling""" try: from api.colossus_client import ColossusClient logger.info("🔌 Connecting to colossus server...") self.colossus_client = ColossusClient() await self.colossus_client.__aenter__() await self._test_colossus_connection() except Exception as colossus_error: logger.error(f"❌ colossus connection failed: {colossus_error}") self.colossus_connection_status = f"failed: {str(colossus_error)}" async def force_load_agents_from_database(self): """ 🚀 ENHANCED: Force load all agents from database with guaranteed success This method GUARANTEES that all database agents are loaded into memory. It replaces the problematic _load_agents_from_database method. """ try: logger.info("🔍 Force loading agents from database...") # Step 1: Ensure database is initialized await self._ensure_database_initialized() # Step 2: Load all agents from database loaded_count = await self._load_database_agents() # Step 3: Add default agents if database is empty if loaded_count == 0: logger.info("📦 No agents in database - adding default agents...") await self._add_default_agents_to_database() loaded_count = await self._load_database_agents() # Step 4: Verify loading success await self._verify_agent_loading(loaded_count) logger.info(f"✅ Force loading successful: {len(self.agents)} agents in memory") except Exception as e: logger.error(f"❌ Force loading failed: {e}") # Fallback to default agents if database loading fails completely logger.info("🆘 Fallback: Loading default agents in memory-only mode...") await super().load_default_agents() async def _ensure_database_initialized(self): """Ensure database is properly initialized""" max_retries = 3 retry_count = 0 while retry_count < max_retries: try: if not db_manager.is_initialized: logger.info(f"📊 Database not initialized - attempting initialization (retry {retry_count + 1}/{max_retries})...") await db_manager.initialize() # Test database connectivity async with db_manager.get_async_session() as session: result = await session.execute(select(DBAgent).limit(1)) result.scalars().first() logger.info("✅ Database connection verified") return except Exception as e: retry_count += 1 if retry_count >= max_retries: raise Exception(f"Database initialization failed after {max_retries} retries: {e}") logger.warning(f"⚠️ Database init attempt {retry_count} failed: {e}") await asyncio.sleep(1) async def _load_database_agents(self) -> int: """Load all agents from database into memory""" try: async with db_manager.get_async_session() as session: result = await session.execute(select(DBAgent)) db_agents = result.scalars().all() logger.info(f"📚 Found {len(db_agents)} agents in database") # Clear existing agents and load from database self.agents.clear() loaded_count = 0 for db_agent in db_agents: try: saap_agent = db_agent.to_saap_agent() self.agents[saap_agent.id] = saap_agent loaded_count += 1 logger.debug(f"🔄 Loaded: {saap_agent.name} ({saap_agent.id})") except Exception as conversion_error: logger.warning(f"⚠️ Failed to convert agent {db_agent.id}: {conversion_error}") logger.info(f"✅ Successfully loaded {loaded_count} agents from database") return loaded_count except Exception as e: logger.error(f"❌ Database agent loading failed: {e}") return 0 async def _add_default_agents_to_database(self): """Add default Alesi agents to database""" try: from models.agent import AgentTemplates default_agents = [ AgentTemplates.jane_alesi(), AgentTemplates.john_alesi(), AgentTemplates.lara_alesi() ] async with db_manager.get_async_session() as session: for agent in default_agents: # Check if agent already exists result = await session.execute( select(DBAgent).where(DBAgent.id == agent.id) ) if result.scalars().first(): logger.debug(f"⚠️ Agent {agent.id} already exists in database") continue db_agent = DBAgent.from_saap_agent(agent) session.add(db_agent) logger.info(f"➕ Added default agent to database: {agent.name}") await session.commit() logger.info("✅ Default agents added to database") except Exception as e: logger.error(f"❌ Failed to add default agents to database: {e}") raise async def _verify_agent_loading(self, expected_count: int): """Verify that agent loading was successful""" memory_count = len(self.agents) if memory_count != expected_count: logger.warning(f"⚠️ Agent count mismatch: Expected {expected_count}, got {memory_count}") # Verify agent data integrity for agent_id, agent in self.agents.items(): if not agent.name or agent.name == "Unknown Agent": logger.warning(f"⚠️ Agent {agent_id} has missing name") if not agent.description: logger.warning(f"⚠️ Agent {agent_id} has missing description") logger.info(f"✅ Agent loading verification completed: {memory_count} agents") async def register_agent(self, agent: SaapAgent) -> bool: """Enhanced agent registration with guaranteed database persistence""" try: # Always add to memory cache first self.agents[agent.id] = agent logger.info(f"📝 Agent added to memory: {agent.name} ({agent.id})") # Force database persistence await self._force_database_persistence(agent) return True except Exception as e: logger.error(f"❌ Enhanced agent registration failed: {e}") # Remove from cache if registration failed self.agents.pop(agent.id, None) return False async def _force_database_persistence(self, agent: SaapAgent): """Force agent persistence to database with retries""" max_retries = 3 retry_count = 0 while retry_count < max_retries: try: await self._ensure_database_initialized() async with db_manager.get_async_session() as session: # Check if agent already exists result = await session.execute( select(DBAgent).where(DBAgent.id == agent.id) ) existing = result.scalars().first() if existing: # Update existing agent updated_agent = DBAgent.from_saap_agent(agent) await session.merge(updated_agent) logger.info(f"🔄 Agent updated in database: {agent.name}") else: # Create new agent db_agent = DBAgent.from_saap_agent(agent) session.add(db_agent) logger.info(f"➕ Agent added to database: {agent.name}") await session.commit() logger.info(f"✅ Database persistence successful: {agent.name}") return except Exception as e: retry_count += 1 if retry_count >= max_retries: raise Exception(f"Database persistence failed after {max_retries} retries: {e}") logger.warning(f"⚠️ Database persistence attempt {retry_count} failed: {e}") await asyncio.sleep(0.5) async def get_comprehensive_agent_status(self) -> Dict[str, Any]: """Get comprehensive status for debugging""" try: # Database agent count async with db_manager.get_async_session() as session: result = await session.execute(select(DBAgent)) db_agents = result.scalars().all() db_count = len(db_agents) # Memory agent count memory_count = len(self.agents) # Agent details agent_details = [] for agent_id, agent in self.agents.items(): agent_details.append({ "id": agent_id, "name": agent.name, "type": agent.type.value, "status": agent.status.value, "has_description": bool(agent.description and agent.description != "No description available") }) return { "database_initialized": db_manager.is_initialized, "database_agent_count": db_count, "memory_agent_count": memory_count, "sync_status": "synced" if db_count == memory_count else "out_of_sync", "agent_details": agent_details, "colossus_status": self.colossus_connection_status, "enhanced_features_active": True } except Exception as e: logger.error(f"❌ Status check failed: {e}") return {"error": str(e)} # Create enhanced global instance enhanced_agent_manager = EnhancedAgentManagerService() if __name__ == "__main__": async def test_enhanced_agent_manager(): """Test enhanced agent manager functionality""" manager = EnhancedAgentManagerService() await manager.initialize() # Get comprehensive status status = await manager.get_comprehensive_agent_status() print("📊 Enhanced Agent Manager Status:") print(f" Database: {status.get('database_agent_count', 0)} agents") print(f" Memory: {status.get('memory_agent_count', 0)} agents") print(f" Sync: {status.get('sync_status', 'unknown')}") print(f" Enhanced: {status.get('enhanced_features_active', False)}") await manager.shutdown_all_agents() asyncio.run(test_enhanced_agent_manager())