#!/usr/bin/env python3
"""
SAAP colossus Server Integration - ColosusSAAPAgent
=================================================
Direct integration with the colossus server for Phase 1 Infrastructure Foundation.

Hybrid architecture: CachyOS (orchestration) + colossus (LLM processing)

Server details:
- URL: https://ai.adrian-schupp.de
- Model: mistral-small3.2:24b-instruct-2506
- Performance target: < 2s response time

Integration with the existing SAAP Agent Communication System.
"""

import asyncio
import json
import time
import logging
import os
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field

import aiohttp
import redis.asyncio as redis
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Latency budget (seconds) used for every "performance_check" /
# "performance_target_met" flag in this module.
PERFORMANCE_TARGET_SECONDS = 2.0


@dataclass
class ColossusConfig:
    """Connection and generation settings for the colossus server.

    Raises:
        ValueError: from ``__post_init__`` when ``api_key`` is empty, i.e.
            no key was passed and ``COLOSSUS_API_KEY`` is unset or empty.
    """

    base_url: str = "https://ai.adrian-schupp.de"
    # Read lazily so the environment is consulted per instance, not at import.
    api_key: str = field(default_factory=lambda: os.getenv("COLOSSUS_API_KEY", ""))
    model: str = "mistral-small3.2:24b-instruct-2506"
    max_tokens: int = 1000
    temperature: float = 0.7
    timeout: int = 30  # seconds

    def __post_init__(self):
        """Validate configuration after initialization."""
        if not self.api_key:
            raise ValueError(
                "❌ COLOSSUS_API_KEY environment variable not set.\n"
                "Please set it in your .env file:\n"
                "COLOSSUS_API_KEY=your-api-key-here"
            )


