Spaces:
Sleeping
Sleeping
| """ | |
| Privacy Detector for SAAP Multi-Agent System | |
| Detects sensitive data to route to local colossus instead of external OpenRouter | |
| """ | |
import logging
import re

from enum import Enum
from typing import Dict, List, Optional, Tuple
| logger = logging.getLogger(__name__) | |
class PrivacyLevel(Enum):
    """Privacy classification levels, from least to most restrictive."""

    PUBLIC = "public"              # Safe for external processing
    INTERNAL = "internal"          # Prefer local but not critical
    CONFIDENTIAL = "confidential"  # Should be local
    PRIVATE = "private"            # MUST be local (medical, financial, personal)
class PrivacyDetector:
    """
    Detects sensitive data in user messages to ensure privacy-compliant routing.

    Detection Methods:
        1. Keyword-based (medical, financial, personal data keywords; DE + EN)
        2. Pattern-based (credit cards, SSN, IBAN, etc.)
        3. Agent-based rules (medical agent = always private)
    """

    def __init__(self):
        # Sensitive keyword categories (mixed German/English vocabulary).
        self.sensitive_keywords = {
            "medical": [
                "patient", "patienten", "diagnosis", "diagnose", "treatment", "behandlung",
                "medication", "medikament", "symptom", "krankheit", "disease", "arzt",
                "doctor", "hospital", "krankenhaus", "gesundheit", "health", "medizin",
                "medicine", "therapie", "therapy", "blut", "blood", "operation"
            ],
            "financial": [
                # Only TRULY sensitive financial data (account numbers, cards, passwords)
                # General financial advice keywords removed (investment, portfolio, sparen, etc.)
                "account number", "kontonummer", "password", "passwort",
                "credit card", "kreditkarte", "iban", "bic",
                "pin", "cvv", "security code", "sicherheitscode",
                "salary", "gehalt",  # Personal income data
                "tax id", "steuernummer"  # Personal tax data
            ],
            "personal": [
                "social security", "sozialversicherung", "passport", "reisepass",
                "driver license", "führerschein", "birthday", "geburtstag", "geburtsdatum",
                "address", "adresse", "phone", "telefon", "email", "personalausweis",
                "id card", "tax", "steuer", "insurance", "versicherung"
            ],
            "legal": [
                "contract", "vertrag", "confidential", "vertraulich", "proprietary",
                "nda", "geheimhaltung", "lawsuit", "klage", "court", "gericht",
                "lawyer", "anwalt", "legal", "rechtlich"
            ],
            "security": [
                "secret", "geheim", "private key", "token", "api key", "credentials",
                "zugangsdaten", "authentication", "authentifizierung"
            ]
        }

        # Sensitive data patterns as (regex, category) pairs, searched
        # case-sensitively against the raw message in _detect_patterns().
        self.sensitive_patterns = [
            # Credit card numbers (4x4 digits, optional -/space separators)
            (r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b", "credit_card"),
            # SSN (US)
            (r"\b\d{3}-\d{2}-\d{4}\b", "ssn"),
            # IBAN
            (r"\b[A-Z]{2}\d{2}[A-Z0-9]{13,29}\b", "iban"),
            # Dates (potential birthdays); the backreference requires consistent
            # separators (fixes former [./] ... [./] accepting "12.05/1990")
            (r"\b\d{2}([./])\d{2}\1\d{4}\b", "date"),
            # Email addresses (fixed: TLD class was [A-Z|a-z], i.e. a literal '|')
            (r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", "email"),
            # Phone numbers (German format)
            (r"\b(?:\+49|0)\s?\d{3,4}\s?\d{6,8}\b", "phone"),
            # API keys/tokens (long alphanumeric strings)
            (r"\b[A-Za-z0-9]{32,}\b", "api_key")
        ]

        # Agent-specific privacy rules.
        # Only MANDATORY privacy rules here (content-based detection is preferred).
        self.agent_privacy_rules = {
            "lara_alesi": PrivacyLevel.PRIVATE,    # Medical - ALWAYS private (health data)
            "theo_alesi": PrivacyLevel.INTERNAL,   # Financial - depends on content
            "justus_alesi": PrivacyLevel.INTERNAL, # Legal - depends on content
            "jane_alesi": PrivacyLevel.INTERNAL,   # Coordinator - depends on content
            "john_alesi": PrivacyLevel.INTERNAL,   # Development - usually safe
            "leon_alesi": PrivacyLevel.INTERNAL,   # System - usually safe
            "luna_alesi": PrivacyLevel.INTERNAL    # Coaching - usually safe
        }

    def detect_privacy_level(
        self,
        message: str,
        agent_id: Optional[str] = None,
        user_privacy_flag: Optional[str] = None
    ) -> Tuple[PrivacyLevel, Dict]:
        """
        Detect the privacy level of a message.

        Precedence: explicit user flag > mandatory agent rule (PRIVATE only)
        > content detection (patterns, then keywords) > agent default > PUBLIC.

        Args:
            message: User message to analyze
            agent_id: Agent that will process this (optional)
            user_privacy_flag: User-specified privacy level (optional)

        Returns:
            Tuple of (PrivacyLevel, detection_details)
        """
        # User override takes precedence over all detection.
        if user_privacy_flag:
            level = self._parse_user_privacy_flag(user_privacy_flag)
            return level, {"reason": "user_override", "user_flag": user_privacy_flag}

        # Mandatory agent rules short-circuit content analysis. Only PRIVATE
        # is mandatory; INTERNAL agent levels are applied later as a fallback.
        if agent_id and agent_id in self.agent_privacy_rules:
            agent_level = self.agent_privacy_rules[agent_id]
            if agent_level == PrivacyLevel.PRIVATE:
                return agent_level, {
                    "reason": "agent_rule",
                    "agent": agent_id,
                    "rule": "always_private"
                }

        keyword_matches = self._detect_keywords(message)
        pattern_matches = self._detect_patterns(message)
        total_detections = len(keyword_matches) + len(pattern_matches)

        details = {
            "keyword_matches": keyword_matches,
            "pattern_matches": pattern_matches,
            "total_detections": total_detections
        }

        # Only identity/financial patterns force PRIVATE; other pattern hits
        # (date, email, phone, api_key) are recorded in details but fall
        # through to keyword/agent classification.
        if pattern_matches:
            if any(cat in ["credit_card", "ssn", "iban"] for pat, cat in pattern_matches):
                return PrivacyLevel.PRIVATE, {**details, "reason": "sensitive_pattern"}

        if keyword_matches:
            categories = set(cat for cat, _ in keyword_matches)
            # Medical or financial keywords = PRIVATE
            if "medical" in categories or "financial" in categories:
                return PrivacyLevel.PRIVATE, {**details, "reason": "sensitive_keywords"}
            # Personal or legal = CONFIDENTIAL
            if "personal" in categories or "legal" in categories:
                return PrivacyLevel.CONFIDENTIAL, {**details, "reason": "confidential_keywords"}
            # Security keywords = CONFIDENTIAL
            if "security" in categories:
                return PrivacyLevel.CONFIDENTIAL, {**details, "reason": "security_keywords"}

        # Apply the agent's default level if content gave no strong signal.
        if agent_id and agent_id in self.agent_privacy_rules:
            return self.agent_privacy_rules[agent_id], {
                **details,
                "reason": "agent_default",
                "agent": agent_id
            }

        # Default: PUBLIC (safe for external processing)
        return PrivacyLevel.PUBLIC, {**details, "reason": "no_sensitive_data"}

    def _detect_keywords(self, message: str) -> List[Tuple[str, str]]:
        """Return (category, keyword) hits, matched case-insensitively on word boundaries."""
        message_lower = message.lower()
        matches = []
        for category, keywords in self.sensitive_keywords.items():
            for keyword in keywords:
                # Word boundaries avoid false positives,
                # e.g. "health" does not match inside "wealth".
                pattern = r'\b' + re.escape(keyword) + r'\b'
                if re.search(pattern, message_lower):
                    matches.append((category, keyword))
        return matches

    def _detect_patterns(self, message: str) -> List[Tuple[str, str]]:
        """Return (pattern, category) hits for the structured-data regexes."""
        matches = []
        for pattern, category in self.sensitive_patterns:
            if re.search(pattern, message):
                matches.append((pattern, category))
        return matches

    def _parse_user_privacy_flag(self, flag: str) -> PrivacyLevel:
        """Map a user-supplied privacy flag (case-insensitive) to a PrivacyLevel."""
        flag_lower = flag.lower()
        if flag_lower in ["private", "high", "strict"]:
            return PrivacyLevel.PRIVATE
        elif flag_lower in ["confidential", "medium"]:
            return PrivacyLevel.CONFIDENTIAL
        elif flag_lower in ["internal", "low"]:
            return PrivacyLevel.INTERNAL
        else:
            # Unknown flags fall back to PUBLIC; warn so a typo in a flag
            # cannot silently route sensitive data externally.
            if flag_lower != "public":
                logger.warning("Unknown privacy flag %r, defaulting to PUBLIC", flag)
            return PrivacyLevel.PUBLIC

    def should_use_local_provider(self, privacy_level: PrivacyLevel, mode: str = "balanced") -> bool:
        """
        Determine if the local provider should be used based on privacy level and mode.

        Args:
            privacy_level: Detected privacy level
            mode: Privacy mode (strict, balanced, performance)

        Returns:
            True if should use local provider (colossus), False if external OK (OpenRouter)
        """
        if mode == "strict":
            # Strict: everything goes local.
            return True
        if mode == "performance":
            # Performance: only explicitly PRIVATE goes local.
            return privacy_level == PrivacyLevel.PRIVATE
        # "balanced" and any unrecognized mode: PRIVATE and CONFIDENTIAL go local.
        return privacy_level in [PrivacyLevel.PRIVATE, PrivacyLevel.CONFIDENTIAL]
| def _parse_user_privacy_flag(self, flag: str) -> PrivacyLevel: | |
| """Parse user-specified privacy flag""" | |
| flag_lower = flag.lower() | |
| if flag_lower in ["private", "high", "strict"]: | |
| return PrivacyLevel.PRIVATE | |
| elif flag_lower in ["confidential", "medium"]: | |
| return PrivacyLevel.CONFIDENTIAL | |
| elif flag_lower in ["internal", "low"]: | |
| return PrivacyLevel.INTERNAL | |
| else: | |
| return PrivacyLevel.PUBLIC | |
| def should_use_local_provider(self, privacy_level: PrivacyLevel, mode: str = "balanced") -> bool: | |
| """ | |
| Determine if local provider should be used based on privacy level and mode | |
| Args: | |
| privacy_level: Detected privacy level | |
| mode: Privacy mode (strict, balanced, performance) | |
| Returns: | |
| True if should use local provider (colossus), False if external OK (OpenRouter) | |
| """ | |
| if mode == "strict": | |
| # Strict: Everything goes local | |
| return True | |
| elif mode == "balanced": | |
| # Balanced: Private and Confidential go local | |
| return privacy_level in [PrivacyLevel.PRIVATE, PrivacyLevel.CONFIDENTIAL] | |
| elif mode == "performance": | |
| # Performance: Only explicitly PRIVATE goes local | |
| return privacy_level == PrivacyLevel.PRIVATE | |
| else: | |
| # Default to balanced | |
| return privacy_level in [PrivacyLevel.PRIVATE, PrivacyLevel.CONFIDENTIAL] | |
# Module-level singleton instance shared by the convenience functions.
privacy_detector = PrivacyDetector()
| # Convenience functions | |
def detect_privacy_level(message: str, agent_id: Optional[str] = None, user_flag: Optional[str] = None) -> Tuple[PrivacyLevel, Dict]:
    """
    Convenience function to detect the privacy level of a message.

    Delegates to the module-level ``privacy_detector`` singleton.

    Args:
        message: User message to analyze
        agent_id: Agent that will process this (optional)
        user_flag: User-specified privacy level (optional)

    Returns:
        Tuple of (PrivacyLevel, detection_details)
    """
    return privacy_detector.detect_privacy_level(message, agent_id, user_flag)
def should_use_local(message: str, agent_id: Optional[str] = None, mode: str = "balanced") -> bool:
    """
    Convenience function to check if the local provider should be used.

    Detects the message's privacy level via the singleton, applies the
    routing policy for ``mode``, and logs the decision.

    Args:
        message: User message to analyze
        agent_id: Agent that will process this (optional)
        mode: Privacy mode (strict, balanced, performance)

    Returns:
        True if the local provider (colossus) should be used, False if
        external processing (OpenRouter) is acceptable.
    """
    level, details = privacy_detector.detect_privacy_level(message, agent_id)
    use_local = privacy_detector.should_use_local_provider(level, mode)
    # Lazy %-style args avoid formatting work when INFO logging is disabled.
    logger.info(
        "Privacy check: %s → %s (%s)",
        level.value,
        "LOCAL" if use_local else "EXTERNAL",
        details.get("reason", "unknown"),
    )
    return use_local
if __name__ == "__main__":
    # Demo: run the detector over a mix of harmless and sensitive inputs.
    demo_cases = [
        ("What is Python?", "john_alesi"),
        ("My patient has symptoms of diabetes", "lara_alesi"),
        ("Analyze my bank account: DE89370400440532013000", "theo_alesi"),
        ("Review this confidential contract", "justus_alesi"),
        ("How to optimize this code?", "john_alesi"),
        ("My credit card number is 4532-1234-5678-9010", None)
    ]

    print("🔒 Privacy Detection Demo\n")
    for text, agent_name in demo_cases:
        detected, info = detect_privacy_level(text, agent_name)
        local = privacy_detector.should_use_local_provider(detected, mode="balanced")
        print(f"Message: '{text[:50]}...'")
        print(f"Agent: {agent_name or 'None'}")
        print(f"Privacy Level: {detected.value}")
        print(f"Routing: {'LOCAL (colossus)' if local else 'EXTERNAL (OpenRouter)'}")
        print(f"Reason: {info.get('reason')}")
        if info.get('keyword_matches'):
            print(f"Keywords: {info['keyword_matches']}")
        if info.get('pattern_matches'):
            print(f"Patterns: {info['pattern_matches']}")
        print()