# Hwandji's picture
# feat: initial HuggingFace Space deployment
# 4343907
"""
SAAP Database Service - High-level Database Operations
Simplified database operations for Agent Manager with proper initialization
"""
import asyncio
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from database.connection import db_manager
from database.models import DBAgent, DBChatMessage, DBAgentSession
from models.agent import SaapAgent, AgentStatus
# Module-level logger named after this module; every DatabaseService method logs through it.
logger = logging.getLogger(__name__)
class DatabaseService:
    """
    High-level database service for SAAP Agent operations.

    Every public operation opens its own session via
    ``db_manager.get_async_session()`` and initializes the service on first
    use. Apart from :meth:`initialize`, the public methods do not raise on
    database errors: they log the failure and return a neutral value
    (``False``, ``None``, ``[]`` or an error dict).
    """

    def __init__(self):
        # Set to True only after initialize() has completed successfully.
        self.is_ready = False

    async def initialize(self):
        """Initialize the database manager (if needed) and verify the tables.

        Raises:
            Exception: any error raised while initializing ``db_manager`` or
                verifying the tables is logged and re-raised; ``is_ready``
                stays ``False`` in that case.
        """
        try:
            logger.info("🗄️ Initializing SAAP Database Service...")
            # Initialize database manager if not already done
            if not db_manager.is_initialized:
                await db_manager.initialize()
            # Probe the agents table so schema problems surface at startup
            await self._ensure_tables_exist()
            self.is_ready = True
            logger.info("✅ Database Service initialized successfully")
        except Exception as e:
            logger.error(f"❌ Database Service initialization failed: {e}")
            raise

    async def _ensure_ready(self) -> None:
        """Call :meth:`initialize` if the service is not yet marked ready."""
        if not self.is_ready:
            await self.initialize()

    @staticmethod
    async def _count_agents(session: AsyncSession) -> int:
        """Return the number of rows in the agents table using ``COUNT(*)``."""
        # Local import: `func` is only needed for this aggregate query.
        from sqlalchemy import func

        result = await session.execute(
            select(func.count()).select_from(DBAgent)
        )
        return result.scalar_one()

    async def _ensure_tables_exist(self):
        """Probe the agents table and log how many agents it holds.

        A failure of the probe query itself is only logged as a warning
        (tables are expected to have been created by
        ``db_manager.initialize()``); a failure to obtain a session is logged
        and re-raised.
        """
        try:
            async with db_manager.get_async_session() as session:
                # Count rows instead of loading every agent just to take len()
                try:
                    agent_count = await self._count_agents(session)
                    logger.info(f"📚 Found {agent_count} existing agents in database")
                except Exception as table_error:
                    logger.warning(f"⚠️ Agents table not ready: {table_error}")
                    # Tables should already be created by db_manager.initialize()
        except Exception as e:
            logger.error(f"❌ Failed to verify tables: {e}")
            raise

    async def save_agent(self, agent: SaapAgent) -> bool:
        """Insert the agent, or update the stored row with the same id.

        Args:
            agent: the agent to persist; matched against ``DBAgent.id``.

        Returns:
            ``True`` after a successful commit, ``False`` if any error
            (including an integrity violation) occurred.
        """
        try:
            await self._ensure_ready()
            async with db_manager.get_async_session() as session:
                # Check if agent exists
                existing = await session.execute(
                    select(DBAgent).where(DBAgent.id == agent.id)
                )
                db_agent = existing.scalar_one_or_none()
                incoming = DBAgent.from_saap_agent(agent)
                if db_agent is not None:
                    # Copy every public attribute of the freshly built row
                    # onto the persistent one.
                    for key, value in vars(incoming).items():
                        if not key.startswith('_'):  # Skip SQLAlchemy internal attributes
                            setattr(db_agent, key, value)
                    db_agent.updated_at = datetime.utcnow()
                    logger.info(f"🔄 Updating existing agent: {agent.name}")
                else:
                    session.add(incoming)
                    logger.info(f"➕ Creating new agent: {agent.name}")
                await session.commit()
                logger.info(f"✅ Agent saved successfully: {agent.name} ({agent.id})")
                return True
        except IntegrityError as ie:
            logger.error(f"❌ Database integrity error saving agent {agent.id}: {ie}")
            return False
        except Exception as e:
            logger.error(f"❌ Failed to save agent {agent.id}: {e}")
            return False

    async def load_agent(self, agent_id: str) -> Optional[SaapAgent]:
        """Load a single agent by id.

        Returns:
            The converted agent, or ``None`` when no row matches or an error
            occurred (errors are logged).
        """
        try:
            await self._ensure_ready()
            async with db_manager.get_async_session() as session:
                result = await session.execute(
                    select(DBAgent).where(DBAgent.id == agent_id)
                )
                db_agent = result.scalar_one_or_none()
                if db_agent is None:
                    logger.debug(f"❓ Agent not found in database: {agent_id}")
                    return None
                agent = db_agent.to_saap_agent()
                logger.debug(f"📖 Loaded agent: {agent.name}")
                return agent
        except Exception as e:
            logger.error(f"❌ Failed to load agent {agent_id}: {e}")
            return None

    async def load_all_agents(self) -> List[SaapAgent]:
        """Load every agent stored in the database.

        Rows that fail conversion are logged and skipped, so one bad row does
        not hide the others.

        Returns:
            The converted agents, or ``[]`` if the query itself failed.
        """
        try:
            await self._ensure_ready()
            async with db_manager.get_async_session() as session:
                result = await session.execute(select(DBAgent))
                db_agents = result.scalars().all()
                agents = []
                for db_agent in db_agents:
                    try:
                        agents.append(db_agent.to_saap_agent())
                    except Exception as conv_error:
                        logger.error(f"⚠️ Failed to convert agent {db_agent.id}: {conv_error}")
                logger.info(f"📚 Loaded {len(agents)} agents from database")
                return agents
        except Exception as e:
            logger.error(f"❌ Failed to load agents from database: {e}")
            return []

    async def delete_agent(self, agent_id: str) -> bool:
        """Delete the agent row with the given id.

        Returns:
            ``True`` if a row was deleted, ``False`` if none matched or an
            error occurred.
        """
        try:
            await self._ensure_ready()
            async with db_manager.get_async_session() as session:
                # NOTE(review): related records are presumably removed by
                # cascade rules on the models - confirm in database.models.
                result = await session.execute(
                    delete(DBAgent).where(DBAgent.id == agent_id)
                )
                await session.commit()
                if result.rowcount > 0:
                    logger.info(f"🗑️ Agent deleted from database: {agent_id}")
                    return True
                logger.warning(f"❓ Agent not found for deletion: {agent_id}")
                return False
        except Exception as e:
            logger.error(f"❌ Failed to delete agent {agent_id}: {e}")
            return False

    async def update_agent_status(self, agent_id: str, status: AgentStatus) -> bool:
        """Set the agent's status and refresh its activity timestamps.

        Returns:
            ``True`` if a row was updated, ``False`` if none matched or an
            error occurred.
        """
        try:
            await self._ensure_ready()
            # One timestamp so last_active and updated_at are always identical
            now = datetime.utcnow()
            async with db_manager.get_async_session() as session:
                result = await session.execute(
                    update(DBAgent)
                    .where(DBAgent.id == agent_id)
                    .values(
                        status=status.value,
                        last_active=now,
                        updated_at=now,
                    )
                )
                await session.commit()
                if result.rowcount > 0:
                    logger.debug(f"📊 Agent status updated: {agent_id} -> {status.value}")
                    return True
                logger.warning(f"❓ Agent not found for status update: {agent_id}")
                return False
        except Exception as e:
            logger.error(f"❌ Failed to update agent status {agent_id}: {e}")
            return False

    async def save_chat_message(self, agent_id: str, user_message: str,
                                agent_response: str, response_time: float,
                                tokens_used: int = 0,
                                metadata: Optional[Dict] = None) -> bool:
        """Persist one user/agent exchange.

        Args:
            agent_id: id of the agent the exchange belongs to.
            user_message: text sent by the user.
            agent_response: text returned by the agent.
            response_time: response time for the exchange, as supplied by the caller.
            tokens_used: token count for the exchange.
            metadata: optional extra data; stored as ``{}`` when omitted or empty.

        Returns:
            ``True`` after a successful commit, ``False`` on any error.
        """
        try:
            await self._ensure_ready()
            async with db_manager.get_async_session() as session:
                chat_message = DBChatMessage(
                    agent_id=agent_id,
                    user_message=user_message,
                    agent_response=agent_response,
                    response_time=response_time,
                    tokens_used=tokens_used,
                    message_metadata=metadata or {},
                )
                session.add(chat_message)
                await session.commit()
                logger.debug(f"💬 Chat message saved for agent: {agent_id}")
                return True
        except Exception as e:
            logger.error(f"❌ Failed to save chat message for {agent_id}: {e}")
            return False

    async def get_agent_chat_history(self, agent_id: str, limit: int = 50) -> List[Dict]:
        """Return the most recent chat messages for an agent, newest first.

        Args:
            agent_id: id of the agent whose history is requested.
            limit: maximum number of messages to return.

        Returns:
            A list of dicts with the keys ``user_message``, ``agent_response``,
            ``response_time``, ``tokens_used``, ``created_at`` (ISO-8601
            string, or ``None`` if the row has no timestamp) and ``metadata``;
            ``[]`` on any error.
        """
        try:
            await self._ensure_ready()
            async with db_manager.get_async_session() as session:
                result = await session.execute(
                    select(DBChatMessage)
                    .where(DBChatMessage.agent_id == agent_id)
                    .order_by(DBChatMessage.created_at.desc())
                    .limit(limit)
                )
                messages = result.scalars().all()
                return [
                    {
                        "user_message": msg.user_message,
                        "agent_response": msg.agent_response,
                        "response_time": msg.response_time,
                        "tokens_used": msg.tokens_used,
                        # A row without a timestamp must not discard the
                        # whole history.
                        "created_at": (
                            msg.created_at.isoformat()
                            if msg.created_at is not None
                            else None
                        ),
                        "metadata": msg.message_metadata or {},
                    }
                    for msg in messages
                ]
        except Exception as e:
            logger.error(f"❌ Failed to get chat history for {agent_id}: {e}")
            return []

    async def health_check(self) -> Dict[str, Any]:
        """Report database health without raising.

        Returns:
            ``{"status": "not_initialized"}`` if ``db_manager`` is not
            initialized; otherwise a dict with ``status`` (``"healthy"`` or
            ``"error"``), ``service_ready`` and ``timestamp``, plus
            ``agent_count`` on success or ``error`` on failure.
        """
        try:
            if not db_manager.is_initialized:
                return {"status": "not_initialized"}
            # Count agents with COUNT(*) rather than loading every row
            async with db_manager.get_async_session() as session:
                agent_count = await self._count_agents(session)
            return {
                "status": "healthy",
                "service_ready": self.is_ready,
                "agent_count": agent_count,
                "timestamp": datetime.utcnow().isoformat(),
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "service_ready": self.is_ready,
                "timestamp": datetime.utcnow().isoformat(),
            }
# Global database service instance, created at import time.
# Construction only sets is_ready = False; ensure_database_ready() below
# (or any DatabaseService operation) triggers the actual initialization.
database_service = DatabaseService()
# Convenience functions
async def ensure_database_ready():
    """Initialize the global ``database_service`` unless it is already ready."""
    if database_service.is_ready:
        return
    await database_service.initialize()
if __name__ == "__main__":

    async def test_database_service():
        """Smoke-test a fresh DatabaseService: initialize, health check, list agents."""
        print("🧪 Testing Database Service...")
        service = DatabaseService()
        await service.initialize()

        # Test health check
        health = await service.health_check()
        print(f"Health: {health}")

        # Test loading agents
        agents = await service.load_all_agents()
        print(f"Loaded agents: {[a.name for a in agents]}")

    asyncio.run(test_database_service())