""" Privacy Detector for SAAP Multi-Agent System Detects sensitive data to route to local colossus instead of external OpenRouter """ import re import logging from typing import Dict, List, Tuple from enum import Enum logger = logging.getLogger(__name__) class PrivacyLevel(Enum): """Privacy classification levels""" PUBLIC = "public" # Safe for external processing INTERNAL = "internal" # Prefer local but not critical CONFIDENTIAL = "confidential" # Should be local PRIVATE = "private" # MUST be local (medical, financial, personal) class PrivacyDetector: """ Detects sensitive data in user messages to ensure privacy-compliant routing Detection Methods: 1. Keyword-based (medical, financial, personal data keywords) 2. Pattern-based (credit cards, SSN, IBAN, etc.) 3. Agent-based rules (medical agent = always private) """ def __init__(self): # Sensitive keyword categories self.sensitive_keywords = { "medical": [ "patient", "patienten", "diagnosis", "diagnose", "treatment", "behandlung", "medication", "medikament", "symptom", "krankheit", "disease", "arzt", "doctor", "hospital", "krankenhaus", "gesundheit", "health", "medizin", "medicine", "therapie", "therapy", "blut", "blood", "operation" ], "financial": [ # Only TRULY sensitive financial data (account numbers, cards, passwords) # General financial advice keywords removed (investment, portfolio, sparen, etc.) "account number", "kontonummer", "password", "passwort", "credit card", "kreditkarte", "iban", "bic", "pin", "cvv", "security code", "sicherheitscode", "salary", "gehalt", # Personal income data "tax id", "steuernummer" # Personal tax data ], "personal": [ "social security", "sozialversicherung", "passport", "reisepass", "driver license", "führerschein", "birthday", "geburtstag", "geburtsdatum", "address", "adresse", "phone", "telefon", "email", "personalausweis", "id card", "tax", "steuer", "insurance", "versicherung" ], "legal": [ "contract", "vertrag", "confidential", "vertraulich", "proprietary", "nda", "geheimhaltung", "lawsuit", "klage", "court", "gericht", "lawyer", "anwalt", "legal", "rechtlich" ], "security": [ "secret", "geheim", "private key", "token", "api key", "credentials", "zugangsdaten", "authentication", "authentifizierung" ] } # Sensitive data patterns (regex) self.sensitive_patterns = [ # Credit card numbers (r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b", "credit_card"), # SSN (US) (r"\b\d{3}-\d{2}-\d{4}\b", "ssn"), # IBAN (r"\b[A-Z]{2}\d{2}[A-Z0-9]{13,29}\b", "iban"), # Dates (potential birthdays) (r"\b\d{2}[./]\d{2}[./]\d{4}\b", "date"), # Email addresses (r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", "email"), # Phone numbers (German format) (r"\b(?:\+49|0)\s?\d{3,4}\s?\d{6,8}\b", "phone"), # API keys/tokens (long alphanumeric strings) (r"\b[A-Za-z0-9]{32,}\b", "api_key") ] # Agent-specific privacy rules # Only MANDATORY privacy rules here (content-based detection is preferred) self.agent_privacy_rules = { "lara_alesi": PrivacyLevel.PRIVATE, # Medical - ALWAYS private (health data) "theo_alesi": PrivacyLevel.INTERNAL, # Financial - depends on content "justus_alesi": PrivacyLevel.INTERNAL, # Legal - depends on content "jane_alesi": PrivacyLevel.INTERNAL, # Coordinator - depends on content "john_alesi": PrivacyLevel.INTERNAL, # Development - usually safe "leon_alesi": PrivacyLevel.INTERNAL, # System - usually safe "luna_alesi": PrivacyLevel.INTERNAL # Coaching - usually safe } def detect_privacy_level( self, message: str, agent_id: str = None, user_privacy_flag: str = None ) -> Tuple[PrivacyLevel, Dict]: """ Detect privacy level of message Args: message: User message to analyze agent_id: Agent that will process this (optional) user_privacy_flag: User-specified privacy level (optional) Returns: Tuple of (PrivacyLevel, detection_details) """ # User override takes precedence if user_privacy_flag: level = self._parse_user_privacy_flag(user_privacy_flag) return level, {"reason": "user_override", "user_flag": user_privacy_flag} # Check agent-specific rules first if agent_id and agent_id in self.agent_privacy_rules: agent_level = self.agent_privacy_rules[agent_id] if agent_level == PrivacyLevel.PRIVATE: return agent_level, { "reason": "agent_rule", "agent": agent_id, "rule": "always_private" } # Keyword detection keyword_matches = self._detect_keywords(message) # Pattern detection pattern_matches = self._detect_patterns(message) # Combine detections total_detections = len(keyword_matches) + len(pattern_matches) # Determine privacy level based on detections details = { "keyword_matches": keyword_matches, "pattern_matches": pattern_matches, "total_detections": total_detections } # Classification logic if pattern_matches: # Any pattern match = high sensitivity if any(cat in ["credit_card", "ssn", "iban"] for pat, cat in pattern_matches): return PrivacyLevel.PRIVATE, {**details, "reason": "sensitive_pattern"} if keyword_matches: categories = set(cat for cat, _ in keyword_matches) # Medical or financial keywords = PRIVATE if "medical" in categories or "financial" in categories: return PrivacyLevel.PRIVATE, {**details, "reason": "sensitive_keywords"} # Personal or legal = CONFIDENTIAL if "personal" in categories or "legal" in categories: return PrivacyLevel.CONFIDENTIAL, {**details, "reason": "confidential_keywords"} # Security keywords = CONFIDENTIAL if "security" in categories: return PrivacyLevel.CONFIDENTIAL, {**details, "reason": "security_keywords"} # Apply agent rule if no strong detection if agent_id and agent_id in self.agent_privacy_rules: return self.agent_privacy_rules[agent_id], { **details, "reason": "agent_default", "agent": agent_id } # Default: PUBLIC (safe for external processing) return PrivacyLevel.PUBLIC, {**details, "reason": "no_sensitive_data"} def _detect_keywords(self, message: str) -> List[Tuple[str, str]]: """Detect sensitive keywords in message using word boundaries""" message_lower = message.lower() matches = [] for category, keywords in self.sensitive_keywords.items(): for keyword in keywords: # Use word boundaries to avoid false positives # "health" in "wealth" won't match pattern = r'\b' + re.escape(keyword) + r'\b' if re.search(pattern, message_lower): matches.append((category, keyword)) return matches def _detect_patterns(self, message: str) -> List[Tuple[str, str]]: """Detect sensitive patterns in message""" matches = [] for pattern, category in self.sensitive_patterns: if re.search(pattern, message): matches.append((pattern, category)) return matches def _parse_user_privacy_flag(self, flag: str) -> PrivacyLevel: """Parse user-specified privacy flag""" flag_lower = flag.lower() if flag_lower in ["private", "high", "strict"]: return PrivacyLevel.PRIVATE elif flag_lower in ["confidential", "medium"]: return PrivacyLevel.CONFIDENTIAL elif flag_lower in ["internal", "low"]: return PrivacyLevel.INTERNAL else: return PrivacyLevel.PUBLIC def should_use_local_provider(self, privacy_level: PrivacyLevel, mode: str = "balanced") -> bool: """ Determine if local provider should be used based on privacy level and mode Args: privacy_level: Detected privacy level mode: Privacy mode (strict, balanced, performance) Returns: True if should use local provider (colossus), False if external OK (OpenRouter) """ if mode == "strict": # Strict: Everything goes local return True elif mode == "balanced": # Balanced: Private and Confidential go local return privacy_level in [PrivacyLevel.PRIVATE, PrivacyLevel.CONFIDENTIAL] elif mode == "performance": # Performance: Only explicitly PRIVATE goes local return privacy_level == PrivacyLevel.PRIVATE else: # Default to balanced return privacy_level in [PrivacyLevel.PRIVATE, PrivacyLevel.CONFIDENTIAL] # Singleton instance privacy_detector = PrivacyDetector() # Convenience functions def detect_privacy_level(message: str, agent_id: str = None, user_flag: str = None) -> Tuple[PrivacyLevel, Dict]: """Convenience function to detect privacy level""" return privacy_detector.detect_privacy_level(message, agent_id, user_flag) def should_use_local(message: str, agent_id: str = None, mode: str = "balanced") -> bool: """Convenience function to check if local provider should be used""" level, details = privacy_detector.detect_privacy_level(message, agent_id) use_local = privacy_detector.should_use_local_provider(level, mode) logger.info(f"Privacy check: {level.value} → {'LOCAL' if use_local else 'EXTERNAL'} ({details.get('reason', 'unknown')})") return use_local if __name__ == "__main__": # Demo privacy detection test_cases = [ ("What is Python?", "john_alesi"), ("My patient has symptoms of diabetes", "lara_alesi"), ("Analyze my bank account: DE89370400440532013000", "theo_alesi"), ("Review this confidential contract", "justus_alesi"), ("How to optimize this code?", "john_alesi"), ("My credit card number is 4532-1234-5678-9010", None) ] print("🔒 Privacy Detection Demo\n") for message, agent in test_cases: level, details = detect_privacy_level(message, agent) use_local = privacy_detector.should_use_local_provider(level, mode="balanced") print(f"Message: '{message[:50]}...'") print(f"Agent: {agent or 'None'}") print(f"Privacy Level: {level.value}") print(f"Routing: {'LOCAL (colossus)' if use_local else 'EXTERNAL (OpenRouter)'}") print(f"Reason: {details.get('reason')}") if details.get('keyword_matches'): print(f"Keywords: {details['keyword_matches']}") if details.get('pattern_matches'): print(f"Patterns: {details['pattern_matches']}") print()