Spaces:
Sleeping
Sleeping
| """ | |
| SAAP Database Service - Production Ready | |
| High-level database operations for agent management and chat persistence | |
| """ | |
| import logging | |
| from typing import List, Optional, Dict, Any | |
| from datetime import datetime, timedelta | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import text, select, update, delete, and_, desc | |
| from sqlalchemy.exc import SQLAlchemyError | |
| from database.connection import db_manager | |
| from database.models import DBAgent, DBChatMessage, DBAgentSession, DBSystemLog, DBHealthCheck | |
| from models.agent import SaapAgent | |
| logger = logging.getLogger(__name__) | |
| class DatabaseService: | |
| """ | |
| Database service layer for SAAP production deployment | |
| Provides high-level database operations with error handling and logging | |
| """ | |
| def __init__(self): | |
| self.is_initialized = False | |
| self.db_manager = db_manager | |
| async def initialize(self): | |
| """Initialize database service""" | |
| if not self.db_manager.is_initialized: | |
| await self.db_manager.initialize() | |
| self.is_initialized = True | |
| logger.info("β Database Service initialized") | |
| # ===================================================== | |
| # AGENT MANAGEMENT OPERATIONS | |
| # ===================================================== | |
| async def save_agent(self, agent: SaapAgent) -> bool: | |
| """Save or update agent in database""" | |
| try: | |
| async with self.db_manager.get_async_session() as session: | |
| # Check if agent exists | |
| existing_agent = await session.get(DBAgent, agent.id) | |
| if existing_agent: | |
| # Update existing agent | |
| existing_agent.name = agent.name | |
| existing_agent.agent_type = agent.type.value | |
| existing_agent.color = agent.color | |
| existing_agent.avatar = agent.avatar | |
| existing_agent.description = agent.description | |
| existing_agent.llm_config = agent.llm_config.model_dump() | |
| existing_agent.capabilities = agent.capabilities | |
| existing_agent.personality = agent.personality.model_dump() if agent.personality else None | |
| existing_agent.status = agent.status.value | |
| existing_agent.last_active = agent.metrics.last_active if agent.metrics else None | |
| existing_agent.metrics = agent.metrics.model_dump() if agent.metrics else None | |
| existing_agent.updated_at = datetime.utcnow() | |
| existing_agent.tags = agent.tags | |
| logger.info(f"π Updated agent in database: {agent.name}") | |
| else: | |
| # Create new agent | |
| db_agent = DBAgent.from_saap_agent(agent) | |
| session.add(db_agent) | |
| logger.info(f"πΎ Saved new agent to database: {agent.name}") | |
| await session.commit() | |
| return True | |
| except SQLAlchemyError as e: | |
| logger.error(f"β Database error saving agent {agent.id}: {e}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"β Unexpected error saving agent {agent.id}: {e}") | |
| return False | |
| async def load_agent(self, agent_id: str) -> Optional[SaapAgent]: | |
| """Load single agent from database""" | |
| try: | |
| async with self.db_manager.get_async_session() as session: | |
| db_agent = await session.get(DBAgent, agent_id) | |
| if db_agent: | |
| return db_agent.to_saap_agent() | |
| else: | |
| logger.warning(f"β οΈ Agent not found in database: {agent_id}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"β Error loading agent {agent_id}: {e}") | |
| return None | |
| async def load_all_agents(self) -> List[SaapAgent]: | |
| """Load all agents from database""" | |
| try: | |
| async with self.db_manager.get_async_session() as session: | |
| result = await session.execute( | |
| select(DBAgent).order_by(DBAgent.created_at) | |
| ) | |
| db_agents = result.scalars().all() | |
| saap_agents = [] | |
| for db_agent in db_agents: | |
| try: | |
| saap_agent = db_agent.to_saap_agent() | |
| saap_agents.append(saap_agent) | |
| except Exception as e: | |
| logger.error(f"β Error converting DB agent {db_agent.id}: {e}") | |
| logger.info(f"π Loaded {len(saap_agents)} agents from database") | |
| return saap_agents | |
| except Exception as e: | |
| logger.error(f"β Error loading agents: {e}") | |
| return [] | |
| async def delete_agent(self, agent_id: str) -> bool: | |
| """Delete agent and all related data""" | |
| try: | |
| async with self.db_manager.get_async_session() as session: | |
| # Delete agent (cascading will delete related chat_messages and sessions) | |
| result = await session.execute( | |
| delete(DBAgent).where(DBAgent.id == agent_id) | |
| ) | |
| if result.rowcount > 0: | |
| await session.commit() | |
| logger.info(f"ποΈ Deleted agent from database: {agent_id}") | |
| return True | |
| else: | |
| logger.warning(f"β οΈ Agent not found for deletion: {agent_id}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"β Error deleting agent {agent_id}: {e}") | |
| return False | |
| async def get_agent_statistics(self) -> Dict[str, Any]: | |
| """Get comprehensive agent statistics""" | |
| try: | |
| async with self.db_manager.get_async_session() as session: | |
| # Total agents by status | |
| status_result = await session.execute(text( | |
| "SELECT status, COUNT(*) as count FROM agents GROUP BY status" | |
| )) | |
| status_breakdown = {row.status: row.count for row in status_result} | |
| # Total agents | |
| total_result = await session.execute(text("SELECT COUNT(*) FROM agents")) | |
| total_agents = total_result.scalar() | |
| # Active agents | |
| active_result = await session.execute(text( | |
| "SELECT COUNT(*) FROM agents WHERE status = 'active'" | |
| )) | |
| active_agents = active_result.scalar() | |
| # Agent types | |
| type_result = await session.execute(text( | |
| "SELECT agent_type, COUNT(*) as count FROM agents GROUP BY agent_type" | |
| )) | |
| type_breakdown = {row.agent_type: row.count for row in type_result} | |
| # Recent activity | |
| recent_result = await session.execute(text( | |
| "SELECT COUNT(*) FROM agents WHERE last_active > :cutoff" | |
| ), {"cutoff": datetime.utcnow() - timedelta(hours=24)}) | |
| recent_active = recent_result.scalar() | |
| return { | |
| "total_agents": total_agents, | |
| "active_agents": active_agents, | |
| "recent_active_24h": recent_active, | |
| "status_breakdown": status_breakdown, | |
| "type_breakdown": type_breakdown, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"β Error getting agent statistics: {e}") | |
| return {"error": str(e)} | |
| # ===================================================== | |
| # CHAT MESSAGE OPERATIONS | |
| # ===================================================== | |
| async def save_chat_message(self, agent_id: str, user_message: str, | |
| agent_response: str, response_time: float, | |
| metadata: Optional[Dict] = None) -> bool: | |
| """Save chat message to database""" | |
| try: | |
| async with self.db_manager.get_async_session() as session: | |
| chat_message = DBChatMessage( | |
| agent_id=agent_id, | |
| user_message=user_message, | |
| agent_response=agent_response, | |
| response_time=response_time, | |
| metadata=metadata or {} | |
| ) | |
| session.add(chat_message) | |
| await session.commit() | |
| logger.debug(f"π¬ Saved chat message for agent {agent_id}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"β Error saving chat message: {e}") | |
| return False | |
| async def get_chat_history(self, agent_id: str, limit: int = 100) -> List[Dict]: | |
| """Get chat history for an agent""" | |
| try: | |
| async with self.db_manager.get_async_session() as session: | |
| result = await session.execute( | |
| select(DBChatMessage) | |
| .where(DBChatMessage.agent_id == agent_id) | |
| .order_by(desc(DBChatMessage.created_at)) | |
| .limit(limit) | |
| ) | |
| messages = result.scalars().all() | |
| return [{ | |
| "id": msg.id, | |
| "user_message": msg.user_message, | |
| "agent_response": msg.agent_response, | |
| "response_time": msg.response_time, | |
| "created_at": msg.created_at.isoformat(), | |
| "metadata": msg.metadata | |
| } for msg in messages] | |
| except Exception as e: | |
| logger.error(f"β Error getting chat history: {e}") | |
| return [] | |
| async def get_chat_statistics(self, agent_id: Optional[str] = None) -> Dict[str, Any]: | |
| """Get chat statistics""" | |
| try: | |
| async with self.db_manager.get_async_session() as session: | |
| base_query = select(DBChatMessage) | |
| if agent_id: | |
| base_query = base_query.where(DBChatMessage.agent_id == agent_id) | |
| # Total messages | |
| count_result = await session.execute( | |
| select([text("COUNT(*)")]).select_from(base_query.alias()) | |
| ) | |
| total_messages = count_result.scalar() | |
| # Average response time | |
| avg_result = await session.execute( | |
| select([text("AVG(response_time)")]).select_from(base_query.alias()) | |
| ) | |
| avg_response_time = avg_result.scalar() or 0.0 | |
| # Recent activity (24h) | |
| recent_query = base_query.where( | |
| DBChatMessage.created_at > datetime.utcnow() - timedelta(hours=24) | |
| ) | |
| recent_result = await session.execute( | |
| select([text("COUNT(*)")]).select_from(recent_query.alias()) | |
| ) | |
| recent_messages = recent_result.scalar() | |
| return { | |
| "total_messages": total_messages, | |
| "average_response_time": float(avg_response_time), | |
| "recent_messages_24h": recent_messages, | |
| "agent_id": agent_id, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"β Error getting chat statistics: {e}") | |
| return {"error": str(e)} | |
| # ===================================================== | |
| # AGENT SESSION MANAGEMENT | |
| # ===================================================== | |
| async def start_agent_session(self, agent_id: str, metadata: Optional[Dict] = None) -> int: | |
| """Start new agent session""" | |
| try: | |
| async with self.db_manager.get_async_session() as session: | |
| agent_session = DBAgentSession( | |
| agent_id=agent_id, | |
| status="active", | |
| metadata=metadata or {} | |
| ) | |
| session.add(agent_session) | |
| await session.commit() | |
| await session.refresh(agent_session) | |
| logger.info(f"π Started session {agent_session.id} for agent {agent_id}") | |
| return agent_session.id | |
| except Exception as e: | |
| logger.error(f"β Error starting agent session: {e}") | |
| return 0 | |
| async def end_agent_session(self, session_id: int, end_reason: str = "graceful") -> bool: | |
| """End agent session""" | |
| try: | |
| async with self.db_manager.get_async_session() as session: | |
| agent_session = await session.get(DBAgentSession, session_id) | |
| if agent_session: | |
| agent_session.session_end = datetime.utcnow() | |
| agent_session.status = "completed" | |
| agent_session.end_reason = end_reason | |
| agent_session.calculate_duration() | |
| await session.commit() | |
| logger.info(f"π Ended session {session_id} ({end_reason})") | |
| return True | |
| else: | |
| logger.warning(f"β οΈ Agent session not found: {session_id}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"β Error ending agent session: {e}") | |
| return False | |
| # ===================================================== | |
| # SYSTEM MONITORING | |
| # ===================================================== | |
| async def log_system_event(self, level: str, logger_name: str, message: str, | |
| agent_id: Optional[str] = None, extra_data: Optional[Dict] = None): | |
| """Log system event to database""" | |
| try: | |
| async with self.db_manager.get_async_session() as session: | |
| system_log = DBSystemLog( | |
| level=level, | |
| logger_name=logger_name, | |
| message=message, | |
| agent_id=agent_id, | |
| extra_data=extra_data or {} | |
| ) | |
| session.add(system_log) | |
| await session.commit() | |
| except Exception as e: | |
| logger.error(f"β Error logging system event: {e}") | |
| async def health_check(self) -> Dict[str, Any]: | |
| """Database health check""" | |
| return await self.db_manager.health_check() | |
| async def cleanup_old_data(self, days: int = 30): | |
| """Clean up old data""" | |
| await self.db_manager.cleanup_old_data(days) | |
| # Global database service instance | |
| db_service = DatabaseService() | |
| if __name__ == "__main__": | |
| import asyncio | |
| async def test_db_service(): | |
| """Test database service operations""" | |
| await db_service.initialize() | |
| # Test health check | |
| health = await db_service.health_check() | |
| print(f"π Database Health: {health}") | |
| # Test agent statistics | |
| stats = await db_service.get_agent_statistics() | |
| print(f"π Agent Stats: {stats}") | |
| # Test chat statistics | |
| chat_stats = await db_service.get_chat_statistics() | |
| print(f"π¬ Chat Stats: {chat_stats}") | |
| asyncio.run(test_db_service()) |