""" intelligence_pipeline.py Updated to integrate: - safety_classifier (request + output gating) - slang detection - transformer threat scoring - optional OpenAI LLM fusion - translation + explanation logic """ import json import os from typing import Dict, List, Optional from prompt_engine import PromptEngine from model_inference import ThreatModel from safety_classifier import ( safety_check, safety_check_output, detect_dangerous_intent, ) try: from openai import OpenAI # OpenAI Python SDK >= 1.0 except Exception: OpenAI = None # --------------------------------------------------------------------------- # Global initialization # --------------------------------------------------------------------------- engine = PromptEngine() SLANG_LEXICON_PATH = os.path.join(os.path.dirname(__file__), "slang_lexicon.json") if os.path.exists(SLANG_LEXICON_PATH): with open(SLANG_LEXICON_PATH, "r", encoding="utf-8") as f: SLANG_LEXICON: Dict[str, str] = json.load(f) else: SLANG_LEXICON = {} # Transformers model MODEL_PATH = os.getenv("THREAT_MODEL_PATH", "bert-base-chinese") try: threat_model = ThreatModel(MODEL_PATH) except Exception: threat_model = None # Optional LLM client if OpenAI is not None and os.getenv("OPENAI_API_KEY"): try: llm_client = OpenAI() except Exception: llm_client = None else: llm_client = None # --------------------------------------------------------------------------- # Utility: slang detection # --------------------------------------------------------------------------- def detect_slang(text: str) -> Dict[str, str]: hits = {} for term, meaning in SLANG_LEXICON.items(): if term in text: hits[term] = meaning return hits # --------------------------------------------------------------------------- # Transformer risk scoring # --------------------------------------------------------------------------- def transformer_risk(text: str) -> Dict[str, float]: if threat_model is None: return {"risk_score": 0.0, "max_prob": 0.0} probs: List[float] = threat_model.predict_proba(text) if not probs: return {"risk_score": 0.0, "max_prob": 0.0} max_prob = max(probs) bucket = max(1, int(round(max_prob * 5))) if max_prob > 0 else 0 return {"risk_score": float(bucket), "max_prob": float(max_prob)} # --------------------------------------------------------------------------- # LLM fusion logic # --------------------------------------------------------------------------- def llm_explain_threat(text: str, slang_hits: Dict[str, str], risk: Dict[str, float]) -> Optional[str]: if llm_client is None: return None slang_str = ", ".join(f"{k}: {v}" for k, v in slang_hits.items()) or "none" prompt = f""" Analyze APJ underground market content with cultural nuance. 
TEXT: {text} Signals: - Risk bucket (1–5): {risk.get('risk_score', 0)} - Classifier max probability: {risk.get('max_prob', 0.0):.3f} - Detected slang: {slang_str} Return: - threat summary - role of slang/idioms - risk interpretation """ try: response = llm_client.chat.completions.create( model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"), messages=[ {"role": "system", "content": "You are a cyber threat intelligence analyst."}, {"role": "user", "content": prompt}, ], temperature=0.2, ) return response.choices[0].message.content except Exception: return None def llm_translate_explain(text: str) -> Optional[str]: if llm_client is None: return None prompt = engine.translate_explain(text) try: response = llm_client.chat.completions.create( model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"), messages=[ {"role": "system", "content": "You are a bilingual Mandarin/Cantonese → English interpreter with cybercrime domain knowledge."}, {"role": "user", "content": prompt}, ], temperature=0.2, ) return response.choices[0].message.content except Exception: return None # --------------------------------------------------------------------------- # PUBLIC: Threat analysis # --------------------------------------------------------------------------- def analyze_threat(text: str) -> str: text = (text or "").strip() # ---------------- Safety gate (incoming request) ---------------- safety = safety_check(text) if safety["blocked"]: return safety["reason"] # ---------------- Slang detection ---------------- slang = detect_slang(text) # ---------------- Transformer risk scoring ---------------- risk = transformer_risk(text) # ---------------- LLM fusion ---------------- llm_result = llm_explain_threat(text, slang, risk) # ---------------- Build output ---------------- out = [] if slang: out.append("### Slang / Idioms Detected") for k, v in slang.items(): out.append(f"- {k}: {v}") out.append("") out.append(f"### Risk Score: {risk['risk_score']} (1–5)") out.append(f"Classifier max probability: {risk['max_prob']:.3f}\n") if llm_result: # Safety check on model output out_safety = safety_check_output(llm_result) if out_safety["blocked"]: return "⚠️ LLM output blocked: " + out_safety["reason"] out.append("### LLM Interpretation\n" + llm_result) else: out.append("### LLM Interpretation Unavailable") out.append("Only transformer-based scoring applied.") return "\n".join(out) # --------------------------------------------------------------------------- # PUBLIC: Translation mode # --------------------------------------------------------------------------- def translate_and_explain(text: str) -> str: text = (text or "").strip() safety = safety_check(text) if safety["blocked"]: return safety["reason"] llm_result = llm_translate_explain(text) if llm_result: out_safety = safety_check_output(llm_result) if out_safety["blocked"]: return "⚠️ LLM translation blocked: " + out_safety["reason"] return llm_result return f"LLM not configured.\nRaw text:\n{text}"
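
# ---------------------------------------------------------------------------
# Example usage
# ---------------------------------------------------------------------------
# Minimal sketch of how the two public entry points are meant to be called.
# The sample string below is a hypothetical placeholder, not real data; output
# depends on whether slang_lexicon.json, the transformer weights referenced by
# THREAT_MODEL_PATH, and OPENAI_API_KEY are actually available at runtime.
if __name__ == "__main__":
    sample = "Example marketplace post to analyze"  # hypothetical placeholder text

    # Full pipeline: safety gate -> slang -> transformer risk -> optional LLM fusion
    print(analyze_threat(sample))
    print()

    # Translation mode: safety gate -> LLM translation/explanation (if configured)
    print(translate_and_explain(sample))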