saap-plattform / backend /agent_manager_database_enhanced.py
Hwandji's picture
feat: initial HuggingFace Space deployment
4343907
raw
history blame
13.9 kB
"""
πŸ”§ Enhanced Agent Manager Service - Database Loading Fix
Verbesserte Version mit robuster Database-Memory Integration
Behebt kritische Probleme:
- Agents werden nicht aus Database geladen beim Backend-Start
- Memory vs Database Mismatch zwischen Services
- Neue Agents verschwinden nach Server-Restart
- "No description available" Problem
"""
import asyncio
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime
from services.agent_manager import AgentManagerService
from database.connection import db_manager
from database.models import DBAgent
from models.agent import SaapAgent, AgentStatus, AgentType
from sqlalchemy import select
logger = logging.getLogger(__name__)
class EnhancedAgentManagerService(AgentManagerService):
"""
Enhanced Agent Manager mit verbesserter Database Integration
Neue Features:
- Force Database Loading beim Startup
- Enhanced Database-Memory Bridge
- Guaranteed Agent Persistence
- Improved Error Handling
- Better Service Integration
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.database_loading_enabled = True
self.force_database_sync = True
async def initialize(self):
"""Enhanced initialization with guaranteed Database Agent Loading"""
try:
logger.info("πŸš€ Initializing Enhanced Agent Manager Service...")
# Initialize colossus client (from parent)
await self._initialize_colossus_client()
# Enhanced Database Agent Loading - GUARANTEED
await self.force_load_agents_from_database()
self.is_initialized = True
logger.info(f"βœ… Enhanced Agent Manager initialized: {len(self.agents)} agents loaded")
# Log all loaded agents for debugging
for agent_id, agent in self.agents.items():
logger.info(f"πŸ“‹ Loaded: {agent.name} ({agent_id}) - {agent.status.value}")
except Exception as e:
logger.error(f"❌ Enhanced Agent Manager initialization failed: {e}")
raise
async def _initialize_colossus_client(self):
"""Initialize colossus client with error handling"""
try:
from api.colossus_client import ColossusClient
logger.info("πŸ”Œ Connecting to colossus server...")
self.colossus_client = ColossusClient()
await self.colossus_client.__aenter__()
await self._test_colossus_connection()
except Exception as colossus_error:
logger.error(f"❌ colossus connection failed: {colossus_error}")
self.colossus_connection_status = f"failed: {str(colossus_error)}"
async def force_load_agents_from_database(self):
"""
πŸš€ ENHANCED: Force load all agents from database with guaranteed success
This method GUARANTEES that all database agents are loaded into memory.
It replaces the problematic _load_agents_from_database method.
"""
try:
logger.info("πŸ” Force loading agents from database...")
# Step 1: Ensure database is initialized
await self._ensure_database_initialized()
# Step 2: Load all agents from database
loaded_count = await self._load_database_agents()
# Step 3: Add default agents if database is empty
if loaded_count == 0:
logger.info("πŸ“¦ No agents in database - adding default agents...")
await self._add_default_agents_to_database()
loaded_count = await self._load_database_agents()
# Step 4: Verify loading success
await self._verify_agent_loading(loaded_count)
logger.info(f"βœ… Force loading successful: {len(self.agents)} agents in memory")
except Exception as e:
logger.error(f"❌ Force loading failed: {e}")
# Fallback to default agents if database loading fails completely
logger.info("πŸ†˜ Fallback: Loading default agents in memory-only mode...")
await super().load_default_agents()
async def _ensure_database_initialized(self):
"""Ensure database is properly initialized"""
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
if not db_manager.is_initialized:
logger.info(f"πŸ“Š Database not initialized - attempting initialization (retry {retry_count + 1}/{max_retries})...")
await db_manager.initialize()
# Test database connectivity
async with db_manager.get_async_session() as session:
result = await session.execute(select(DBAgent).limit(1))
result.scalars().first()
logger.info("βœ… Database connection verified")
return
except Exception as e:
retry_count += 1
if retry_count >= max_retries:
raise Exception(f"Database initialization failed after {max_retries} retries: {e}")
logger.warning(f"⚠️ Database init attempt {retry_count} failed: {e}")
await asyncio.sleep(1)
async def _load_database_agents(self) -> int:
"""Load all agents from database into memory"""
try:
async with db_manager.get_async_session() as session:
result = await session.execute(select(DBAgent))
db_agents = result.scalars().all()
logger.info(f"πŸ“š Found {len(db_agents)} agents in database")
# Clear existing agents and load from database
self.agents.clear()
loaded_count = 0
for db_agent in db_agents:
try:
saap_agent = db_agent.to_saap_agent()
self.agents[saap_agent.id] = saap_agent
loaded_count += 1
logger.debug(f"πŸ”„ Loaded: {saap_agent.name} ({saap_agent.id})")
except Exception as conversion_error:
logger.warning(f"⚠️ Failed to convert agent {db_agent.id}: {conversion_error}")
logger.info(f"βœ… Successfully loaded {loaded_count} agents from database")
return loaded_count
except Exception as e:
logger.error(f"❌ Database agent loading failed: {e}")
return 0
async def _add_default_agents_to_database(self):
"""Add default Alesi agents to database"""
try:
from models.agent import AgentTemplates
default_agents = [
AgentTemplates.jane_alesi(),
AgentTemplates.john_alesi(),
AgentTemplates.lara_alesi()
]
async with db_manager.get_async_session() as session:
for agent in default_agents:
# Check if agent already exists
result = await session.execute(
select(DBAgent).where(DBAgent.id == agent.id)
)
if result.scalars().first():
logger.debug(f"⚠️ Agent {agent.id} already exists in database")
continue
db_agent = DBAgent.from_saap_agent(agent)
session.add(db_agent)
logger.info(f"βž• Added default agent to database: {agent.name}")
await session.commit()
logger.info("βœ… Default agents added to database")
except Exception as e:
logger.error(f"❌ Failed to add default agents to database: {e}")
raise
async def _verify_agent_loading(self, expected_count: int):
"""Verify that agent loading was successful"""
memory_count = len(self.agents)
if memory_count != expected_count:
logger.warning(f"⚠️ Agent count mismatch: Expected {expected_count}, got {memory_count}")
# Verify agent data integrity
for agent_id, agent in self.agents.items():
if not agent.name or agent.name == "Unknown Agent":
logger.warning(f"⚠️ Agent {agent_id} has missing name")
if not agent.description:
logger.warning(f"⚠️ Agent {agent_id} has missing description")
logger.info(f"βœ… Agent loading verification completed: {memory_count} agents")
async def register_agent(self, agent: SaapAgent) -> bool:
"""Enhanced agent registration with guaranteed database persistence"""
try:
# Always add to memory cache first
self.agents[agent.id] = agent
logger.info(f"πŸ“ Agent added to memory: {agent.name} ({agent.id})")
# Force database persistence
await self._force_database_persistence(agent)
return True
except Exception as e:
logger.error(f"❌ Enhanced agent registration failed: {e}")
# Remove from cache if registration failed
self.agents.pop(agent.id, None)
return False
async def _force_database_persistence(self, agent: SaapAgent):
"""Force agent persistence to database with retries"""
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
await self._ensure_database_initialized()
async with db_manager.get_async_session() as session:
# Check if agent already exists
result = await session.execute(
select(DBAgent).where(DBAgent.id == agent.id)
)
existing = result.scalars().first()
if existing:
# Update existing agent
updated_agent = DBAgent.from_saap_agent(agent)
await session.merge(updated_agent)
logger.info(f"πŸ”„ Agent updated in database: {agent.name}")
else:
# Create new agent
db_agent = DBAgent.from_saap_agent(agent)
session.add(db_agent)
logger.info(f"βž• Agent added to database: {agent.name}")
await session.commit()
logger.info(f"βœ… Database persistence successful: {agent.name}")
return
except Exception as e:
retry_count += 1
if retry_count >= max_retries:
raise Exception(f"Database persistence failed after {max_retries} retries: {e}")
logger.warning(f"⚠️ Database persistence attempt {retry_count} failed: {e}")
await asyncio.sleep(0.5)
async def get_comprehensive_agent_status(self) -> Dict[str, Any]:
"""Get comprehensive status for debugging"""
try:
# Database agent count
async with db_manager.get_async_session() as session:
result = await session.execute(select(DBAgent))
db_agents = result.scalars().all()
db_count = len(db_agents)
# Memory agent count
memory_count = len(self.agents)
# Agent details
agent_details = []
for agent_id, agent in self.agents.items():
agent_details.append({
"id": agent_id,
"name": agent.name,
"type": agent.type.value,
"status": agent.status.value,
"has_description": bool(agent.description and agent.description != "No description available")
})
return {
"database_initialized": db_manager.is_initialized,
"database_agent_count": db_count,
"memory_agent_count": memory_count,
"sync_status": "synced" if db_count == memory_count else "out_of_sync",
"agent_details": agent_details,
"colossus_status": self.colossus_connection_status,
"enhanced_features_active": True
}
except Exception as e:
logger.error(f"❌ Status check failed: {e}")
return {"error": str(e)}
# Create enhanced global instance
enhanced_agent_manager = EnhancedAgentManagerService()
if __name__ == "__main__":
async def test_enhanced_agent_manager():
"""Test enhanced agent manager functionality"""
manager = EnhancedAgentManagerService()
await manager.initialize()
# Get comprehensive status
status = await manager.get_comprehensive_agent_status()
print("πŸ“Š Enhanced Agent Manager Status:")
print(f" Database: {status.get('database_agent_count', 0)} agents")
print(f" Memory: {status.get('memory_agent_count', 0)} agents")
print(f" Sync: {status.get('sync_status', 'unknown')}")
print(f" Enhanced: {status.get('enhanced_features_active', False)}")
await manager.shutdown_all_agents()
asyncio.run(test_enhanced_agent_manager())