Spaces:
Sleeping
Sleeping
| """ | |
| SAAP Database Service - High-level Database Operations | |
| Simplified database operations for Agent Manager with proper initialization | |
| """ | |
| import asyncio | |
| import logging | |
| from typing import Dict, List, Optional, Any | |
| from datetime import datetime | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select, update, delete | |
| from sqlalchemy.exc import IntegrityError, SQLAlchemyError | |
| from database.connection import db_manager | |
| from database.models import DBAgent, DBChatMessage, DBAgentSession | |
| from models.agent import SaapAgent, AgentStatus | |
| logger = logging.getLogger(__name__) | |
| class DatabaseService: | |
| """ | |
| High-level database service for SAAP Agent operations | |
| Handles all database operations with proper error handling | |
| """ | |
| def __init__(self): | |
| self.is_ready = False | |
| async def initialize(self): | |
| """Initialize database service and ensure tables exist""" | |
| try: | |
| logger.info("ποΈ Initializing SAAP Database Service...") | |
| # Initialize database manager if not already done | |
| if not db_manager.is_initialized: | |
| await db_manager.initialize() | |
| # Force database tables creation | |
| await self._ensure_tables_exist() | |
| self.is_ready = True | |
| logger.info("β Database Service initialized successfully") | |
| except Exception as e: | |
| logger.error(f"β Database Service initialization failed: {e}") | |
| raise | |
| async def _ensure_tables_exist(self): | |
| """Ensure all required database tables exist""" | |
| try: | |
| # Force table creation by running a simple query | |
| async with db_manager.get_async_session() as session: | |
| # Test if agents table exists by trying to count rows | |
| try: | |
| result = await session.execute(select(DBAgent)) | |
| agents = result.scalars().all() | |
| logger.info(f"π Found {len(agents)} existing agents in database") | |
| except Exception as table_error: | |
| logger.warning(f"β οΈ Agents table not ready: {table_error}") | |
| # Tables should already be created by db_manager.initialize() | |
| except Exception as e: | |
| logger.error(f"β Failed to verify tables: {e}") | |
| raise | |
| async def save_agent(self, agent: SaapAgent) -> bool: | |
| """Save or update agent in database""" | |
| try: | |
| if not self.is_ready: | |
| await self.initialize() | |
| async with db_manager.get_async_session() as session: | |
| # Check if agent exists | |
| existing = await session.execute( | |
| select(DBAgent).where(DBAgent.id == agent.id) | |
| ) | |
| db_agent = existing.scalar_one_or_none() | |
| if db_agent: | |
| # Update existing agent | |
| for key, value in DBAgent.from_saap_agent(agent).__dict__.items(): | |
| if not key.startswith('_'): # Skip SQLAlchemy internal attributes | |
| setattr(db_agent, key, value) | |
| db_agent.updated_at = datetime.utcnow() | |
| logger.info(f"π Updating existing agent: {agent.name}") | |
| else: | |
| # Create new agent | |
| db_agent = DBAgent.from_saap_agent(agent) | |
| session.add(db_agent) | |
| logger.info(f"β Creating new agent: {agent.name}") | |
| await session.commit() | |
| logger.info(f"β Agent saved successfully: {agent.name} ({agent.id})") | |
| return True | |
| except IntegrityError as ie: | |
| logger.error(f"β Database integrity error saving agent {agent.id}: {ie}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"β Failed to save agent {agent.id}: {e}") | |
| return False | |
| async def load_agent(self, agent_id: str) -> Optional[SaapAgent]: | |
| """Load agent from database""" | |
| try: | |
| if not self.is_ready: | |
| await self.initialize() | |
| async with db_manager.get_async_session() as session: | |
| result = await session.execute( | |
| select(DBAgent).where(DBAgent.id == agent_id) | |
| ) | |
| db_agent = result.scalar_one_or_none() | |
| if db_agent: | |
| agent = db_agent.to_saap_agent() | |
| logger.debug(f"π Loaded agent: {agent.name}") | |
| return agent | |
| else: | |
| logger.debug(f"β Agent not found in database: {agent_id}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"β Failed to load agent {agent_id}: {e}") | |
| return None | |
| async def load_all_agents(self) -> List[SaapAgent]: | |
| """Load all agents from database""" | |
| try: | |
| if not self.is_ready: | |
| await self.initialize() | |
| async with db_manager.get_async_session() as session: | |
| result = await session.execute(select(DBAgent)) | |
| db_agents = result.scalars().all() | |
| agents = [] | |
| for db_agent in db_agents: | |
| try: | |
| agent = db_agent.to_saap_agent() | |
| agents.append(agent) | |
| except Exception as conv_error: | |
| logger.error(f"β οΈ Failed to convert agent {db_agent.id}: {conv_error}") | |
| continue | |
| logger.info(f"π Loaded {len(agents)} agents from database") | |
| return agents | |
| except Exception as e: | |
| logger.error(f"β Failed to load agents from database: {e}") | |
| return [] | |
| async def delete_agent(self, agent_id: str) -> bool: | |
| """Delete agent from database""" | |
| try: | |
| if not self.is_ready: | |
| await self.initialize() | |
| async with db_manager.get_async_session() as session: | |
| # Delete agent (cascading will handle related records) | |
| result = await session.execute( | |
| delete(DBAgent).where(DBAgent.id == agent_id) | |
| ) | |
| await session.commit() | |
| if result.rowcount > 0: | |
| logger.info(f"ποΈ Agent deleted from database: {agent_id}") | |
| return True | |
| else: | |
| logger.warning(f"β Agent not found for deletion: {agent_id}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"β Failed to delete agent {agent_id}: {e}") | |
| return False | |
| async def update_agent_status(self, agent_id: str, status: AgentStatus) -> bool: | |
| """Update agent status in database""" | |
| try: | |
| if not self.is_ready: | |
| await self.initialize() | |
| async with db_manager.get_async_session() as session: | |
| result = await session.execute( | |
| update(DBAgent) | |
| .where(DBAgent.id == agent_id) | |
| .values( | |
| status=status.value, | |
| last_active=datetime.utcnow(), | |
| updated_at=datetime.utcnow() | |
| ) | |
| ) | |
| await session.commit() | |
| if result.rowcount > 0: | |
| logger.debug(f"π Agent status updated: {agent_id} -> {status.value}") | |
| return True | |
| else: | |
| logger.warning(f"β Agent not found for status update: {agent_id}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"β Failed to update agent status {agent_id}: {e}") | |
| return False | |
| async def save_chat_message(self, agent_id: str, user_message: str, | |
| agent_response: str, response_time: float, | |
| tokens_used: int = 0, metadata: Dict = None) -> bool: | |
| """Save chat message to database""" | |
| try: | |
| if not self.is_ready: | |
| await self.initialize() | |
| async with db_manager.get_async_session() as session: | |
| chat_message = DBChatMessage( | |
| agent_id=agent_id, | |
| user_message=user_message, | |
| agent_response=agent_response, | |
| response_time=response_time, | |
| tokens_used=tokens_used, | |
| message_metadata=metadata or {} | |
| ) | |
| session.add(chat_message) | |
| await session.commit() | |
| logger.debug(f"π¬ Chat message saved for agent: {agent_id}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"β Failed to save chat message for {agent_id}: {e}") | |
| return False | |
| async def get_agent_chat_history(self, agent_id: str, limit: int = 50) -> List[Dict]: | |
| """Get chat history for an agent""" | |
| try: | |
| if not self.is_ready: | |
| await self.initialize() | |
| async with db_manager.get_async_session() as session: | |
| result = await session.execute( | |
| select(DBChatMessage) | |
| .where(DBChatMessage.agent_id == agent_id) | |
| .order_by(DBChatMessage.created_at.desc()) | |
| .limit(limit) | |
| ) | |
| messages = result.scalars().all() | |
| return [ | |
| { | |
| "user_message": msg.user_message, | |
| "agent_response": msg.agent_response, | |
| "response_time": msg.response_time, | |
| "tokens_used": msg.tokens_used, | |
| "created_at": msg.created_at.isoformat(), | |
| "metadata": msg.message_metadata or {} | |
| } | |
| for msg in messages | |
| ] | |
| except Exception as e: | |
| logger.error(f"β Failed to get chat history for {agent_id}: {e}") | |
| return [] | |
| async def health_check(self) -> Dict[str, Any]: | |
| """Check database service health""" | |
| try: | |
| if not db_manager.is_initialized: | |
| return {"status": "not_initialized"} | |
| # Try to count agents | |
| async with db_manager.get_async_session() as session: | |
| result = await session.execute(select(DBAgent)) | |
| agent_count = len(result.scalars().all()) | |
| return { | |
| "status": "healthy", | |
| "service_ready": self.is_ready, | |
| "agent_count": agent_count, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| return { | |
| "status": "error", | |
| "error": str(e), | |
| "service_ready": self.is_ready, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| # Global database service instance | |
| database_service = DatabaseService() | |
| # Convenience functions | |
| async def ensure_database_ready(): | |
| """Ensure database service is ready""" | |
| if not database_service.is_ready: | |
| await database_service.initialize() | |
| if __name__ == "__main__": | |
| async def test_database_service(): | |
| """Test database service functionality""" | |
| print("π§ͺ Testing Database Service...") | |
| service = DatabaseService() | |
| await service.initialize() | |
| # Test health check | |
| health = await service.health_check() | |
| print(f"Health: {health}") | |
| # Test loading agents | |
| agents = await service.load_all_agents() | |
| print(f"Loaded agents: {[a.name for a in agents]}") | |
| asyncio.run(test_database_service()) |