""" SAAP Database Service - High-level Database Operations Simplified database operations for Agent Manager with proper initialization """ import asyncio import logging from typing import Dict, List, Optional, Any from datetime import datetime from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, update, delete from sqlalchemy.exc import IntegrityError, SQLAlchemyError from database.connection import db_manager from database.models import DBAgent, DBChatMessage, DBAgentSession from models.agent import SaapAgent, AgentStatus logger = logging.getLogger(__name__) class DatabaseService: """ High-level database service for SAAP Agent operations Handles all database operations with proper error handling """ def __init__(self): self.is_ready = False async def initialize(self): """Initialize database service and ensure tables exist""" try: logger.info("๐Ÿ—„๏ธ Initializing SAAP Database Service...") # Initialize database manager if not already done if not db_manager.is_initialized: await db_manager.initialize() # Force database tables creation await self._ensure_tables_exist() self.is_ready = True logger.info("โœ… Database Service initialized successfully") except Exception as e: logger.error(f"โŒ Database Service initialization failed: {e}") raise async def _ensure_tables_exist(self): """Ensure all required database tables exist""" try: # Force table creation by running a simple query async with db_manager.get_async_session() as session: # Test if agents table exists by trying to count rows try: result = await session.execute(select(DBAgent)) agents = result.scalars().all() logger.info(f"๐Ÿ“š Found {len(agents)} existing agents in database") except Exception as table_error: logger.warning(f"โš ๏ธ Agents table not ready: {table_error}") # Tables should already be created by db_manager.initialize() except Exception as e: logger.error(f"โŒ Failed to verify tables: {e}") raise async def save_agent(self, agent: SaapAgent) -> bool: """Save or update agent in database""" try: if not self.is_ready: await self.initialize() async with db_manager.get_async_session() as session: # Check if agent exists existing = await session.execute( select(DBAgent).where(DBAgent.id == agent.id) ) db_agent = existing.scalar_one_or_none() if db_agent: # Update existing agent for key, value in DBAgent.from_saap_agent(agent).__dict__.items(): if not key.startswith('_'): # Skip SQLAlchemy internal attributes setattr(db_agent, key, value) db_agent.updated_at = datetime.utcnow() logger.info(f"๐Ÿ”„ Updating existing agent: {agent.name}") else: # Create new agent db_agent = DBAgent.from_saap_agent(agent) session.add(db_agent) logger.info(f"โž• Creating new agent: {agent.name}") await session.commit() logger.info(f"โœ… Agent saved successfully: {agent.name} ({agent.id})") return True except IntegrityError as ie: logger.error(f"โŒ Database integrity error saving agent {agent.id}: {ie}") return False except Exception as e: logger.error(f"โŒ Failed to save agent {agent.id}: {e}") return False async def load_agent(self, agent_id: str) -> Optional[SaapAgent]: """Load agent from database""" try: if not self.is_ready: await self.initialize() async with db_manager.get_async_session() as session: result = await session.execute( select(DBAgent).where(DBAgent.id == agent_id) ) db_agent = result.scalar_one_or_none() if db_agent: agent = db_agent.to_saap_agent() logger.debug(f"๐Ÿ“– Loaded agent: {agent.name}") return agent else: logger.debug(f"โ“ Agent not found in database: {agent_id}") return None except Exception as e: logger.error(f"โŒ Failed to load agent {agent_id}: {e}") return None async def load_all_agents(self) -> List[SaapAgent]: """Load all agents from database""" try: if not self.is_ready: await self.initialize() async with db_manager.get_async_session() as session: result = await session.execute(select(DBAgent)) db_agents = result.scalars().all() agents = [] for db_agent in db_agents: try: agent = db_agent.to_saap_agent() agents.append(agent) except Exception as conv_error: logger.error(f"โš ๏ธ Failed to convert agent {db_agent.id}: {conv_error}") continue logger.info(f"๐Ÿ“š Loaded {len(agents)} agents from database") return agents except Exception as e: logger.error(f"โŒ Failed to load agents from database: {e}") return [] async def delete_agent(self, agent_id: str) -> bool: """Delete agent from database""" try: if not self.is_ready: await self.initialize() async with db_manager.get_async_session() as session: # Delete agent (cascading will handle related records) result = await session.execute( delete(DBAgent).where(DBAgent.id == agent_id) ) await session.commit() if result.rowcount > 0: logger.info(f"๐Ÿ—‘๏ธ Agent deleted from database: {agent_id}") return True else: logger.warning(f"โ“ Agent not found for deletion: {agent_id}") return False except Exception as e: logger.error(f"โŒ Failed to delete agent {agent_id}: {e}") return False async def update_agent_status(self, agent_id: str, status: AgentStatus) -> bool: """Update agent status in database""" try: if not self.is_ready: await self.initialize() async with db_manager.get_async_session() as session: result = await session.execute( update(DBAgent) .where(DBAgent.id == agent_id) .values( status=status.value, last_active=datetime.utcnow(), updated_at=datetime.utcnow() ) ) await session.commit() if result.rowcount > 0: logger.debug(f"๐Ÿ“Š Agent status updated: {agent_id} -> {status.value}") return True else: logger.warning(f"โ“ Agent not found for status update: {agent_id}") return False except Exception as e: logger.error(f"โŒ Failed to update agent status {agent_id}: {e}") return False async def save_chat_message(self, agent_id: str, user_message: str, agent_response: str, response_time: float, tokens_used: int = 0, metadata: Dict = None) -> bool: """Save chat message to database""" try: if not self.is_ready: await self.initialize() async with db_manager.get_async_session() as session: chat_message = DBChatMessage( agent_id=agent_id, user_message=user_message, agent_response=agent_response, response_time=response_time, tokens_used=tokens_used, message_metadata=metadata or {} ) session.add(chat_message) await session.commit() logger.debug(f"๐Ÿ’ฌ Chat message saved for agent: {agent_id}") return True except Exception as e: logger.error(f"โŒ Failed to save chat message for {agent_id}: {e}") return False async def get_agent_chat_history(self, agent_id: str, limit: int = 50) -> List[Dict]: """Get chat history for an agent""" try: if not self.is_ready: await self.initialize() async with db_manager.get_async_session() as session: result = await session.execute( select(DBChatMessage) .where(DBChatMessage.agent_id == agent_id) .order_by(DBChatMessage.created_at.desc()) .limit(limit) ) messages = result.scalars().all() return [ { "user_message": msg.user_message, "agent_response": msg.agent_response, "response_time": msg.response_time, "tokens_used": msg.tokens_used, "created_at": msg.created_at.isoformat(), "metadata": msg.message_metadata or {} } for msg in messages ] except Exception as e: logger.error(f"โŒ Failed to get chat history for {agent_id}: {e}") return [] async def health_check(self) -> Dict[str, Any]: """Check database service health""" try: if not db_manager.is_initialized: return {"status": "not_initialized"} # Try to count agents async with db_manager.get_async_session() as session: result = await session.execute(select(DBAgent)) agent_count = len(result.scalars().all()) return { "status": "healthy", "service_ready": self.is_ready, "agent_count": agent_count, "timestamp": datetime.utcnow().isoformat() } except Exception as e: return { "status": "error", "error": str(e), "service_ready": self.is_ready, "timestamp": datetime.utcnow().isoformat() } # Global database service instance database_service = DatabaseService() # Convenience functions async def ensure_database_ready(): """Ensure database service is ready""" if not database_service.is_ready: await database_service.initialize() if __name__ == "__main__": async def test_database_service(): """Test database service functionality""" print("๐Ÿงช Testing Database Service...") service = DatabaseService() await service.initialize() # Test health check health = await service.health_check() print(f"Health: {health}") # Test loading agents agents = await service.load_all_agents() print(f"Loaded agents: {[a.name for a in agents]}") asyncio.run(test_database_service())