class ColosusSAAPAgent:
    """SAAP agent backed by the colossus server.

    Hybrid architecture:
    - CachyOS: agent orchestration, message queue, system management
    - colossus: high-performance LLM processing, AI inference

    Redis is optional: when it cannot be reached during ``initialize`` the
    agent runs in degraded mode (``redis_client`` is ``None``) and skips
    registration and message-history storage.
    """

    def __init__(self,
                 agent_name: str,
                 agent_role: str = "Coordinator",
                 config: Optional[ColossusConfig] = None,
                 redis_url: str = "redis://localhost:6379"):
        """Create an agent.

        Args:
            agent_name: Identifier used in Redis keys and result metadata.
            agent_role: Key into ``agent_contexts``; unknown roles fall back
                to the "General" context when a prompt is built.
            config: Server settings; a default ``ColossusConfig`` is built
                when omitted (which raises ``ValueError`` without an API key).
            redis_url: URL handed to ``redis.from_url`` in ``initialize``.
        """
        self.agent_name = agent_name
        self.agent_role = agent_role
        self.config = config or ColossusConfig()
        self.redis_url = redis_url

        # Agent context for specialized roles (sent to the model verbatim)
        self.agent_contexts = {
            "Coordinator": "Du bist Agent A (Coordinator) für SAAP. Du koordinierst Multi-Agent Workflows und delegierst Tasks effizient.",
            "Developer": "Du bist Agent B (Developer) mit Expertise in Python, Node.js, Vue.js. Du fokussierst auf Clean Code und Performance.",
            "Analyst": "Du bist Agent C (Analyst) für Requirements-Analyse, Use Cases und Systemdesign. Du lieferst strukturierte Analysen.",
            "General": "Du bist ein SAAP Multi-Agent mit genereller KI-Expertise für vielseitige Tasks."
        }

        # Performance tracking
        self.performance_stats = {
            "total_requests": 0,
            "total_response_time": 0.0,
            "average_response_time": 0.0,
            "errors": 0,
            "successful_requests": 0
        }

        # Redis connection (will be initialized async)
        self.redis_client = None

    async def initialize(self):
        """Connect to Redis and register the agent.

        Any failure is logged and swallowed; the client reference is then
        cleared so the rest of the class really does run without Redis.
        """
        try:
            self.redis_client = await redis.from_url(self.redis_url)
            await self.redis_client.ping()
            logger.info(f"✅ {self.agent_name} connected to Redis")

            # Register agent in Redis
            await self.register_agent()

        except Exception as e:
            logger.error(f"❌ Redis connection failed for {self.agent_name}: {e}")
            # Continue without Redis - degraded mode. Drop the client so that
            # process_message() and cleanup() do not keep using a connection
            # that is known to be broken.
            self.redis_client = None

    async def register_agent(self):
        """Register the agent with the SAAP system.

        Writes the agent description to the hash ``agent:<name>`` and adds
        the name to the ``active_agents`` set. No-op without a Redis client.
        """
        if not self.redis_client:
            return

        agent_info = {
            "name": self.agent_name,
            "role": self.agent_role,
            "status": "active",
            "model": self.config.model,
            "server": "colossus",
            "capabilities": ["llm_processing", "multi_agent_communication", "task_coordination"],
            "performance_target": "< 2s response time",
            "timestamp": time.time()
        }

        # Hash fields must be flat strings: containers are JSON-encoded,
        # everything else is passed through str().
        await self.redis_client.hset(
            f"agent:{self.agent_name}",
            mapping={
                k: json.dumps(v) if isinstance(v, (dict, list)) else str(v)
                for k, v in agent_info.items()
            }
        )

        # Add to active agents set
        await self.redis_client.sadd("active_agents", self.agent_name)
        logger.info(f"📝 {self.agent_name} registered with SAAP system")

    async def call_colossus_api(self, prompt: str) -> Dict[str, Any]:
        """Send ``prompt`` to the colossus chat-completions endpoint.

        The role context is prepended to the prompt and sent as a single
        user message. Exceptions are not propagated: every failure is logged,
        counted in ``performance_stats`` and reported in the returned dict.

        Args:
            prompt: The task text for the model.

        Returns:
            On success a dict with ``success=True``, ``content``,
            ``response_time``, ``model``, ``server``, ``agent``, ``role`` and
            ``performance_check``. On failure a dict with ``success=False``,
            ``error``, ``response_time``, ``agent``, ``server`` and
            ``performance_check=False``.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments.
        start_time = time.perf_counter()

        try:
            headers = {
                "Authorization": f"Bearer {self.config.api_key}",
                "Content-Type": "application/json"
            }

            # Add agent context to prompt
            context = self.agent_contexts.get(self.agent_role, self.agent_contexts["General"])
            # Real newlines: the previous "\\n\\n" put literal backslash-n
            # characters into the prompt.
            enhanced_prompt = f"{context}\n\nAufgabe: {prompt}"

            payload = {
                "model": self.config.model,
                "messages": [
                    {"role": "user", "content": enhanced_prompt}
                ],
                "max_tokens": self.config.max_tokens,
                "temperature": self.config.temperature
            }

            client_timeout = aiohttp.ClientTimeout(total=self.config.timeout)
            async with aiohttp.ClientSession(timeout=client_timeout) as session:
                async with session.post(
                    f"{self.config.base_url}/v1/chat/completions",
                    headers=headers,
                    json=payload
                ) as response:

                    if response.status != 200:
                        error_text = await response.text()
                        raise RuntimeError(f"API Error {response.status}: {error_text}")

                    result = await response.json()

                    # Extract response text. A missing or empty "choices"
                    # list both fall back to the "No response" placeholder
                    # instead of raising IndexError on [][0].
                    choices = result.get("choices") or [{}]
                    content = choices[0].get("message", {}).get("content", "No response")

                    # Calculate performance
                    response_time = time.perf_counter() - start_time

                    # Update stats
                    stats = self.performance_stats
                    stats["total_requests"] += 1
                    stats["successful_requests"] += 1
                    stats["total_response_time"] += response_time
                    # total_response_time only accumulates successful calls,
                    # so it is averaged over successful calls; dividing by
                    # total_requests let failures drag the average down.
                    stats["average_response_time"] = (
                        stats["total_response_time"] / stats["successful_requests"]
                    )

                    return {
                        "success": True,
                        "content": content,
                        "response_time": response_time,
                        "model": self.config.model,
                        "server": "colossus",
                        "agent": self.agent_name,
                        "role": self.agent_role,
                        "performance_check": response_time < PERFORMANCE_TARGET_SECONDS
                    }

        except Exception as e:
            # Update error stats
            self.performance_stats["errors"] += 1
            self.performance_stats["total_requests"] += 1

            response_time = time.perf_counter() - start_time

            logger.error(f"❌ colossus API call failed: {e}")

            return {
                "success": False,
                "error": str(e),
                "response_time": response_time,
                "agent": self.agent_name,
                "server": "colossus",
                "performance_check": False
            }

    async def process_message(self, message: str, context: Optional[Dict] = None) -> Dict[str, Any]:
        """Process an incoming message with the colossus LLM.

        Args:
            message: Text forwarded to ``call_colossus_api``.
            context: Optional dict; its ``message_id`` and ``thread_id``
                entries (if any) are copied into the result.

        Returns:
            The dict from ``call_colossus_api`` extended with ``agent_name``,
            ``agent_role``, ``timestamp``, ``message_id`` and ``thread_id``.
            Successful results are also stored in Redis when a client exists.
        """
        # Log message processing
        logger.info(f"🤖 {self.agent_name} ({self.agent_role}) processing message...")

        # Call colossus API
        result = await self.call_colossus_api(message)

        # Add SAAP-specific metadata
        result.update({
            "agent_name": self.agent_name,
            "agent_role": self.agent_role,
            "timestamp": time.time(),
            "message_id": context.get("message_id") if context else None,
            "thread_id": context.get("thread_id") if context else None
        })

        # Store in Redis if available
        if self.redis_client and result["success"]:
            await self._store_message_result(message, result)

        # Performance logging
        performance_emoji = "⚡" if result.get("performance_check", False) else "⏱️"
        logger.info(f"{performance_emoji} {self.agent_name}: {result.get('response_time', 0):.2f}s")

        return result

    async def _store_message_result(self, message: str, result: Dict[str, Any]):
        """Store a message and its result in Redis for monitoring.

        Pushes a JSON record onto ``messages:<name>``, trims that list to the
        100 most recent entries and updates ``last_activity`` on the agent
        hash. No-op without a Redis client.
        """
        if not self.redis_client:
            return

        message_data = {
            "input": message,
            "output": result.get("content", ""),
            "agent": self.agent_name,
            "role": self.agent_role,
            "response_time": result.get("response_time", 0),
            "timestamp": result.get("timestamp", time.time()),
            "success": result.get("success", False)
        }

        # Store in message history
        await self.redis_client.lpush(
            f"messages:{self.agent_name}",
            json.dumps(message_data)
        )

        # Keep only recent messages (last 100)
        await self.redis_client.ltrim(f"messages:{self.agent_name}", 0, 99)

        # Update agent status
        await self.redis_client.hset(
            f"agent:{self.agent_name}",
            "last_activity",
            str(time.time())
        )

    async def get_performance_stats(self) -> Dict[str, Any]:
        """Return a snapshot of the performance counters plus derived metrics.

        Returns:
            A copy of ``performance_stats`` extended with ``server``,
            ``model``, ``performance_target_met``, ``success_rate`` (percent,
            0 when nothing was requested), ``agent_name`` and ``agent_role``.
        """
        stats = self.performance_stats.copy()

        # Add colossus-specific metrics
        stats.update({
            "server": "colossus",
            "model": self.config.model,
            # Without a single successful call the average is still its
            # initial 0.0, which must not count as "target met".
            "performance_target_met": (
                stats["successful_requests"] > 0
                and stats["average_response_time"] < PERFORMANCE_TARGET_SECONDS
            ),
            "success_rate": (
                (stats["successful_requests"] / stats["total_requests"]) * 100
                if stats["total_requests"] > 0 else 0
            ),
            "agent_name": self.agent_name,
            "agent_role": self.agent_role
        })

        return stats

    async def cleanup(self):
        """Deregister from ``active_agents`` and close the Redis connection."""
        if self.redis_client:
            await self.redis_client.srem("active_agents", self.agent_name)
            await self.redis_client.close()
            # Forget the closed client so a repeated cleanup() is a no-op.
            self.redis_client = None


# Example Usage & Testing
async def test_colossus_integration():
    """Run a manual smoke test against the live colossus server.

    Creates three agents, sends one message per agent concurrently, prints
    the per-agent results and performance statistics, then cleans up.
    """
    print("🚀 Testing SAAP colossus Server Integration...")

    # Create test agents
    agents = [
        ColosusSAAPAgent("agent_coordinator", "Coordinator"),
        ColosusSAAPAgent("agent_developer", "Developer"),
        ColosusSAAPAgent("agent_analyst", "Analyst")
    ]

    # Initialize all agents
    for agent in agents:
        await agent.initialize()

    # Test messages
    test_messages = [
        "Analysiere die SAAP Multi-Agent-Architektur und identifiziere Optimierungsbedarfe.",
        "Entwickle Python Code für Redis Message Queue Integration.",
        "Erstelle Use Cases für Agent-zu-Agent Kommunikation."
    ]

    # Process messages in parallel
    tasks = []
    for i, agent in enumerate(agents):
        message = test_messages[i % len(test_messages)]
        tasks.append(agent.process_message(message, {"test_id": i}))

    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Print results ("\n" is a real newline; the former "\\n" printed the
    # two characters backslash and n)
    print("\n" + "="*60)
    print("🎯 SAAP colossus Integration Results:")
    print("="*60)

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"❌ Agent {i+1}: Error - {result}")
            continue

        agent_name = result.get("agent_name", f"Agent_{i+1}")
        response_time = result.get("response_time", 0)
        success = result.get("success", False)
        performance = "✅ < 2s" if result.get("performance_check", False) else f"⏱️ {response_time:.2f}s"

        print(f"{'✅' if success else '❌'} {agent_name}: {performance}")
        if success:
            # Show at most the first 100 characters of the answer.
            full_content = result.get("content", "")
            content = full_content[:100] + "..." if len(full_content) > 100 else full_content
            print(f" Response: {content}")

    # Performance summary
    print("\n" + "="*60)
    print("📊 Performance Summary:")

    for agent in agents:
        stats = await agent.get_performance_stats()
        print(f"🤖 {agent.agent_name} ({agent.agent_role}):")
        print(f" Average Response Time: {stats['average_response_time']:.2f}s")
        print(f" Success Rate: {stats['success_rate']:.1f}%")
        print(f" Performance Target Met: {'✅' if stats['performance_target_met'] else '❌'}")

    # Cleanup
    for agent in agents:
        await agent.cleanup()

    print("\n🎉 colossus Integration Test Complete!")


if __name__ == "__main__":
    asyncio.run(test_colossus_integration())