#!/usr/bin/env python3
"""
SAAP Colossus Server Integration Agent.

Integrates the colossus server (https://ai.adrian-schupp.de) into the SAAP
multi-agent system.

Author: Hanan Wandji Danga & Jane Alesi
"""

import asyncio
import functools
import json
import logging
import os
import time
from typing import Dict, List, Optional, Any

import redis
import requests
from dotenv import load_dotenv

# Load environment variables
load_dotenv()


class ColossusSAAPAgent:
    """A SAAP agent that answers messages through the colossus chat API.

    Incoming messages are read from a per-agent Redis list, answered by the
    colossus server, and the answers are pushed/published back to Redis.
    """

    def __init__(self, agent_name: str, role: str, api_key: str,
                 base_url: str = "https://ai.adrian-schupp.de"):
        """
        Initialise the colossus SAAP agent.

        Args:
            agent_name: Name of the agent (e.g. "agent_coordinator").
            role: Role of the agent (e.g. "Coordinator", "Developer", "Analyst").
            api_key: API key for the colossus server.
            base_url: Base URL of the colossus server.
        """
        self.agent_name = agent_name
        self.role = role
        self.api_key = api_key
        self.base_url = base_url
        self.model_name = "mistral-small3.2:24b-instruct-2506"

        # Redis configuration for the SAAP message queue
        self.redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.message_queue = f"saap_agent_{agent_name}"

        # Agent context (system prompt) for role-specific answers
        self.context = self._initialize_context()

        # Logging setup
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(f"ColossusSAAP.{agent_name}")
        self.logger.info(f"🚀 {agent_name} ({role}) initiated with colossus Server")

    def _initialize_context(self) -> str:
        """Return the role-specific system prompt for this agent.

        Falls back to a generic prompt mentioning ``self.role`` when the role
        is not one of the predefined ones.
        """
        # The prompt texts are runtime strings sent to the model; keep verbatim.
        contexts = {
            "Coordinator": (
                "Du bist der SAAP Agent Coordinator. Deine Aufgabe ist es: "
                "- Multi-Agent Workflows zu koordinieren "
                "- Aufgaben an spezialisierte Agenten zu delegieren "
                "- Systemüberblick zu behalten und Entscheidungen zu treffen "
                "- Effizienz und Performance des gesamten SAAP-Systems zu optimieren"
            ),
            "Developer": (
                "Du bist der SAAP Developer Agent. Deine Aufgabe ist es: "
                "- Code zu schreiben und zu überprüfen "
                "- Technische Implementierungen zu planen "
                "- Architektur-Entscheidungen zu treffen "
                "- Code-Qualität und Best Practices sicherzustellen"
            ),
            "Analyst": (
                "Du bist der SAAP Analyst Agent. Deine Aufgabe ist es: "
                "- Daten und Anforderungen zu analysieren "
                "- Use Cases zu definieren und dokumentieren "
                "- System-Performance zu bewerten "
                "- Optimierungspotenziale zu identifizieren"
            ),
            "default": (
                f"Du bist ein SAAP Agent mit der Rolle '{self.role}'. "
                "Beantworte Anfragen professionell und hilfsbereit "
                "basierend auf deiner Spezialisierung."
            ),
        }
        return contexts.get(self.role, contexts["default"])

    @staticmethod
    def _error_result(error: str, elapsed: float) -> Dict[str, Any]:
        """Build the failure dictionary returned by send_request_to_colossus."""
        return {
            "success": False,
            "error": error,
            "response_time": round(elapsed, 2),
            "timestamp": time.time(),
        }

    async def send_request_to_colossus(self, prompt: str, max_tokens: int = 500) -> Dict[str, Any]:
        """
        Send a chat-completion request to the colossus server and time it.

        The blocking HTTP call runs in the default executor so that other
        coroutines on the event loop keep running while waiting for the server.

        Args:
            prompt: The user prompt.
            max_tokens: Maximum number of tokens for the response.

        Returns:
            On success a dict with ``success=True``, ``response``,
            ``response_time``, ``token_count``, ``model``, ``agent_role`` and
            ``timestamp``. On any failure (connection error, non-200 status,
            malformed body) a dict with ``success=False``, ``error``,
            ``response_time`` and ``timestamp``.
        """
        # Monotonic clock: elapsed time must not be affected by wall-clock jumps.
        started = time.monotonic()

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        # Request body (presumably an OpenAI-compatible API)
        payload = {
            "model": self.model_name,
            "messages": [
                {"role": "system", "content": self.context},
                {"role": "user", "content": prompt}
            ],
            "max_tokens": max_tokens,
            "temperature": 0.7
        }

        # Tolerate a base URL configured with a trailing slash.
        url = f"{self.base_url.rstrip('/')}/v1/chat/completions"
        post = functools.partial(
            requests.post, url, headers=headers, json=payload, timeout=30
        )

        try:
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(None, post)
        except requests.exceptions.RequestException as e:
            self.logger.error(f"❌ colossus Connection Error: {e}")
            return self._error_result(
                f"Request failed: {str(e)}", time.monotonic() - started
            )

        response_time = time.monotonic() - started

        if response.status_code != 200:
            self.logger.error(f"❌ colossus Error: {response.status_code}")
            return self._error_result(
                f"HTTP {response.status_code}: {response.text}", response_time
            )

        try:
            data = response.json()
            response_text = data['choices'][0]['message']['content']
            # "usage" may be absent or null; treat both as "no token info".
            token_count = (data.get('usage') or {}).get('total_tokens', 0)
        except (ValueError, KeyError, IndexError, TypeError, AttributeError) as e:
            # A 200 with an unexpected body is reported like any other failure
            # instead of crashing the caller.
            self.logger.error(f"❌ colossus Malformed Response: {e!r}")
            return self._error_result(f"Malformed response: {e!r}", response_time)

        self.logger.info(f"✅ colossus Response: {response_time:.2f}s, {token_count} tokens")
        return {
            "success": True,
            "response": response_text,
            "response_time": round(response_time, 2),
            "token_count": token_count,
            "model": self.model_name,
            "agent_role": self.role,
            "timestamp": time.time()
        }

    async def process_message(self, message: str, sender: str = "system") -> Dict[str, Any]:
        """
        Process an incoming message and generate an answer via colossus.

        On success the answer is pushed to the ``saap_responses`` list and
        published on the ``saap_agent_updates`` channel in Redis.

        Args:
            message: The message text to answer.
            sender: Name of the message's sender.

        Returns:
            The result dict of send_request_to_colossus.
        """
        self.logger.info(f"🔄 {self.agent_name} processing message from {sender}")

        # Extended prompt including the sender context
        enhanced_prompt = (
            f"[Nachricht von {sender}] {message} "
            f"Bitte antworte als {self.role} Agent im SAAP System. "
            "Sei präzise und hilfreich."
        )

        result = await self.send_request_to_colossus(enhanced_prompt)

        if not result["success"]:
            self.logger.error(f"❌ Failed to process message: {result.get('error')}")
            return result

        response_data = {
            "agent_name": self.agent_name,
            "agent_role": self.role,
            "original_message": message,
            "response": result["response"],
            "sender": sender,
            "performance": {
                "response_time": result["response_time"],
                "token_count": result["token_count"]
            },
            "timestamp": result["timestamp"],
            "server": "colossus"
        }

        # Publish to Redis for other agents and the dashboard
        serialized = json.dumps(response_data)
        self.redis_client.lpush("saap_responses", serialized)
        self.redis_client.publish("saap_agent_updates", serialized)

        self.logger.info("✅ Response generated and published to Redis")
        return result

    async def listen_for_messages(self):
        """
        Listen for incoming messages on this agent's Redis queue, forever.

        Each message is expected to be a JSON object with optional
        ``content`` and ``sender`` keys. Errors are logged and the loop
        continues after a one-second pause.
        """
        self.logger.info(f"👂 {self.agent_name} listening for messages on {self.message_queue}")

        while True:
            try:
                # NOTE: brpop is a synchronous call and blocks the event loop
                # for up to its one-second timeout.
                message_data = self.redis_client.brpop(self.message_queue, timeout=1)

                if message_data:
                    _, message_json = message_data
                    message_obj = json.loads(message_json)

                    await self.process_message(
                        message=message_obj.get("content", ""),
                        sender=message_obj.get("sender", "unknown")
                    )

                # Small delay to prevent excessive CPU usage
                await asyncio.sleep(0.1)

            except KeyboardInterrupt:
                self.logger.info(f"🛑 {self.agent_name} shutting down")
                break
            except Exception as e:
                self.logger.error(f"❌ Error in message listener: {e}")
                await asyncio.sleep(1)

    def send_message_to_agent(self, target_agent: str, message: str):
        """
        Send a message to another SAAP agent via its Redis queue.

        Args:
            target_agent: Name of the receiving agent (queue ``saap_agent_<name>``).
            message: The message text.
        """
        message_data = {
            "content": message,
            "sender": self.agent_name,
            "timestamp": time.time()
        }

        self.redis_client.lpush(f"saap_agent_{target_agent}", json.dumps(message_data))
        self.logger.info(f"📤 Message sent to {target_agent}")

    async def health_check(self) -> Dict[str, Any]:
        """
        Check the health of the colossus server with a small test request.

        Returns:
            A dict with ``agent_name``, ``colossus_status`` ("healthy",
            "unhealthy" or "error"), ``error`` and ``timestamp``; when the
            request was attempted it also contains ``response_time``.
        """
        try:
            test_prompt = "Hello, this is a connection test."
            result = await self.send_request_to_colossus(test_prompt, max_tokens=50)

            return {
                "agent_name": self.agent_name,
                "colossus_status": "healthy" if result["success"] else "unhealthy",
                "response_time": result.get("response_time", 0),
                "error": result.get("error") if not result["success"] else None,
                "timestamp": time.time()
            }
        except Exception as e:
            return {
                "agent_name": self.agent_name,
                "colossus_status": "error",
                "error": str(e),
                "timestamp": time.time()
            }


