Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| SAAP colossus Server Integration - ColosusSAAPAgent | |
| ================================================= | |
| Direct integration with colossus Server für Phase 1 Infrastructure Foundation. | |
| Hybrid Architecture: CachyOS (Orchestrierung) + colossus (LLM Processing) | |
| Server Details: | |
| - URL: https://ai.adrian-schupp.de | |
| - Model: mistral-small3.2:24b-instruct-2506 | |
| - Performance Target: < 2s Response-Zeit | |
| Integration with existing SAAP Agent Communication System. | |
| """ | |
| import asyncio | |
| import json | |
| import time | |
| import logging | |
| import os | |
| from typing import Dict, Any, Optional, List | |
| from dataclasses import dataclass, field | |
| import aiohttp | |
| import redis.asyncio as redis | |
| from dotenv import load_dotenv | |
| # Load variables from a .env file into the process environment; ColossusConfig reads COLOSSUS_API_KEY from it via os.getenv | |
| load_dotenv() | |
| # Configure root logging at INFO level. NOTE: this runs as a side effect of importing the module | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
@dataclass
class ColossusConfig:
    """Connection and generation settings for the colossus LLM server.

    The class must be a dataclass: ``api_key`` relies on
    ``field(default_factory=...)`` and validation relies on
    ``__post_init__``, neither of which has any effect on a plain class.

    Attributes:
        base_url: Root URL of the server.
        api_key: Bearer token; defaults to the ``COLOSSUS_API_KEY``
            environment variable, read when the instance is created.
        model: Model identifier sent with every request.
        max_tokens: Upper bound on generated tokens per request.
        temperature: Sampling temperature.
        timeout: Total request timeout in seconds.

    Raises:
        ValueError: If no API key was passed and ``COLOSSUS_API_KEY`` is
            unset or empty.
    """

    base_url: str = "https://ai.adrian-schupp.de"
    # Resolved per instance rather than at import time, so the environment
    # may be populated (e.g. by load_dotenv) after the module is imported.
    api_key: str = field(default_factory=lambda: os.getenv("COLOSSUS_API_KEY", ""))
    model: str = "mistral-small3.2:24b-instruct-2506"
    max_tokens: int = 1000
    temperature: float = 0.7
    timeout: int = 30  # seconds

    def __post_init__(self):
        """Validate configuration after initialization."""
        if not self.api_key:
            raise ValueError(
                "❌ COLOSSUS_API_KEY environment variable not set.\n"
                "Please set it in your .env file:\n"
                "COLOSSUS_API_KEY=your-api-key-here"
            )
class ColosusSAAPAgent:
    """SAAP agent backed by the colossus LLM server.

    Hybrid architecture:
    - CachyOS: agent orchestration, message queue, system management
    - colossus: high-performance LLM processing, AI inference

    Redis is optional. If it cannot be reached the agent runs in degraded
    mode: LLM calls still work, but nothing is registered or stored.
    """

    # Response-time goal in seconds, used for per-call and aggregate checks.
    PERFORMANCE_TARGET_SECONDS = 2.0

    def __init__(self,
                 agent_name: str,
                 agent_role: str = "Coordinator",
                 config: Optional["ColossusConfig"] = None,
                 redis_url: str = "redis://localhost:6379"):
        """Create an agent.

        Args:
            agent_name: Unique name, also used in Redis keys.
            agent_role: Key into ``agent_contexts``; unknown roles fall back
                to the "General" context.
            config: Server configuration; a default ``ColossusConfig`` is
                created when omitted.
            redis_url: Redis connection URL used by ``initialize``.
        """
        self.agent_name = agent_name
        self.agent_role = agent_role
        self.config = config or ColossusConfig()
        self.redis_url = redis_url

        # Role-specific system context prepended to every prompt.
        self.agent_contexts = {
            "Coordinator": "Du bist Agent A (Coordinator) für SAAP. Du koordinierst Multi-Agent Workflows und delegierst Tasks effizient.",
            "Developer": "Du bist Agent B (Developer) mit Expertise in Python, Node.js, Vue.js. Du fokussierst auf Clean Code und Performance.",
            "Analyst": "Du bist Agent C (Analyst) für Requirements-Analyse, Use Cases und Systemdesign. Du lieferst strukturierte Analysen.",
            "General": "Du bist ein SAAP Multi-Agent mit genereller KI-Expertise für vielseitige Tasks."
        }

        # Performance tracking. "total_response_time" and
        # "average_response_time" cover successful requests only.
        self.performance_stats = {
            "total_requests": 0,
            "total_response_time": 0.0,
            "average_response_time": 0.0,
            "errors": 0,
            "successful_requests": 0
        }

        # Redis connection (set by initialize(); None means degraded mode).
        self.redis_client = None

    async def initialize(self):
        """Connect to Redis and register the agent.

        Never raises: on a connection failure the agent continues without
        Redis and ``redis_client`` is left as ``None`` so later calls skip
        Redis instead of failing on a dead client.
        """
        try:
            client = await redis.from_url(self.redis_url)
            await client.ping()
        except Exception as e:
            logger.error(f"❌ Redis connection failed for {self.agent_name}: {e}")
            # Continue without Redis - degraded mode
            self.redis_client = None
            return

        self.redis_client = client
        logger.info(f"✅ {self.agent_name} connected to Redis")

        try:
            # Register agent in Redis
            await self.register_agent()
        except Exception as e:
            # The connection itself works, so keep the client for storage.
            logger.error(f"❌ Redis connection failed for {self.agent_name}: {e}")

    async def register_agent(self):
        """Write the agent's metadata to Redis and add it to ``active_agents``.

        Does nothing when no Redis client is available.
        """
        if not self.redis_client:
            return

        agent_info = {
            "name": self.agent_name,
            "role": self.agent_role,
            "status": "active",
            "model": self.config.model,
            "server": "colossus",
            "capabilities": ["llm_processing", "multi_agent_communication", "task_coordination"],
            "performance_target": "< 2s response time",
            "timestamp": time.time()
        }

        # Redis hash values must be flat, so containers are JSON-encoded
        # and everything else is stringified.
        await self.redis_client.hset(
            f"agent:{self.agent_name}",
            mapping={k: json.dumps(v) if isinstance(v, (dict, list)) else str(v)
                     for k, v in agent_info.items()}
        )

        # Add to active agents set
        await self.redis_client.sadd("active_agents", self.agent_name)
        logger.info(f"📝 {self.agent_name} registered with SAAP system")

    def _build_prompt(self, prompt: str) -> str:
        """Prefix *prompt* with the role context, separated by a blank line."""
        context = self.agent_contexts.get(self.agent_role, self.agent_contexts["General"])
        return f"{context}\n\nAufgabe: {prompt}"

    def _record_success(self, response_time: float) -> None:
        """Update counters and the running average for a successful call."""
        stats = self.performance_stats
        stats["total_requests"] += 1
        stats["successful_requests"] += 1
        stats["total_response_time"] += response_time
        # Failed calls add no latency to the sum, so the average must be
        # taken over successful calls only.
        stats["average_response_time"] = (
            stats["total_response_time"] / stats["successful_requests"]
        )

    def _record_failure(self) -> None:
        """Update counters for a failed call."""
        self.performance_stats["errors"] += 1
        self.performance_stats["total_requests"] += 1

    async def call_colossus_api(self, prompt: str) -> Dict[str, Any]:
        """Send *prompt* to the colossus chat-completions endpoint.

        Never raises; failures are reported in the returned dict.

        Args:
            prompt: Task text; the role context is prepended automatically.

        Returns:
            On success: ``success=True`` plus ``content``, ``response_time``,
            ``model``, ``server``, ``agent``, ``role`` and
            ``performance_check`` (True if the call beat the target time).
            On failure: ``success=False`` plus ``error``, ``response_time``,
            ``agent``, ``server`` and ``performance_check=False``.
        """
        # Monotonic clock: elapsed time must not be affected by wall-clock
        # adjustments.
        start_time = time.perf_counter()

        try:
            headers = {
                "Authorization": f"Bearer {self.config.api_key}",
                "Content-Type": "application/json"
            }
            payload = {
                "model": self.config.model,
                "messages": [
                    {"role": "user", "content": self._build_prompt(prompt)}
                ],
                "max_tokens": self.config.max_tokens,
                "temperature": self.config.temperature
            }

            timeout = aiohttp.ClientTimeout(total=self.config.timeout)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(
                    f"{self.config.base_url}/v1/chat/completions",
                    headers=headers,
                    json=payload
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        raise RuntimeError(f"API Error {response.status}: {error_text}")
                    result = await response.json()

            # A missing or empty "choices" list falls back to "No response".
            choices = result.get("choices") or [{}]
            content = choices[0].get("message", {}).get("content", "No response")

            response_time = time.perf_counter() - start_time
            self._record_success(response_time)

            return {
                "success": True,
                "content": content,
                "response_time": response_time,
                "model": self.config.model,
                "server": "colossus",
                "agent": self.agent_name,
                "role": self.agent_role,
                "performance_check": response_time < self.PERFORMANCE_TARGET_SECONDS
            }

        except Exception as e:
            self._record_failure()
            response_time = time.perf_counter() - start_time
            # Some exceptions (e.g. timeouts) have an empty message.
            error_message = str(e) or type(e).__name__

            logger.error(f"❌ colossus API call failed: {error_message}")

            return {
                "success": False,
                "error": error_message,
                "response_time": response_time,
                "agent": self.agent_name,
                "server": "colossus",
                "performance_check": False
            }

    async def process_message(self, message: str, context: Optional[Dict] = None) -> Dict[str, Any]:
        """Process an incoming message with the colossus LLM.

        Args:
            message: Text passed to ``call_colossus_api``.
            context: Optional metadata; ``message_id`` and ``thread_id`` are
                copied into the result when present.

        Returns:
            The ``call_colossus_api`` result extended with ``agent_name``,
            ``agent_role``, ``timestamp``, ``message_id`` and ``thread_id``.
        """
        logger.info(f"🤖 {self.agent_name} ({self.agent_role}) processing message...")

        result = await self.call_colossus_api(message)

        # Add SAAP-specific metadata
        context = context or {}
        result.update({
            "agent_name": self.agent_name,
            "agent_role": self.agent_role,
            "timestamp": time.time(),
            "message_id": context.get("message_id"),
            "thread_id": context.get("thread_id")
        })

        # Store in Redis if available. Storage is best-effort monitoring
        # data; a Redis failure must not discard a valid LLM result.
        if self.redis_client and result["success"]:
            try:
                await self._store_message_result(message, result)
            except Exception as e:
                logger.warning(f"⚠️ {self.agent_name} could not store result in Redis: {e}")

        performance_emoji = "⚡" if result.get("performance_check", False) else "⏱️"
        logger.info(f"{performance_emoji} {self.agent_name}: {result.get('response_time', 0):.2f}s")

        return result

    async def _store_message_result(self, message: str, result: Dict[str, Any]):
        """Store the message and its result in Redis for monitoring.

        Pushes a JSON record onto ``messages:<agent_name>``, trims that list
        to 100 entries and refreshes the agent's ``last_activity`` field.
        Does nothing when no Redis client is available.
        """
        if not self.redis_client:
            return

        message_data = {
            "input": message,
            "output": result.get("content", ""),
            "agent": self.agent_name,
            "role": self.agent_role,
            "response_time": result.get("response_time", 0),
            "timestamp": result.get("timestamp", time.time()),
            "success": result.get("success", False)
        }
        history_key = f"messages:{self.agent_name}"

        # Newest entries go to the head of the list.
        await self.redis_client.lpush(history_key, json.dumps(message_data))

        # Keep only recent messages (last 100)
        await self.redis_client.ltrim(history_key, 0, 99)

        # Update agent status
        await self.redis_client.hset(
            f"agent:{self.agent_name}",
            "last_activity",
            str(time.time())
        )

    async def get_performance_stats(self) -> Dict[str, Any]:
        """Return a copy of the counters plus derived metrics.

        ``performance_target_met`` is True only if at least one request
        succeeded and the average latency is below the target; an agent with
        no successful requests has not met any target. ``success_rate`` is a
        percentage, 0 when no requests were made.
        """
        stats = self.performance_stats.copy()
        total = stats["total_requests"]
        succeeded = stats["successful_requests"]

        stats.update({
            "server": "colossus",
            "model": self.config.model,
            "performance_target_met": (
                succeeded > 0
                and stats["average_response_time"] < self.PERFORMANCE_TARGET_SECONDS
            ),
            "success_rate": (succeeded / total) * 100 if total > 0 else 0,
            "agent_name": self.agent_name,
            "agent_role": self.agent_role
        })

        return stats

    async def cleanup(self):
        """Deregister the agent and close the Redis connection.

        The connection is closed even if deregistration fails; the
        deregistration error still propagates to the caller.
        """
        if not self.redis_client:
            return
        try:
            await self.redis_client.srem("active_agents", self.agent_name)
        finally:
            await self.redis_client.close()
# Example Usage & Testing
async def test_colossus_integration():
    """Run a live smoke test against the colossus server.

    Creates one agent per role, sends each a test message in parallel,
    prints per-agent results and a performance summary, and always cleans
    up the agents afterwards, even if a step in between raises.
    """
    print("🚀 Testing SAAP colossus Server Integration...")

    # Create test agents
    agents = [
        ColosusSAAPAgent("agent_coordinator", "Coordinator"),
        ColosusSAAPAgent("agent_developer", "Developer"),
        ColosusSAAPAgent("agent_analyst", "Analyst")
    ]

    try:
        # Initialize all agents
        for agent in agents:
            await agent.initialize()

        # Test messages
        test_messages = [
            "Analysiere die SAAP Multi-Agent-Architektur und identifiziere Optimierungsbedarfe.",
            "Entwickle Python Code für Redis Message Queue Integration.",
            "Erstelle Use Cases für Agent-zu-Agent Kommunikation."
        ]

        # Process messages in parallel; messages wrap around if there are
        # more agents than messages.
        tasks = [
            agent.process_message(test_messages[i % len(test_messages)], {"test_id": i})
            for i, agent in enumerate(agents)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Print results
        print("\n" + "="*60)
        print("🎯 SAAP colossus Integration Results:")
        print("="*60)

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"❌ Agent {i+1}: Error - {result}")
                continue

            agent_name = result.get("agent_name", f"Agent_{i+1}")
            response_time = result.get("response_time", 0)
            success = result.get("success", False)
            performance = "✅ < 2s" if result.get("performance_check", False) else f"⏱️ {response_time:.2f}s"

            print(f"{'✅' if success else '❌'} {agent_name}: {performance}")
            if success:
                # Show at most the first 100 characters of the response.
                content = result.get("content", "")
                if len(content) > 100:
                    content = content[:100] + "..."
                print(f" Response: {content}")

        # Performance summary
        print("\n" + "="*60)
        print("📊 Performance Summary:")

        for agent in agents:
            stats = await agent.get_performance_stats()
            print(f"🤖 {agent.agent_name} ({agent.agent_role}):")
            print(f" Average Response Time: {stats['average_response_time']:.2f}s")
            print(f" Success Rate: {stats['success_rate']:.1f}%")
            print(f" Performance Target Met: {'✅' if stats['performance_target_met'] else '❌'}")
    finally:
        # Cleanup: one agent failing to clean up must not prevent the
        # others from being deregistered.
        for agent in agents:
            try:
                await agent.cleanup()
            except Exception as e:
                print(f"⚠️ Cleanup failed for {agent.agent_name}: {e}")

    print("\n🎉 colossus Integration Test Complete!")


if __name__ == "__main__":
    asyncio.run(test_colossus_integration())