Spaces:
Sleeping
Sleeping
| """ | |
| 🔧 Enhanced Agent Manager Service - Database Loading Fix | |
| Improved version with robust database-memory integration. | |
| Fixes critical problems: | |
| - Agents are not loaded from the database when the backend starts | |
| - Memory vs. database mismatch between services | |
| - New agents disappear after a server restart | |
| - "No description available" problem | |
| """ | |
| import asyncio | |
| import logging | |
| from typing import Dict, List, Optional, Any | |
| from datetime import datetime | |
| from services.agent_manager import AgentManagerService | |
| from database.connection import db_manager | |
| from database.models import DBAgent | |
| from models.agent import SaapAgent, AgentStatus, AgentType | |
| from sqlalchemy import select | |
| logger = logging.getLogger(__name__) | |
class EnhancedAgentManagerService(AgentManagerService):
    """Agent manager whose in-memory agent cache is backed by the database.

    Extends ``AgentManagerService`` with:

    - forced loading of all agents from the database during ``initialize``
    - seeding of the default agents when the database holds none
    - retried database persistence in ``register_agent``
    - a status report comparing database and in-memory agent counts
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the parent and set the feature flags."""
        super().__init__(*args, **kwargs)
        # These flags are only assigned here; no method of this class reads them.
        self.database_loading_enabled = True
        self.force_database_sync = True

    async def initialize(self):
        """Connect to colossus, load agents from the database, mark initialized.

        A colossus connection failure does not abort initialization because
        ``_initialize_colossus_client`` handles its own errors.

        Raises:
            Exception: any error from the loading steps is logged and re-raised.
        """
        try:
            logger.info("π Initializing Enhanced Agent Manager Service...")

            await self._initialize_colossus_client()
            await self.force_load_agents_from_database()

            self.is_initialized = True
            logger.info(f"β Enhanced Agent Manager initialized: {len(self.agents)} agents loaded")

            # List every loaded agent to make startup problems easy to spot.
            for agent_id, agent in self.agents.items():
                logger.info(f"π Loaded: {agent.name} ({agent_id}) - {agent.status.value}")
        except Exception as e:
            logger.error(f"β Enhanced Agent Manager initialization failed: {e}")
            raise

    async def _initialize_colossus_client(self):
        """Create and enter the colossus client, then test the connection.

        Errors are logged and recorded in ``self.colossus_connection_status``
        instead of being raised.
        """
        try:
            # Imported lazily so an import problem is handled like a
            # connection failure by the except clause below.
            from api.colossus_client import ColossusClient

            logger.info("π Connecting to colossus server...")
            self.colossus_client = ColossusClient()
            await self.colossus_client.__aenter__()
            await self._test_colossus_connection()
        except Exception as colossus_error:
            logger.error(f"β colossus connection failed: {colossus_error}")
            self.colossus_connection_status = f"failed: {str(colossus_error)}"

    async def force_load_agents_from_database(self):
        """Load all database agents into memory, seeding defaults if none load.

        Steps: verify the database connection, load the agents, insert the
        default agents and reload when nothing was loaded, then run the
        verification pass. If any step raises, the error is logged and the
        parent's ``load_default_agents`` is used as a memory-only fallback.
        """
        try:
            logger.info("π Force loading agents from database...")

            await self._ensure_database_initialized()
            loaded_count = await self._load_database_agents()

            # ``_load_database_agents`` also returns 0 when loading failed,
            # so this branch covers both an empty and an unreadable table.
            if loaded_count == 0:
                logger.info("π¦ No agents in database - adding default agents...")
                await self._add_default_agents_to_database()
                loaded_count = await self._load_database_agents()

            await self._verify_agent_loading(loaded_count)
            logger.info(f"β Force loading successful: {len(self.agents)} agents in memory")
        except Exception as e:
            logger.error(f"β Force loading failed: {e}")
            logger.info("π Fallback: Loading default agents in memory-only mode...")
            await super().load_default_agents()

    async def _ensure_database_initialized(self):
        """Initialize the database if needed and verify it answers a query.

        Makes up to three attempts, sleeping one second between them.

        Raises:
            Exception: when the last attempt fails; the underlying error is
                attached as ``__cause__``.
        """
        max_retries = 3
        retry_count = 0
        while retry_count < max_retries:
            try:
                if not db_manager.is_initialized:
                    logger.info(f"π Database not initialized - attempting initialization (retry {retry_count + 1}/{max_retries})...")
                    await db_manager.initialize()

                # A one-row query proves the connection actually works.
                async with db_manager.get_async_session() as session:
                    result = await session.execute(select(DBAgent).limit(1))
                    result.scalars().first()

                logger.info("β Database connection verified")
                return
            except Exception as e:
                retry_count += 1
                if retry_count >= max_retries:
                    # Chain explicitly so the root cause stays in the traceback.
                    raise Exception(f"Database initialization failed after {max_retries} retries: {e}") from e
                logger.warning(f"β οΈ Database init attempt {retry_count} failed: {e}")
                await asyncio.sleep(1)

    async def _load_database_agents(self) -> int:
        """Replace the in-memory cache with the agents stored in the database.

        Rows that fail conversion are logged and skipped.

        Returns:
            Number of rows converted successfully, or 0 if loading failed.
        """
        try:
            async with db_manager.get_async_session() as session:
                result = await session.execute(select(DBAgent))
                db_agents = result.scalars().all()
                logger.info(f"π Found {len(db_agents)} agents in database")

                # Convert into a staging dict first so the live cache is
                # only replaced once every row has been processed.
                staged = {}
                loaded_count = 0
                for db_agent in db_agents:
                    try:
                        saap_agent = db_agent.to_saap_agent()
                        staged[saap_agent.id] = saap_agent
                        loaded_count += 1
                        logger.debug(f"π Loaded: {saap_agent.name} ({saap_agent.id})")
                    except Exception as conversion_error:
                        logger.warning(f"β οΈ Failed to convert agent {db_agent.id}: {conversion_error}")

                # Mutate in place: other code may hold a reference to the dict.
                self.agents.clear()
                self.agents.update(staged)

                logger.info(f"β Successfully loaded {loaded_count} agents from database")
                return loaded_count
        except Exception as e:
            logger.error(f"β Database agent loading failed: {e}")
            return 0

    async def _add_default_agents_to_database(self):
        """Insert the three default Alesi agents that are not yet stored.

        Raises:
            Exception: any failure is logged and re-raised.
        """
        try:
            from models.agent import AgentTemplates

            default_agents = [
                AgentTemplates.jane_alesi(),
                AgentTemplates.john_alesi(),
                AgentTemplates.lara_alesi(),
            ]

            async with db_manager.get_async_session() as session:
                for agent in default_agents:
                    # Skip agents whose id is already present.
                    result = await session.execute(
                        select(DBAgent).where(DBAgent.id == agent.id)
                    )
                    if result.scalars().first():
                        logger.debug(f"β οΈ Agent {agent.id} already exists in database")
                        continue

                    db_agent = DBAgent.from_saap_agent(agent)
                    session.add(db_agent)
                    logger.info(f"β Added default agent to database: {agent.name}")

                await session.commit()
                logger.info("β Default agents added to database")
        except Exception as e:
            logger.error(f"β Failed to add default agents to database: {e}")
            raise

    async def _verify_agent_loading(self, expected_count: int):
        """Log warnings for a count mismatch or agents missing name/description.

        Args:
            expected_count: number of agents the loader reported.

        This is a diagnostic pass only; it never raises on a mismatch.
        """
        memory_count = len(self.agents)
        if memory_count != expected_count:
            logger.warning(f"β οΈ Agent count mismatch: Expected {expected_count}, got {memory_count}")

        for agent_id, agent in self.agents.items():
            if not agent.name or agent.name == "Unknown Agent":
                logger.warning(f"β οΈ Agent {agent_id} has missing name")
            if not agent.description:
                logger.warning(f"β οΈ Agent {agent_id} has missing description")

        logger.info(f"β Agent loading verification completed: {memory_count} agents")

    async def register_agent(self, agent: SaapAgent) -> bool:
        """Cache the agent in memory and persist it to the database.

        Args:
            agent: the agent to register or update.

        Returns:
            True when the agent was persisted, False otherwise. On failure the
            memory cache is put back in the state it had before the call.
        """
        not_cached = object()  # sentinel: the id was not cached before this call
        previous = not_cached
        try:
            previous = self.agents.get(agent.id, not_cached)
            self.agents[agent.id] = agent
            logger.info(f"π Agent added to memory: {agent.name} ({agent.id})")

            await self._force_database_persistence(agent)
            return True
        except Exception as e:
            logger.error(f"β Enhanced agent registration failed: {e}")
            if previous is not_cached:
                self.agents.pop(agent.id, None)
            else:
                # A failed update must not evict the agent that was
                # already registered under this id.
                self.agents[agent.id] = previous
            return False

    async def _force_database_persistence(self, agent: SaapAgent):
        """Insert or update the agent's database row, retrying on failure.

        Makes up to three attempts, sleeping half a second between them.

        Args:
            agent: the agent to write.

        Raises:
            Exception: when the last attempt fails; the underlying error is
                attached as ``__cause__``.
        """
        max_retries = 3
        retry_count = 0
        while retry_count < max_retries:
            try:
                await self._ensure_database_initialized()

                async with db_manager.get_async_session() as session:
                    result = await session.execute(
                        select(DBAgent).where(DBAgent.id == agent.id)
                    )
                    existing = result.scalars().first()

                    if existing:
                        updated_agent = DBAgent.from_saap_agent(agent)
                        await session.merge(updated_agent)
                        logger.info(f"π Agent updated in database: {agent.name}")
                    else:
                        db_agent = DBAgent.from_saap_agent(agent)
                        session.add(db_agent)
                        logger.info(f"β Agent added to database: {agent.name}")

                    await session.commit()

                logger.info(f"β Database persistence successful: {agent.name}")
                return
            except Exception as e:
                retry_count += 1
                if retry_count >= max_retries:
                    # Chain explicitly so the root cause stays in the traceback.
                    raise Exception(f"Database persistence failed after {max_retries} retries: {e}") from e
                logger.warning(f"β οΈ Database persistence attempt {retry_count} failed: {e}")
                await asyncio.sleep(0.5)

    async def get_comprehensive_agent_status(self) -> Dict[str, Any]:
        """Build a debugging report comparing database and memory state.

        Returns:
            A dict with the database and memory agent counts, a ``sync_status``
            of ``"synced"`` or ``"out_of_sync"`` based on those counts,
            per-agent details and the colossus status. On any failure the
            result is ``{"error": <message>}`` instead.
        """
        try:
            async with db_manager.get_async_session() as session:
                result = await session.execute(select(DBAgent))
                db_agents = result.scalars().all()
                db_count = len(db_agents)

            memory_count = len(self.agents)

            agent_details = [
                {
                    "id": agent_id,
                    "name": agent.name,
                    "type": agent.type.value,
                    "status": agent.status.value,
                    # The placeholder text counts as "no description".
                    "has_description": bool(agent.description and agent.description != "No description available"),
                }
                for agent_id, agent in self.agents.items()
            ]

            return {
                "database_initialized": db_manager.is_initialized,
                "database_agent_count": db_count,
                "memory_agent_count": memory_count,
                "sync_status": "synced" if db_count == memory_count else "out_of_sync",
                "agent_details": agent_details,
                "colossus_status": self.colossus_connection_status,
                "enhanced_features_active": True,
            }
        except Exception as e:
            logger.error(f"β Status check failed: {e}")
            return {"error": str(e)}
# Shared module-level instance of the enhanced manager, created on import.
enhanced_agent_manager = EnhancedAgentManagerService()


if __name__ == "__main__":

    async def _run_smoke_test():
        """Initialize a fresh manager, print its status summary, shut it down."""
        service = EnhancedAgentManagerService()
        await service.initialize()

        # Missing keys fall back to the defaults given to ``get`` below.
        status = await service.get_comprehensive_agent_status()

        print("π Enhanced Agent Manager Status:")
        print(f" Database: {status.get('database_agent_count', 0)} agents")
        print(f" Memory: {status.get('memory_agent_count', 0)} agents")
        print(f" Sync: {status.get('sync_status', 'unknown')}")
        print(f" Enhanced: {status.get('enhanced_features_active', False)}")

        await service.shutdown_all_agents()

    smoke_test = _run_smoke_test()
    asyncio.run(smoke_test)