# Performance benchmarking
class ColossusBenchmark:
    """Runs a series of prompts against colossus and summarises the timings."""

    def __init__(self, api_key: str):
        """Store the API key and the colossus base URL used for benchmarks."""
        self.api_key = api_key
        self.base_url = "https://ai.adrian-schupp.de"

    async def run_performance_benchmark(self, test_prompts: List[str]) -> Dict[str, Any]:
        """
        Run a performance benchmark with the given test prompts.

        Args:
            test_prompts: Prompts to send, one request per prompt.

        Returns:
            A summary dict with ``total_tests``, ``successful_tests``,
            ``success_rate`` and the per-test ``results``. If at least one
            test succeeded it also contains ``average_response_time``,
            ``average_token_count`` and ``performance_target_met``; otherwise
            it contains an ``error`` message.
        """
        results = []

        # Temporary agent for the benchmark; uses this benchmark's base URL.
        benchmark_agent = ColossusSAAPAgent(
            "benchmark_agent", "Benchmark", self.api_key, self.base_url
        )

        for i, prompt in enumerate(test_prompts):
            print(f"🧪 Running test {i+1}/{len(test_prompts)}: {prompt[:50]}...")

            result = await benchmark_agent.send_request_to_colossus(prompt)
            results.append({
                "test_id": i + 1,
                "prompt": prompt,
                "success": result["success"],
                "response_time": result.get("response_time", 0),
                "token_count": result.get("token_count", 0),
                "error": result.get("error") if not result["success"] else None
            })

            # Pause between tests so the server is not overloaded
            await asyncio.sleep(1)

        # Compute statistics over the successful tests only
        successful_tests = [r for r in results if r["success"]]

        if not successful_tests:
            return {
                "total_tests": len(test_prompts),
                "successful_tests": 0,
                "success_rate": 0,
                "error": "No successful tests completed",
                "results": results
            }

        avg_response_time = sum(r["response_time"] for r in successful_tests) / len(successful_tests)
        avg_token_count = sum(r["token_count"] for r in successful_tests) / len(successful_tests)

        return {
            "total_tests": len(test_prompts),
            "successful_tests": len(successful_tests),
            "success_rate": len(successful_tests) / len(test_prompts) * 100,
            "average_response_time": round(avg_response_time, 2),
            "average_token_count": round(avg_token_count, 0),
            "performance_target_met": avg_response_time < 2.0,  # target: < 2 s
            "results": results
        }


