File size: 12,556 Bytes
4343907
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
"""
SAAP Database Service - High-level Database Operations
Simplified database operations for Agent Manager with proper initialization
"""

import asyncio
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from sqlalchemy.exc import IntegrityError, SQLAlchemyError

from database.connection import db_manager
from database.models import DBAgent, DBChatMessage, DBAgentSession
from models.agent import SaapAgent, AgentStatus

logger = logging.getLogger(__name__)

class DatabaseService:
    """
    High-level database service for SAAP agent operations.

    Every public coroutine except ``initialize`` and ``health_check`` makes
    sure the service is initialized before touching the database, logs any
    failure, and reports it through its return value (``False``, ``None`` or
    an empty list) instead of raising. ``initialize`` re-raises so that
    start-up failures stay visible to the caller.
    """

    def __init__(self):
        # Flipped to True only after initialize() has completed successfully.
        self.is_ready = False

    async def initialize(self) -> None:
        """Initialize the database manager if needed and verify the tables.

        Raises:
            Exception: Whatever the manager initialization or the table
                verification raised; it is logged and re-raised unchanged.
        """
        try:
            logger.info("🗄️ Initializing SAAP Database Service...")

            # db_manager may already have been initialized elsewhere.
            if not db_manager.is_initialized:
                await db_manager.initialize()

            await self._ensure_tables_exist()

            self.is_ready = True
            logger.info("✅ Database Service initialized successfully")

        except Exception as e:
            logger.error(f"❌ Database Service initialization failed: {e}")
            raise

    async def _ensure_ready(self) -> None:
        """Run ``initialize`` once, on first use of the service."""
        if not self.is_ready:
            await self.initialize()

    @staticmethod
    async def _count_agents(session) -> int:
        """Return the number of agent rows using a single COUNT query.

        Args:
            session: An open async session obtained from ``db_manager``.
        """
        # Local import keeps this block self-contained; the module-level
        # sqlalchemy import does not bring ``func`` into scope.
        from sqlalchemy import func

        result = await session.execute(
            select(func.count()).select_from(DBAgent)
        )
        return result.scalar_one()

    async def _ensure_tables_exist(self) -> None:
        """Probe the agents table and log how many rows it holds.

        A failing probe query is only logged as a warning; errors raised
        while opening or closing the session are logged and re-raised.
        """
        try:
            async with db_manager.get_async_session() as session:
                try:
                    # Counting in the database avoids loading every agent
                    # row just to report how many there are.
                    agent_count = await self._count_agents(session)
                    logger.info(f"📚 Found {agent_count} existing agents in database")
                except Exception as table_error:
                    # Tables should already be created by db_manager.initialize()
                    logger.warning(f"⚠️ Agents table not ready: {table_error}")

        except Exception as e:
            logger.error(f"❌ Failed to verify tables: {e}")
            raise

    async def save_agent(self, agent: SaapAgent) -> bool:
        """Insert the agent, or update the existing row with the same id.

        Args:
            agent: The agent to persist.

        Returns:
            True when the commit succeeded, False on any error.
        """
        try:
            await self._ensure_ready()

            async with db_manager.get_async_session() as session:
                existing = await session.execute(
                    select(DBAgent).where(DBAgent.id == agent.id)
                )
                db_agent = existing.scalar_one_or_none()

                if db_agent:
                    # Copy every public attribute of a freshly converted row
                    # onto the persistent one; names starting with '_' are
                    # SQLAlchemy internals and must not be overwritten.
                    fresh = DBAgent.from_saap_agent(agent)
                    for key, value in vars(fresh).items():
                        if not key.startswith('_'):
                            setattr(db_agent, key, value)

                    db_agent.updated_at = datetime.utcnow()
                    logger.info(f"🔄 Updating existing agent: {agent.name}")
                else:
                    db_agent = DBAgent.from_saap_agent(agent)
                    session.add(db_agent)
                    logger.info(f"➕ Creating new agent: {agent.name}")

                await session.commit()
                logger.info(f"✅ Agent saved successfully: {agent.name} ({agent.id})")
                return True

        except IntegrityError as ie:
            logger.error(f"❌ Database integrity error saving agent {agent.id}: {ie}")
            return False
        except Exception as e:
            logger.error(f"❌ Failed to save agent {agent.id}: {e}")
            return False

    async def load_agent(self, agent_id: str) -> Optional[SaapAgent]:
        """Load a single agent by id.

        Returns:
            The converted agent, or None when it does not exist or loading
            failed.
        """
        try:
            await self._ensure_ready()

            async with db_manager.get_async_session() as session:
                result = await session.execute(
                    select(DBAgent).where(DBAgent.id == agent_id)
                )
                db_agent = result.scalar_one_or_none()

                if db_agent:
                    agent = db_agent.to_saap_agent()
                    logger.debug(f"📖 Loaded agent: {agent.name}")
                    return agent

                logger.debug(f"❓ Agent not found in database: {agent_id}")
                return None

        except Exception as e:
            logger.error(f"❌ Failed to load agent {agent_id}: {e}")
            return None

    async def load_all_agents(self) -> List[SaapAgent]:
        """Load every agent that can be converted to a ``SaapAgent``.

        Rows whose conversion fails are logged and skipped so one bad record
        does not hide the others.

        Returns:
            The converted agents, or an empty list when the query failed.
        """
        try:
            await self._ensure_ready()

            async with db_manager.get_async_session() as session:
                result = await session.execute(select(DBAgent))
                db_agents = result.scalars().all()

                agents = []
                for db_agent in db_agents:
                    try:
                        agents.append(db_agent.to_saap_agent())
                    except Exception as conv_error:
                        logger.error(f"⚠️ Failed to convert agent {db_agent.id}: {conv_error}")

                logger.info(f"📚 Loaded {len(agents)} agents from database")
                return agents

        except Exception as e:
            logger.error(f"❌ Failed to load agents from database: {e}")
            return []

    async def delete_agent(self, agent_id: str) -> bool:
        """Delete the agent row with the given id.

        Returns:
            True when a row was deleted, False when none matched or the
            operation failed.
        """
        try:
            await self._ensure_ready()

            async with db_manager.get_async_session() as session:
                # NOTE(review): a statement-level DELETE does not run ORM
                # relationship cascades; related records are removed only if
                # the schema declares ON DELETE CASCADE. Confirm against
                # database.models.
                result = await session.execute(
                    delete(DBAgent).where(DBAgent.id == agent_id)
                )

                await session.commit()

                if result.rowcount > 0:
                    logger.info(f"🗑️ Agent deleted from database: {agent_id}")
                    return True

                logger.warning(f"❓ Agent not found for deletion: {agent_id}")
                return False

        except Exception as e:
            logger.error(f"❌ Failed to delete agent {agent_id}: {e}")
            return False

    async def update_agent_status(self, agent_id: str, status: AgentStatus) -> bool:
        """Store a new status and refresh the agent's activity timestamps.

        Returns:
            True when a row was updated, False when none matched or the
            operation failed.
        """
        try:
            await self._ensure_ready()

            async with db_manager.get_async_session() as session:
                result = await session.execute(
                    update(DBAgent)
                    .where(DBAgent.id == agent_id)
                    .values(
                        status=status.value,
                        last_active=datetime.utcnow(),
                        updated_at=datetime.utcnow()
                    )
                )

                await session.commit()

                if result.rowcount > 0:
                    logger.debug(f"📊 Agent status updated: {agent_id} -> {status.value}")
                    return True

                logger.warning(f"❓ Agent not found for status update: {agent_id}")
                return False

        except Exception as e:
            logger.error(f"❌ Failed to update agent status {agent_id}: {e}")
            return False

    async def save_chat_message(self, agent_id: str, user_message: str,
                              agent_response: str, response_time: float,
                              tokens_used: int = 0,
                              metadata: Optional[Dict] = None) -> bool:
        """Persist one user/agent exchange.

        Args:
            agent_id: Id of the agent the exchange belongs to.
            user_message: Text sent by the user.
            agent_response: Text returned by the agent.
            response_time: Response time as measured by the caller.
            tokens_used: Token count reported for the exchange.
            metadata: Optional extra data; stored as ``{}`` when omitted.

        Returns:
            True when the commit succeeded, False on any error.
        """
        try:
            await self._ensure_ready()

            async with db_manager.get_async_session() as session:
                chat_message = DBChatMessage(
                    agent_id=agent_id,
                    user_message=user_message,
                    agent_response=agent_response,
                    response_time=response_time,
                    tokens_used=tokens_used,
                    message_metadata=metadata or {}
                )

                session.add(chat_message)
                await session.commit()

                logger.debug(f"💬 Chat message saved for agent: {agent_id}")
                return True

        except Exception as e:
            logger.error(f"❌ Failed to save chat message for {agent_id}: {e}")
            return False

    async def get_agent_chat_history(self, agent_id: str, limit: int = 50) -> List[Dict]:
        """Return the most recent chat messages of an agent, newest first.

        Args:
            agent_id: Id of the agent whose history is requested.
            limit: Maximum number of messages to return.

        Returns:
            One dict per message, or an empty list when the query failed.
            ``created_at`` is an ISO-8601 string, or None when the row has no
            timestamp.
        """
        try:
            await self._ensure_ready()

            async with db_manager.get_async_session() as session:
                result = await session.execute(
                    select(DBChatMessage)
                    .where(DBChatMessage.agent_id == agent_id)
                    .order_by(DBChatMessage.created_at.desc())
                    .limit(limit)
                )

                messages = result.scalars().all()

                return [
                    {
                        "user_message": msg.user_message,
                        "agent_response": msg.agent_response,
                        "response_time": msg.response_time,
                        "tokens_used": msg.tokens_used,
                        # A row without a timestamp must not discard the
                        # whole history through the catch-all below.
                        "created_at": msg.created_at.isoformat() if msg.created_at else None,
                        "metadata": msg.message_metadata or {}
                    }
                    for msg in messages
                ]

        except Exception as e:
            logger.error(f"❌ Failed to get chat history for {agent_id}: {e}")
            return []

    async def health_check(self) -> Dict[str, Any]:
        """Report whether the database can be queried.

        Unlike the other operations this never triggers initialization.

        Returns:
            ``{"status": "not_initialized"}`` when the manager has not been
            initialized, a ``"healthy"`` report including the agent count
            when the probe query succeeds, or an ``"error"`` report carrying
            the exception text otherwise.
        """
        try:
            if not db_manager.is_initialized:
                return {"status": "not_initialized"}

            async with db_manager.get_async_session() as session:
                agent_count = await self._count_agents(session)

                return {
                    "status": "healthy",
                    "service_ready": self.is_ready,
                    "agent_count": agent_count,
                    "timestamp": datetime.utcnow().isoformat()
                }

        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "service_ready": self.is_ready,
                "timestamp": datetime.utcnow().isoformat()
            }

# Global database service instance (module-level singleton).
# Constructing it only sets is_ready to False; no database work happens
# until initialize() is awaited, directly or through a service method.
database_service = DatabaseService()

# Convenience functions
async def ensure_database_ready():
    """Initialize the global database service unless it is already ready."""
    if database_service.is_ready:
        return
    await database_service.initialize()

if __name__ == "__main__":
    async def test_database_service():
        """Smoke-test the service: initialize, health check, list agents.

        Uses a fresh DatabaseService rather than the module-level instance
        and prints the results to stdout.
        """
        print("🧪 Testing Database Service...")

        service = DatabaseService()
        await service.initialize()

        # Test health check
        health = await service.health_check()
        print(f"Health: {health}")

        # Test loading agents
        agents = await service.load_all_agents()
        print(f"Loaded agents: {[a.name for a in agents]}")

    asyncio.run(test_database_service())