Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| SAAP Colossus Server Integration Agent | |
| Integration von colossus Server (https://ai.adrian-schupp.de) in SAAP Multi-Agent System | |
| Author: Hanan Wandji Danga & Jane Alesi | |
| """ | |
| import os | |
| from dotenv import load_dotenv | |
| import requests | |
| import json | |
| import time | |
| import asyncio | |
| import redis | |
| from typing import Dict, List, Optional, Any | |
| import logging | |
| # Load environment variables | |
| load_dotenv() | |
| class ColossusSAAPAgent: | |
| def __init__(self, agent_name: str, role: str, api_key: str, base_url: str = "https://ai.adrian-schupp.de"): | |
| """ | |
| Initialisierung des Colossus SAAP Agents | |
| Args: | |
| agent_name: Name des Agents (z.B. "agent_coordinator") | |
| role: Rolle des Agents (z.B. "Coordinator", "Developer", "Analyst") | |
| api_key: API Key für colossus Server | |
| base_url: Base URL des colossus Servers | |
| """ | |
| self.agent_name = agent_name | |
| self.role = role | |
| self.api_key = api_key | |
| self.base_url = base_url | |
| self.model_name = "mistral-small3.2:24b-instruct-2506" | |
| # Redis Configuration für SAAP Message Queue | |
| self.redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True) | |
| self.message_queue = f"saap_agent_{agent_name}" | |
| # Agent Context für rollenspezifische Antworten | |
| self.context = self._initialize_context() | |
| # Logging Setup | |
| logging.basicConfig(level=logging.INFO) | |
| self.logger = logging.getLogger(f"ColossusSAAP.{agent_name}") | |
| self.logger.info(f"🚀 {agent_name} ({role}) initiated with colossus Server") | |
| def _initialize_context(self) -> str: | |
| """Initialisiert rollenspezifischen Kontext für den Agent""" | |
| contexts = { | |
| "Coordinator": """Du bist der SAAP Agent Coordinator. Deine Aufgabe ist es: | |
| - Multi-Agent Workflows zu koordinieren | |
| - Aufgaben an spezialisierte Agenten zu delegieren | |
| - Systemüberblick zu behalten und Entscheidungen zu treffen | |
| - Effizienz und Performance des gesamten SAAP-Systems zu optimieren""", | |
| "Developer": """Du bist der SAAP Developer Agent. Deine Aufgabe ist es: | |
| - Code zu schreiben und zu überprüfen | |
| - Technische Implementierungen zu planen | |
| - Architektur-Entscheidungen zu treffen | |
| - Code-Qualität und Best Practices sicherzustellen""", | |
| "Analyst": """Du bist der SAAP Analyst Agent. Deine Aufgabe ist es: | |
| - Daten und Anforderungen zu analysieren | |
| - Use Cases zu definieren und dokumentieren | |
| - System-Performance zu bewerten | |
| - Optimierungspotenziale zu identifizieren""", | |
| "default": f"""Du bist ein SAAP Agent mit der Rolle '{self.role}'. | |
| Beantworte Anfragen professionell und hilfsbereit basierend auf deiner Spezialisierung.""" | |
| } | |
| return contexts.get(self.role, contexts["default"]) | |
| async def send_request_to_colossus(self, prompt: str, max_tokens: int = 500) -> Dict[str, Any]: | |
| """ | |
| Sendet Request an colossus Server und misst Performance | |
| Args: | |
| prompt: Der Eingabe-Prompt | |
| max_tokens: Maximale Token-Anzahl für Response | |
| Returns: | |
| Dict mit Response, Performance-Metriken und Metadaten | |
| """ | |
| start_time = time.time() | |
| # Request Headers | |
| headers = { | |
| "Authorization": f"Bearer {self.api_key}", | |
| "Content-Type": "application/json" | |
| } | |
| # Request Body (OpenAI-kompatible API vermutlich) | |
| payload = { | |
| "model": self.model_name, | |
| "messages": [ | |
| {"role": "system", "content": self.context}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| "max_tokens": max_tokens, | |
| "temperature": 0.7 | |
| } | |
| try: | |
| # API Call an colossus Server | |
| response = requests.post( | |
| f"{self.base_url}/v1/chat/completions", # Standard OpenAI API Format | |
| headers=headers, | |
| json=payload, | |
| timeout=30 | |
| ) | |
| response_time = time.time() - start_time | |
| if response.status_code == 200: | |
| data = response.json() | |
| # Performance Metrics berechnen | |
| response_text = data['choices'][0]['message']['content'] | |
| token_count = data.get('usage', {}).get('total_tokens', 0) | |
| result = { | |
| "success": True, | |
| "response": response_text, | |
| "response_time": round(response_time, 2), | |
| "token_count": token_count, | |
| "model": self.model_name, | |
| "agent_role": self.role, | |
| "timestamp": time.time() | |
| } | |
| self.logger.info(f"✅ colossus Response: {response_time:.2f}s, {token_count} tokens") | |
| return result | |
| else: | |
| error_result = { | |
| "success": False, | |
| "error": f"HTTP {response.status_code}: {response.text}", | |
| "response_time": round(response_time, 2), | |
| "timestamp": time.time() | |
| } | |
| self.logger.error(f"❌ colossus Error: {response.status_code}") | |
| return error_result | |
| except requests.exceptions.RequestException as e: | |
| error_result = { | |
| "success": False, | |
| "error": f"Request failed: {str(e)}", | |
| "response_time": round(time.time() - start_time, 2), | |
| "timestamp": time.time() | |
| } | |
| self.logger.error(f"❌ colossus Connection Error: {e}") | |
| return error_result | |
| async def process_message(self, message: str, sender: str = "system") -> Dict[str, Any]: | |
| """ | |
| Verarbeitet eingehende Nachricht und generiert intelligente Antwort | |
| """ | |
| self.logger.info(f"🔄 {self.agent_name} processing message from {sender}") | |
| # Erweiterte Prompt-Konstruktion mit Sender-Kontext | |
| enhanced_prompt = f"""[Nachricht von {sender}] | |
| {message} | |
| Bitte antworte als {self.role} Agent im SAAP System. Sei präzise und hilfreich.""" | |
| # colossus Server Request | |
| result = await self.send_request_to_colossus(enhanced_prompt) | |
| if result["success"]: | |
| # Message in Redis Queue für andere Agenten | |
| response_data = { | |
| "agent_name": self.agent_name, | |
| "agent_role": self.role, | |
| "original_message": message, | |
| "response": result["response"], | |
| "sender": sender, | |
| "performance": { | |
| "response_time": result["response_time"], | |
| "token_count": result["token_count"] | |
| }, | |
| "timestamp": result["timestamp"], | |
| "server": "colossus" | |
| } | |
| # Publish to Redis for other agents and dashboard | |
| self.redis_client.lpush(f"saap_responses", json.dumps(response_data)) | |
| self.redis_client.publish(f"saap_agent_updates", json.dumps(response_data)) | |
| self.logger.info(f"✅ Response generated and published to Redis") | |
| return result | |
| else: | |
| self.logger.error(f"❌ Failed to process message: {result.get('error')}") | |
| return result | |
| async def listen_for_messages(self): | |
| """ | |
| Lauscht auf eingehende Nachrichten in der Redis Queue | |
| """ | |
| self.logger.info(f"👂 {self.agent_name} listening for messages on {self.message_queue}") | |
| while True: | |
| try: | |
| # Pop message from Redis queue | |
| message_data = self.redis_client.brpop(self.message_queue, timeout=1) | |
| if message_data: | |
| _, message_json = message_data | |
| message_obj = json.loads(message_json) | |
| # Process the message | |
| await self.process_message( | |
| message=message_obj.get("content", ""), | |
| sender=message_obj.get("sender", "unknown") | |
| ) | |
| # Small delay to prevent excessive CPU usage | |
| await asyncio.sleep(0.1) | |
| except KeyboardInterrupt: | |
| self.logger.info(f"🛑 {self.agent_name} shutting down") | |
| break | |
| except Exception as e: | |
| self.logger.error(f"❌ Error in message listener: {e}") | |
| await asyncio.sleep(1) | |
| def send_message_to_agent(self, target_agent: str, message: str): | |
| """ | |
| Sendet Nachricht an einen anderen SAAP Agent | |
| """ | |
| message_data = { | |
| "content": message, | |
| "sender": self.agent_name, | |
| "timestamp": time.time() | |
| } | |
| self.redis_client.lpush(f"saap_agent_{target_agent}", json.dumps(message_data)) | |
| self.logger.info(f"📤 Message sent to {target_agent}") | |
| async def health_check(self) -> Dict[str, Any]: | |
| """ | |
| Überprüft Gesundheitsstatus des colossus Servers | |
| """ | |
| try: | |
| test_prompt = "Hello, this is a connection test." | |
| result = await self.send_request_to_colossus(test_prompt, max_tokens=50) | |
| health_status = { | |
| "agent_name": self.agent_name, | |
| "colossus_status": "healthy" if result["success"] else "unhealthy", | |
| "response_time": result.get("response_time", 0), | |
| "error": result.get("error") if not result["success"] else None, | |
| "timestamp": time.time() | |
| } | |
| return health_status | |
| except Exception as e: | |
| return { | |
| "agent_name": self.agent_name, | |
| "colossus_status": "error", | |
| "error": str(e), | |
| "timestamp": time.time() | |
| } | |
| # Performance Benchmarking Funktionen | |
| class ColossusBenchmark: | |
| def __init__(self, api_key: str): | |
| self.api_key = api_key | |
| self.base_url = "https://ai.adrian-schupp.de" | |
| async def run_performance_benchmark(self, test_prompts: List[str]) -> Dict[str, Any]: | |
| """ | |
| Führt Performance-Benchmark mit verschiedenen Test-Prompts durch | |
| """ | |
| results = [] | |
| # Temporärer Agent für Benchmark | |
| benchmark_agent = ColossusSAAPAgent("benchmark_agent", "Benchmark", self.api_key) | |
| for i, prompt in enumerate(test_prompts): | |
| print(f"🧪 Running test {i+1}/{len(test_prompts)}: {prompt[:50]}...") | |
| result = await benchmark_agent.send_request_to_colossus(prompt) | |
| results.append({ | |
| "test_id": i + 1, | |
| "prompt": prompt, | |
| "success": result["success"], | |
| "response_time": result.get("response_time", 0), | |
| "token_count": result.get("token_count", 0), | |
| "error": result.get("error") if not result["success"] else None | |
| }) | |
| # Pause zwischen Tests um Server nicht zu überlasten | |
| await asyncio.sleep(1) | |
| # Statistiken berechnen | |
| successful_tests = [r for r in results if r["success"]] | |
| if successful_tests: | |
| avg_response_time = sum(r["response_time"] for r in successful_tests) / len(successful_tests) | |
| avg_token_count = sum(r["token_count"] for r in successful_tests) / len(successful_tests) | |
| benchmark_summary = { | |
| "total_tests": len(test_prompts), | |
| "successful_tests": len(successful_tests), | |
| "success_rate": len(successful_tests) / len(test_prompts) * 100, | |
| "average_response_time": round(avg_response_time, 2), | |
| "average_token_count": round(avg_token_count, 0), | |
| "performance_target_met": avg_response_time < 2.0, # < 2s Ziel | |
| "results": results | |
| } | |
| else: | |
| benchmark_summary = { | |
| "total_tests": len(test_prompts), | |
| "successful_tests": 0, | |
| "success_rate": 0, | |
| "error": "No successful tests completed", | |
| "results": results | |
| } | |
| return benchmark_summary | |
| # Utility Functions für SAAP Integration | |
| def create_saap_colossus_agents(api_key: str) -> List[ColossusSAAPAgent]: | |
| """ | |
| Erstellt die 3 Basis-Agenten für SAAP Multi-Agent System wie im Plan | |
| """ | |
| agents = [ | |
| ColossusSAAPAgent("agent_coordinator", "Coordinator", api_key), | |
| ColossusSAAPAgent("agent_developer", "Developer", api_key), | |
| ColossusSAAPAgent("agent_analyst", "Analyst", api_key) | |
| ] | |
| return agents | |
| if __name__ == "__main__": | |
| # Demo und Testing | |
| import asyncio | |
| # Load API key from environment variable | |
| API_KEY = os.getenv("COLOSSUS_API_KEY") | |
| if not API_KEY: | |
| print("❌ Error: COLOSSUS_API_KEY not set in environment variables") | |
| print("Please set it in backend/.env file:") | |
| print("COLOSSUS_API_KEY=sk-your-actual-key-here") | |
| exit(1) | |
| async def demo_colossus_integration(): | |
| print("🚀 SAAP colossus Server Integration Demo") | |
| # Create test agent | |
| agent = ColossusSAAPAgent("demo_coordinator", "Coordinator", API_KEY) | |
| # Health check | |
| print("\n📊 Health Check...") | |
| health = await agent.health_check() | |
| print(f"Status: {health}") | |
| # Test message processing | |
| if health.get("colossus_status") == "healthy": | |
| print("\n💬 Processing test message...") | |
| result = await agent.process_message( | |
| "Erkläre mir die Vorteile einer Multi-Agent-Architektur für SAAP.", | |
| "test_user" | |
| ) | |
| print(f"Response: {result.get('response', 'No response')}") | |
| # Performance benchmark | |
| print("\n🧪 Running mini performance benchmark...") | |
| benchmark = ColossusBenchmark(API_KEY) | |
| test_prompts = [ | |
| "Was ist SAAP?", | |
| "Erkläre Multi-Agent Systeme in 3 Sätzen.", | |
| "Vorteile von On-Premise vs Cloud AI?" | |
| ] | |
| benchmark_results = await benchmark.run_performance_benchmark(test_prompts) | |
| print(f"Benchmark Results: {benchmark_results}") | |
| # Run demo | |
| asyncio.run(demo_colossus_integration()) | |