# Utility functions for the SAAP integration
def create_saap_colossus_agents(api_key: str) -> List[ColossusSAAPAgent]:
    """
    Create the three base agents (coordinator, developer, analyst) of the
    SAAP multi-agent system.

    Args:
        api_key: API key for the colossus server, shared by all agents.

    Returns:
        The list of newly created agents.
    """
    return [
        ColossusSAAPAgent("agent_coordinator", "Coordinator", api_key),
        ColossusSAAPAgent("agent_developer", "Developer", api_key),
        ColossusSAAPAgent("agent_analyst", "Analyst", api_key)
    ]


if __name__ == "__main__":
    # Demo and testing

    # Load API key from environment variable
    API_KEY = os.getenv("COLOSSUS_API_KEY")
    if not API_KEY:
        print("❌ Error: COLOSSUS_API_KEY not set in environment variables")
        print("Please set it in backend/.env file:")
        print("COLOSSUS_API_KEY=sk-your-actual-key-here")
        # SystemExit works even when the interactive `exit` helper is absent.
        raise SystemExit(1)

    async def demo_colossus_integration():
        """Run a health check, one test message and a mini benchmark."""
        print("🚀 SAAP colossus Server Integration Demo")

        # Create test agent
        agent = ColossusSAAPAgent("demo_coordinator", "Coordinator", API_KEY)

        # Health check
        print("\n📊 Health Check...")
        health = await agent.health_check()
        print(f"Status: {health}")

        # Test message processing
        if health.get("colossus_status") == "healthy":
            print("\n💬 Processing test message...")
            result = await agent.process_message(
                "Erkläre mir die Vorteile einer Multi-Agent-Architektur für SAAP.",
                "test_user"
            )
            print(f"Response: {result.get('response', 'No response')}")

        # Performance benchmark
        print("\n🧪 Running mini performance benchmark...")
        benchmark = ColossusBenchmark(API_KEY)
        test_prompts = [
            "Was ist SAAP?",
            "Erkläre Multi-Agent Systeme in 3 Sätzen.",
            "Vorteile von On-Premise vs Cloud AI?"
        ]

        benchmark_results = await benchmark.run_performance_benchmark(test_prompts)
        print(f"Benchmark Results: {benchmark_results}")

    # Run demo
    asyncio.run(demo_colossus_integration())