# saap-plattform/backend/privacy_detector.py
# (HuggingFace Space deployment — commit 4343907, "feat: initial HuggingFace Space deployment")
"""
Privacy Detector for SAAP Multi-Agent System
Detects sensitive data to route to local colossus instead of external OpenRouter
"""
import logging
import re
from enum import Enum
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class PrivacyLevel(Enum):
    """Privacy classification for a message, from least to most sensitive."""

    PUBLIC = "public"              # safe to process with an external provider
    INTERNAL = "internal"          # prefer local processing, but not critical
    CONFIDENTIAL = "confidential"  # should be processed locally
    PRIVATE = "private"            # MUST stay local (medical, financial, personal)
class PrivacyDetector:
"""
Detects sensitive data in user messages to ensure privacy-compliant routing
Detection Methods:
1. Keyword-based (medical, financial, personal data keywords)
2. Pattern-based (credit cards, SSN, IBAN, etc.)
3. Agent-based rules (medical agent = always private)
"""
def __init__(self):
# Sensitive keyword categories
self.sensitive_keywords = {
"medical": [
"patient", "patienten", "diagnosis", "diagnose", "treatment", "behandlung",
"medication", "medikament", "symptom", "krankheit", "disease", "arzt",
"doctor", "hospital", "krankenhaus", "gesundheit", "health", "medizin",
"medicine", "therapie", "therapy", "blut", "blood", "operation"
],
"financial": [
# Only TRULY sensitive financial data (account numbers, cards, passwords)
# General financial advice keywords removed (investment, portfolio, sparen, etc.)
"account number", "kontonummer", "password", "passwort",
"credit card", "kreditkarte", "iban", "bic",
"pin", "cvv", "security code", "sicherheitscode",
"salary", "gehalt", # Personal income data
"tax id", "steuernummer" # Personal tax data
],
"personal": [
"social security", "sozialversicherung", "passport", "reisepass",
"driver license", "führerschein", "birthday", "geburtstag", "geburtsdatum",
"address", "adresse", "phone", "telefon", "email", "personalausweis",
"id card", "tax", "steuer", "insurance", "versicherung"
],
"legal": [
"contract", "vertrag", "confidential", "vertraulich", "proprietary",
"nda", "geheimhaltung", "lawsuit", "klage", "court", "gericht",
"lawyer", "anwalt", "legal", "rechtlich"
],
"security": [
"secret", "geheim", "private key", "token", "api key", "credentials",
"zugangsdaten", "authentication", "authentifizierung"
]
}
# Sensitive data patterns (regex)
self.sensitive_patterns = [
# Credit card numbers
(r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b", "credit_card"),
# SSN (US)
(r"\b\d{3}-\d{2}-\d{4}\b", "ssn"),
# IBAN
(r"\b[A-Z]{2}\d{2}[A-Z0-9]{13,29}\b", "iban"),
# Dates (potential birthdays)
(r"\b\d{2}[./]\d{2}[./]\d{4}\b", "date"),
# Email addresses
(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", "email"),
# Phone numbers (German format)
(r"\b(?:\+49|0)\s?\d{3,4}\s?\d{6,8}\b", "phone"),
# API keys/tokens (long alphanumeric strings)
(r"\b[A-Za-z0-9]{32,}\b", "api_key")
]
# Agent-specific privacy rules
# Only MANDATORY privacy rules here (content-based detection is preferred)
self.agent_privacy_rules = {
"lara_alesi": PrivacyLevel.PRIVATE, # Medical - ALWAYS private (health data)
"theo_alesi": PrivacyLevel.INTERNAL, # Financial - depends on content
"justus_alesi": PrivacyLevel.INTERNAL, # Legal - depends on content
"jane_alesi": PrivacyLevel.INTERNAL, # Coordinator - depends on content
"john_alesi": PrivacyLevel.INTERNAL, # Development - usually safe
"leon_alesi": PrivacyLevel.INTERNAL, # System - usually safe
"luna_alesi": PrivacyLevel.INTERNAL # Coaching - usually safe
}
def detect_privacy_level(
self,
message: str,
agent_id: str = None,
user_privacy_flag: str = None
) -> Tuple[PrivacyLevel, Dict]:
"""
Detect privacy level of message
Args:
message: User message to analyze
agent_id: Agent that will process this (optional)
user_privacy_flag: User-specified privacy level (optional)
Returns:
Tuple of (PrivacyLevel, detection_details)
"""
# User override takes precedence
if user_privacy_flag:
level = self._parse_user_privacy_flag(user_privacy_flag)
return level, {"reason": "user_override", "user_flag": user_privacy_flag}
# Check agent-specific rules first
if agent_id and agent_id in self.agent_privacy_rules:
agent_level = self.agent_privacy_rules[agent_id]
if agent_level == PrivacyLevel.PRIVATE:
return agent_level, {
"reason": "agent_rule",
"agent": agent_id,
"rule": "always_private"
}
# Keyword detection
keyword_matches = self._detect_keywords(message)
# Pattern detection
pattern_matches = self._detect_patterns(message)
# Combine detections
total_detections = len(keyword_matches) + len(pattern_matches)
# Determine privacy level based on detections
details = {
"keyword_matches": keyword_matches,
"pattern_matches": pattern_matches,
"total_detections": total_detections
}
# Classification logic
if pattern_matches: # Any pattern match = high sensitivity
if any(cat in ["credit_card", "ssn", "iban"] for pat, cat in pattern_matches):
return PrivacyLevel.PRIVATE, {**details, "reason": "sensitive_pattern"}
if keyword_matches:
categories = set(cat for cat, _ in keyword_matches)
# Medical or financial keywords = PRIVATE
if "medical" in categories or "financial" in categories:
return PrivacyLevel.PRIVATE, {**details, "reason": "sensitive_keywords"}
# Personal or legal = CONFIDENTIAL
if "personal" in categories or "legal" in categories:
return PrivacyLevel.CONFIDENTIAL, {**details, "reason": "confidential_keywords"}
# Security keywords = CONFIDENTIAL
if "security" in categories:
return PrivacyLevel.CONFIDENTIAL, {**details, "reason": "security_keywords"}
# Apply agent rule if no strong detection
if agent_id and agent_id in self.agent_privacy_rules:
return self.agent_privacy_rules[agent_id], {
**details,
"reason": "agent_default",
"agent": agent_id
}
# Default: PUBLIC (safe for external processing)
return PrivacyLevel.PUBLIC, {**details, "reason": "no_sensitive_data"}
def _detect_keywords(self, message: str) -> List[Tuple[str, str]]:
"""Detect sensitive keywords in message using word boundaries"""
message_lower = message.lower()
matches = []
for category, keywords in self.sensitive_keywords.items():
for keyword in keywords:
# Use word boundaries to avoid false positives
# "health" in "wealth" won't match
pattern = r'\b' + re.escape(keyword) + r'\b'
if re.search(pattern, message_lower):
matches.append((category, keyword))
return matches
def _detect_patterns(self, message: str) -> List[Tuple[str, str]]:
"""Detect sensitive patterns in message"""
matches = []
for pattern, category in self.sensitive_patterns:
if re.search(pattern, message):
matches.append((pattern, category))
return matches
def _parse_user_privacy_flag(self, flag: str) -> PrivacyLevel:
"""Parse user-specified privacy flag"""
flag_lower = flag.lower()
if flag_lower in ["private", "high", "strict"]:
return PrivacyLevel.PRIVATE
elif flag_lower in ["confidential", "medium"]:
return PrivacyLevel.CONFIDENTIAL
elif flag_lower in ["internal", "low"]:
return PrivacyLevel.INTERNAL
else:
return PrivacyLevel.PUBLIC
def should_use_local_provider(self, privacy_level: PrivacyLevel, mode: str = "balanced") -> bool:
"""
Determine if local provider should be used based on privacy level and mode
Args:
privacy_level: Detected privacy level
mode: Privacy mode (strict, balanced, performance)
Returns:
True if should use local provider (colossus), False if external OK (OpenRouter)
"""
if mode == "strict":
# Strict: Everything goes local
return True
elif mode == "balanced":
# Balanced: Private and Confidential go local
return privacy_level in [PrivacyLevel.PRIVATE, PrivacyLevel.CONFIDENTIAL]
elif mode == "performance":
# Performance: Only explicitly PRIVATE goes local
return privacy_level == PrivacyLevel.PRIVATE
else:
# Default to balanced
return privacy_level in [PrivacyLevel.PRIVATE, PrivacyLevel.CONFIDENTIAL]
# Module-level singleton shared by the convenience wrappers in this module.
privacy_detector = PrivacyDetector()
# Convenience functions
def detect_privacy_level(message: str, agent_id: Optional[str] = None, user_flag: Optional[str] = None) -> Tuple[PrivacyLevel, Dict]:
    """Detect the privacy level of *message* using the module singleton.

    Args:
        message: User message to analyze.
        agent_id: Agent that will process this message (optional).
        user_flag: User-specified privacy level override (optional).

    Returns:
        Tuple of (PrivacyLevel, detection_details).
    """
    return privacy_detector.detect_privacy_level(message, agent_id, user_flag)
def should_use_local(message: str, agent_id: Optional[str] = None, mode: str = "balanced") -> bool:
    """Check whether *message* should be routed to the local provider.

    Detects the privacy level via the module singleton, applies the routing
    policy for *mode*, and logs the decision.

    Args:
        message: User message to analyze.
        agent_id: Agent that will process this message (optional).
        mode: Privacy mode (strict, balanced, performance).

    Returns:
        True if the local provider (colossus) should be used.
    """
    level, details = privacy_detector.detect_privacy_level(message, agent_id)
    use_local = privacy_detector.should_use_local_provider(level, mode)
    # Lazy %-style args avoid formatting when INFO is disabled; the original
    # f-string also ran level/routing together with no separator.
    logger.info(
        "Privacy check: %s -> %s (%s)",
        level.value,
        "LOCAL" if use_local else "EXTERNAL",
        details.get("reason", "unknown"),
    )
    return use_local
if __name__ == "__main__":
    # Demo: run the detector over a few representative messages and show
    # the resulting classification and routing decision.
    demo_inputs = [
        ("What is Python?", "john_alesi"),
        ("My patient has symptoms of diabetes", "lara_alesi"),
        ("Analyze my bank account: DE89370400440532013000", "theo_alesi"),
        ("Review this confidential contract", "justus_alesi"),
        ("How to optimize this code?", "john_alesi"),
        ("My credit card number is 4532-1234-5678-9010", None)
    ]
    print("🔒 Privacy Detection Demo\n")
    for text, agent_name in demo_inputs:
        detected, info = detect_privacy_level(text, agent_name)
        local = privacy_detector.should_use_local_provider(detected, mode="balanced")
        routing = 'LOCAL (colossus)' if local else 'EXTERNAL (OpenRouter)'
        print(f"Message: '{text[:50]}...'")
        print(f"Agent: {agent_name or 'None'}")
        print(f"Privacy Level: {detected.value}")
        print(f"Routing: {routing}")
        print(f"Reason: {info.get('reason')}")
        if info.get('keyword_matches'):
            print(f"Keywords: {info['keyword_matches']}")
        if info.get('pattern_matches'):
            print(f"Patterns: {info['pattern_matches']}")
        print()