Spaces:
Sleeping
Sleeping
File size: 12,031 Bytes
4343907 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 |
"""
Privacy Detector for SAAP Multi-Agent System
Detects sensitive data to route to local colossus instead of external OpenRouter
"""
import logging
import re
from enum import Enum
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class PrivacyLevel(Enum):
    """Privacy classification levels, ordered from least to most restrictive.

    PUBLIC       -- safe for external processing
    INTERNAL     -- prefer local processing, but not critical
    CONFIDENTIAL -- should be processed locally
    PRIVATE      -- MUST stay local (medical, financial, personal data)
    """

    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    PRIVATE = "private"
class PrivacyDetector:
    """
    Detects sensitive data in user messages to ensure privacy-compliant routing.

    Detection methods:
        1. Keyword-based (medical, financial, personal data keywords)
        2. Pattern-based (credit cards, SSN, IBAN, etc.)
        3. Agent-based rules (medical agent = always private)
    """

    # Pattern categories whose presence forces a PRIVATE classification.
    _HIGH_RISK_CATEGORIES = frozenset({"credit_card", "ssn", "iban"})

    def __init__(self) -> None:
        # Sensitive keyword categories. English and German terms are mixed
        # because user messages arrive in both languages.
        self.sensitive_keywords: Dict[str, List[str]] = {
            "medical": [
                "patient", "patienten", "diagnosis", "diagnose", "treatment", "behandlung",
                "medication", "medikament", "symptom", "krankheit", "disease", "arzt",
                "doctor", "hospital", "krankenhaus", "gesundheit", "health", "medizin",
                "medicine", "therapie", "therapy", "blut", "blood", "operation"
            ],
            "financial": [
                # Only TRULY sensitive financial data (account numbers, cards, passwords)
                # General financial advice keywords removed (investment, portfolio, sparen, etc.)
                "account number", "kontonummer", "password", "passwort",
                "credit card", "kreditkarte", "iban", "bic",
                "pin", "cvv", "security code", "sicherheitscode",
                "salary", "gehalt",  # Personal income data
                "tax id", "steuernummer"  # Personal tax data
            ],
            "personal": [
                "social security", "sozialversicherung", "passport", "reisepass",
                "driver license", "führerschein", "birthday", "geburtstag", "geburtsdatum",
                "address", "adresse", "phone", "telefon", "email", "personalausweis",
                "id card", "tax", "steuer", "insurance", "versicherung"
            ],
            "legal": [
                "contract", "vertrag", "confidential", "vertraulich", "proprietary",
                "nda", "geheimhaltung", "lawsuit", "klage", "court", "gericht",
                "lawyer", "anwalt", "legal", "rechtlich"
            ],
            "security": [
                "secret", "geheim", "private key", "token", "api key", "credentials",
                "zugangsdaten", "authentication", "authentifizierung"
            ]
        }

        # Sensitive data patterns as (regex source, category label) pairs.
        self.sensitive_patterns: List[Tuple[str, str]] = [
            # Credit card numbers
            (r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b", "credit_card"),
            # SSN (US)
            (r"\b\d{3}-\d{2}-\d{4}\b", "ssn"),
            # IBAN
            (r"\b[A-Z]{2}\d{2}[A-Z0-9]{13,29}\b", "iban"),
            # Dates (potential birthdays)
            (r"\b\d{2}[./]\d{2}[./]\d{4}\b", "date"),
            # Email addresses
            (r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", "email"),
            # Phone numbers (German format)
            (r"\b(?:\+49|0)\s?\d{3,4}\s?\d{6,8}\b", "phone"),
            # API keys/tokens (long alphanumeric strings)
            (r"\b[A-Za-z0-9]{32,}\b", "api_key")
        ]

        # Agent-specific privacy rules.
        # Only MANDATORY privacy rules here (content-based detection is preferred).
        self.agent_privacy_rules: Dict[str, PrivacyLevel] = {
            "lara_alesi": PrivacyLevel.PRIVATE,     # Medical - ALWAYS private (health data)
            "theo_alesi": PrivacyLevel.INTERNAL,    # Financial - depends on content
            "justus_alesi": PrivacyLevel.INTERNAL,  # Legal - depends on content
            "jane_alesi": PrivacyLevel.INTERNAL,    # Coordinator - depends on content
            "john_alesi": PrivacyLevel.INTERNAL,    # Development - usually safe
            "leon_alesi": PrivacyLevel.INTERNAL,    # System - usually safe
            "luna_alesi": PrivacyLevel.INTERNAL     # Coaching - usually safe
        }

        # Precompiled regexes, hoisted out of the per-message detection loops
        # so repeated detect calls don't recompile on every keyword/pattern.
        # Word boundaries around keywords avoid false positives
        # ("health" inside "wealth" won't match).
        self._keyword_regexes: List[Tuple[str, str, re.Pattern]] = [
            (category, keyword, re.compile(r"\b" + re.escape(keyword) + r"\b"))
            for category, keywords in self.sensitive_keywords.items()
            for keyword in keywords
        ]
        self._pattern_regexes: List[Tuple[re.Pattern, str, str]] = [
            (re.compile(pattern), pattern, category)
            for pattern, category in self.sensitive_patterns
        ]

    def detect_privacy_level(
        self,
        message: str,
        agent_id: Optional[str] = None,
        user_privacy_flag: Optional[str] = None
    ) -> Tuple[PrivacyLevel, Dict]:
        """
        Detect privacy level of message.

        Precedence: explicit user flag > mandatory agent rule (PRIVATE only)
        > pattern detection > keyword detection > agent default > PUBLIC.

        Args:
            message: User message to analyze
            agent_id: Agent that will process this (optional)
            user_privacy_flag: User-specified privacy level (optional)

        Returns:
            Tuple of (PrivacyLevel, detection_details)
        """
        # User override takes precedence over all automatic detection.
        if user_privacy_flag:
            level = self._parse_user_privacy_flag(user_privacy_flag)
            return level, {"reason": "user_override", "user_flag": user_privacy_flag}

        # Mandatory agent rules short-circuit content analysis (PRIVATE only;
        # INTERNAL agent defaults are applied later, after content detection).
        if agent_id and agent_id in self.agent_privacy_rules:
            agent_level = self.agent_privacy_rules[agent_id]
            if agent_level == PrivacyLevel.PRIVATE:
                return agent_level, {
                    "reason": "agent_rule",
                    "agent": agent_id,
                    "rule": "always_private"
                }

        keyword_matches = self._detect_keywords(message)
        pattern_matches = self._detect_patterns(message)
        total_detections = len(keyword_matches) + len(pattern_matches)

        details = {
            "keyword_matches": keyword_matches,
            "pattern_matches": pattern_matches,
            "total_detections": total_detections
        }

        # High-risk patterns (card/SSN/IBAN) force PRIVATE; other pattern hits
        # (date, email, phone, api_key) fall through to keyword classification.
        if pattern_matches:
            if any(cat in self._HIGH_RISK_CATEGORIES for _pat, cat in pattern_matches):
                return PrivacyLevel.PRIVATE, {**details, "reason": "sensitive_pattern"}

        if keyword_matches:
            categories = set(cat for cat, _ in keyword_matches)
            # Medical or financial keywords = PRIVATE
            if "medical" in categories or "financial" in categories:
                return PrivacyLevel.PRIVATE, {**details, "reason": "sensitive_keywords"}
            # Personal or legal = CONFIDENTIAL
            if "personal" in categories or "legal" in categories:
                return PrivacyLevel.CONFIDENTIAL, {**details, "reason": "confidential_keywords"}
            # Security keywords = CONFIDENTIAL
            if "security" in categories:
                return PrivacyLevel.CONFIDENTIAL, {**details, "reason": "security_keywords"}

        # No strong content signal: fall back to the agent's default level.
        if agent_id and agent_id in self.agent_privacy_rules:
            return self.agent_privacy_rules[agent_id], {
                **details,
                "reason": "agent_default",
                "agent": agent_id
            }

        # Default: PUBLIC (safe for external processing)
        return PrivacyLevel.PUBLIC, {**details, "reason": "no_sensitive_data"}

    def _detect_keywords(self, message: str) -> List[Tuple[str, str]]:
        """Return (category, keyword) pairs for sensitive keywords in message.

        Matching is case-insensitive (message is lowercased; all keywords are
        already lowercase) and uses precompiled word-boundary regexes.
        """
        message_lower = message.lower()
        return [
            (category, keyword)
            for category, keyword, regex in self._keyword_regexes
            if regex.search(message_lower)
        ]

    def _detect_patterns(self, message: str) -> List[Tuple[str, str]]:
        """Return (pattern_source, category) pairs for regex hits in message."""
        return [
            (pattern, category)
            for regex, pattern, category in self._pattern_regexes
            if regex.search(message)
        ]

    def _parse_user_privacy_flag(self, flag: str) -> PrivacyLevel:
        """Parse a user-specified privacy flag; unknown values map to PUBLIC."""
        flag_lower = flag.lower()
        if flag_lower in ["private", "high", "strict"]:
            return PrivacyLevel.PRIVATE
        elif flag_lower in ["confidential", "medium"]:
            return PrivacyLevel.CONFIDENTIAL
        elif flag_lower in ["internal", "low"]:
            return PrivacyLevel.INTERNAL
        else:
            return PrivacyLevel.PUBLIC

    def should_use_local_provider(self, privacy_level: PrivacyLevel, mode: str = "balanced") -> bool:
        """
        Determine if local provider should be used based on privacy level and mode.

        Args:
            privacy_level: Detected privacy level
            mode: Privacy mode (strict, balanced, performance)

        Returns:
            True if should use local provider (colossus), False if external OK (OpenRouter)
        """
        if mode == "strict":
            # Strict: Everything goes local
            return True
        elif mode == "balanced":
            # Balanced: Private and Confidential go local
            return privacy_level in [PrivacyLevel.PRIVATE, PrivacyLevel.CONFIDENTIAL]
        elif mode == "performance":
            # Performance: Only explicitly PRIVATE goes local
            return privacy_level == PrivacyLevel.PRIVATE
        else:
            # Unknown mode: default to balanced behavior
            return privacy_level in [PrivacyLevel.PRIVATE, PrivacyLevel.CONFIDENTIAL]
# Singleton instance
# Module-level shared detector used by the convenience functions below;
# constructing it here pays the keyword/pattern setup cost once at import time.
privacy_detector = PrivacyDetector()
# Convenience functions
def detect_privacy_level(message: str, agent_id: str = None, user_flag: str = None) -> Tuple[PrivacyLevel, Dict]:
    """Module-level shortcut: classify ``message`` via the shared detector."""
    return privacy_detector.detect_privacy_level(
        message,
        agent_id,
        user_flag,
    )
def should_use_local(message: str, agent_id: str = None, mode: str = "balanced") -> bool:
    """Convenience function to check if local provider should be used.

    Args:
        message: User message to analyze
        agent_id: Agent that will process this (optional)
        mode: Privacy mode (strict, balanced, performance)

    Returns:
        True if the local provider (colossus) should be used
    """
    level, details = privacy_detector.detect_privacy_level(message, agent_id)
    use_local = privacy_detector.should_use_local_provider(level, mode)
    # Lazy %-style args: formatting only happens if INFO logging is enabled
    # (the original f-string formatted eagerly on every call).
    logger.info(
        "Privacy check: %s → %s (%s)",
        level.value,
        "LOCAL" if use_local else "EXTERNAL",
        details.get("reason", "unknown"),
    )
    return use_local
if __name__ == "__main__":
    # Demo: run the detector over representative (message, agent) pairs and
    # show the resulting classification and routing decision.
    samples = [
        ("What is Python?", "john_alesi"),
        ("My patient has symptoms of diabetes", "lara_alesi"),
        ("Analyze my bank account: DE89370400440532013000", "theo_alesi"),
        ("Review this confidential contract", "justus_alesi"),
        ("How to optimize this code?", "john_alesi"),
        ("My credit card number is 4532-1234-5678-9010", None),
    ]
    print("🔒 Privacy Detection Demo\n")
    for text, agent_name in samples:
        level, details = detect_privacy_level(text, agent_name)
        use_local = privacy_detector.should_use_local_provider(level, mode="balanced")
        routing = "LOCAL (colossus)" if use_local else "EXTERNAL (OpenRouter)"
        print(f"Message: '{text[:50]}...'")
        print(f"Agent: {agent_name or 'None'}")
        print(f"Privacy Level: {level.value}")
        print(f"Routing: {routing}")
        print(f"Reason: {details.get('reason')}")
        keywords = details.get('keyword_matches')
        if keywords:
            print(f"Keywords: {keywords}")
        patterns = details.get('pattern_matches')
        if patterns:
            print(f"Patterns: {patterns}")
        print()
|