"""
intelligence_pipeline.py
Updated to integrate:
- safety_classifier (request + output gating)
- slang detection
- transformer threat scoring
- optional OpenAI LLM fusion
- translation + explanation logic
"""
import json
import os
from typing import Dict, List, Optional
from prompt_engine import PromptEngine
from model_inference import ThreatModel
from safety_classifier import (
    safety_check,
    safety_check_output,
    detect_dangerous_intent,
)
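# NOTE: safety_check() / safety_check_output() are assumed (from their usage
# further below) to return a dict with at least the keys "blocked" (bool) and
# "reason" (str).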
try:
    from openai import OpenAI  # OpenAI Python SDK >= 1.0
except Exception:
    OpenAI = None
# ---------------------------------------------------------------------------
# Global initialization
# ---------------------------------------------------------------------------
engine = PromptEngine()
SLANG_LEXICON_PATH = os.path.join(os.path.dirname(__file__), "slang_lexicon.json")
if os.path.exists(SLANG_LEXICON_PATH):
    with open(SLANG_LEXICON_PATH, "r", encoding="utf-8") as f:
        SLANG_LEXICON: Dict[str, str] = json.load(f)
else:
    SLANG_LEXICON = {}
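# Assumed lexicon layout: a flat JSON object mapping each slang term to a short
# plain-English meaning, e.g. {"<slang term>": "<analyst-facing explanation>"},
# which is the shape detect_slang() below iterates over.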
# Transformers model
MODEL_PATH = os.getenv("THREAT_MODEL_PATH", "bert-base-chinese")
try:
    threat_model = ThreatModel(MODEL_PATH)
except Exception:
    threat_model = None
# Optional LLM client
if OpenAI is not None and os.getenv("OPENAI_API_KEY"):
    try:
        llm_client = OpenAI()
    except Exception:
        llm_client = None
else:
    llm_client = None
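# When constructed with no arguments, the OpenAI client reads OPENAI_API_KEY
# from the environment, so the getenv() check above doubles as the
# "is the LLM configured?" gate.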
# ---------------------------------------------------------------------------
# Utility: slang detection
# ---------------------------------------------------------------------------
def detect_slang(text: str) -> Dict[str, str]:
    hits = {}
    for term, meaning in SLANG_LEXICON.items():
        if term in text:
            hits[term] = meaning
    return hits
# ---------------------------------------------------------------------------
# Transformer risk scoring
# ---------------------------------------------------------------------------
def transformer_risk(text: str) -> Dict[str, float]:
    if threat_model is None:
        return {"risk_score": 0.0, "max_prob": 0.0}
    probs: List[float] = threat_model.predict_proba(text)
    if not probs:
        return {"risk_score": 0.0, "max_prob": 0.0}
    max_prob = max(probs)
    bucket = max(1, int(round(max_prob * 5))) if max_prob > 0 else 0
    return {"risk_score": float(bucket), "max_prob": float(max_prob)}
# ---------------------------------------------------------------------------
# LLM fusion logic
# ---------------------------------------------------------------------------
def llm_explain_threat(text: str, slang_hits: Dict[str, str], risk: Dict[str, float]) -> Optional[str]:
    """Fuse slang hits and classifier signals into an LLM-written threat interpretation."""
    if llm_client is None:
        return None
    slang_str = ", ".join(f"{k}: {v}" for k, v in slang_hits.items()) or "none"
    prompt = f"""
Analyze APJ underground market content with cultural nuance.
TEXT:
{text}
Signals:
- Risk bucket (1–5): {risk.get('risk_score', 0)}
- Classifier max probability: {risk.get('max_prob', 0.0):.3f}
- Detected slang: {slang_str}
Return:
- threat summary
- role of slang/idioms
- risk interpretation
"""
    try:
        response = llm_client.chat.completions.create(
            model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
            messages=[
                {"role": "system", "content": "You are a cyber threat intelligence analyst."},
                {"role": "user", "content": prompt},
            ],
            temperature=0.2,
        )
        return response.choices[0].message.content
    except Exception:
        return None
def llm_translate_explain(text: str) -> Optional[str]:
    """Translate underground-market text into English and explain it, via the LLM."""
    if llm_client is None:
        return None
    prompt = engine.translate_explain(text)
    try:
        response = llm_client.chat.completions.create(
            model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
            messages=[
                {"role": "system", "content": "You are a bilingual Mandarin/Cantonese → English interpreter with cybercrime domain knowledge."},
                {"role": "user", "content": prompt},
            ],
            temperature=0.2,
        )
        return response.choices[0].message.content
    except Exception:
        return None
# ---------------------------------------------------------------------------
# PUBLIC: Threat analysis
# ---------------------------------------------------------------------------
def analyze_threat(text: str) -> str:
    """Run the full pipeline: safety gate, slang detection, transformer scoring, LLM fusion."""
    text = (text or "").strip()

    # ---------------- Safety gate (incoming request) ----------------
    safety = safety_check(text)
    if safety["blocked"]:
        return safety["reason"]

    # ---------------- Slang detection ----------------
    slang = detect_slang(text)

    # ---------------- Transformer risk scoring ----------------
    risk = transformer_risk(text)

    # ---------------- LLM fusion ----------------
    llm_result = llm_explain_threat(text, slang, risk)

    # ---------------- Build output ----------------
    out = []
    if slang:
        out.append("### Slang / Idioms Detected")
        for k, v in slang.items():
            out.append(f"- {k}: {v}")
        out.append("")
    out.append(f"### Risk Score: {risk['risk_score']} (1–5)")
    out.append(f"Classifier max probability: {risk['max_prob']:.3f}\n")
    if llm_result:
        # Safety check on model output
        out_safety = safety_check_output(llm_result)
        if out_safety["blocked"]:
            return "⚠️ LLM output blocked: " + out_safety["reason"]
        out.append("### LLM Interpretation\n" + llm_result)
    else:
        out.append("### LLM Interpretation Unavailable")
        out.append("Only transformer-based scoring applied.")
    return "\n".join(out)
# ---------------------------------------------------------------------------
# PUBLIC: Translation mode
# ---------------------------------------------------------------------------
def translate_and_explain(text: str) -> str:
    """Safety-gate the request, translate/explain it via the LLM, then gate the LLM output."""
    text = (text or "").strip()
    safety = safety_check(text)
    if safety["blocked"]:
        return safety["reason"]
    llm_result = llm_translate_explain(text)
    if llm_result:
        out_safety = safety_check_output(llm_result)
        if out_safety["blocked"]:
            return "⚠️ LLM translation blocked: " + out_safety["reason"]
        return llm_result
    return f"LLM not configured.\nRaw text:\n{text}"