Spaces:
Sleeping
Sleeping
| """ | |
| intelligence_pipeline.py | |
| Updated to integrate: | |
| - safety_classifier (request + output gating) | |
| - slang detection | |
| - transformer threat scoring | |
| - optional OpenAI LLM fusion | |
| - translation + explanation logic | |
| """ | |
import json
import logging
import os
from typing import Dict, List, Optional

from prompt_engine import PromptEngine
from model_inference import ThreatModel
from safety_classifier import (
    safety_check,
    safety_check_output,
    detect_dangerous_intent,
)

try:
    from openai import OpenAI  # OpenAI Python SDK >= 1.0
except Exception:
    OpenAI = None
# ---------------------------------------------------------------------------
# Global initialization
# ---------------------------------------------------------------------------
# Prompt builder; llm_translate_explain() calls engine.translate_explain().
engine = PromptEngine()

# Slang lexicon (term -> meaning), read from a JSON file that sits beside this
# module. A missing file leaves the lexicon empty; a malformed file still
# raises at import time.
SLANG_LEXICON_PATH = os.path.join(os.path.dirname(__file__), "slang_lexicon.json")
SLANG_LEXICON: Dict[str, str] = {}
if os.path.exists(SLANG_LEXICON_PATH):
    with open(SLANG_LEXICON_PATH, "r", encoding="utf-8") as f:
        SLANG_LEXICON = json.load(f)

# Transformers model; the checkpoint is selected through THREAT_MODEL_PATH.
MODEL_PATH = os.getenv("THREAT_MODEL_PATH", "bert-base-chinese")
try:
    threat_model = ThreatModel(MODEL_PATH)
except Exception:
    # Best-effort: any construction failure disables transformer scoring
    # (transformer_risk() then reports zeros) instead of aborting the import.
    threat_model = None

# Optional LLM client: needs both the OpenAI SDK import to have succeeded and
# OPENAI_API_KEY to be set; otherwise (or on a constructor error) stays None.
llm_client = None
if OpenAI is not None and os.getenv("OPENAI_API_KEY"):
    try:
        llm_client = OpenAI()
    except Exception:
        llm_client = None
# ---------------------------------------------------------------------------
# Utility: slang detection
# ---------------------------------------------------------------------------
def detect_slang(text: str, lexicon: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Return the lexicon entries whose term occurs as a substring of *text*.

    Args:
        text: Text to scan. Matching is a plain, case-sensitive substring
            test. ``None`` or an empty string yields no hits.
        lexicon: Optional term -> meaning mapping to match against. When
            omitted, the module-level ``SLANG_LEXICON`` is used.

    Returns:
        A dict of every matched term and its meaning, in lexicon order.
    """
    if not text:
        return {}
    if lexicon is None:
        lexicon = SLANG_LEXICON
    # An empty term is a substring of every string, so a blank lexicon key
    # would otherwise be reported as a hit for all inputs.
    return {
        term: meaning
        for term, meaning in lexicon.items()
        if term and term in text
    }
# ---------------------------------------------------------------------------
# Transformer risk scoring
# ---------------------------------------------------------------------------
def transformer_risk(text: str) -> Dict[str, float]:
    """Score *text* with the threat model and bucket its top probability.

    Returns:
        A dict with ``max_prob`` (the largest value returned by
        ``threat_model.predict_proba``) and ``risk_score`` (that value scaled
        by five, rounded, and raised to at least 1 when positive). Both are
        0.0 when no model is loaded or the model returns nothing.
    """
    no_signal = {"risk_score": 0.0, "max_prob": 0.0}
    if threat_model is None:
        return no_signal

    # NOTE(review): assumes predict_proba returns a plain list of floats; the
    # truthiness test below would be ambiguous for array types - confirm.
    probabilities: List[float] = threat_model.predict_proba(text)
    if not probabilities:
        return no_signal

    top = max(probabilities)
    if top > 0:
        # Any positive probability scores at least 1 on the bucket scale.
        level = max(1, int(round(top * 5)))
    else:
        level = 0
    return {"risk_score": float(level), "max_prob": float(top)}
# ---------------------------------------------------------------------------
# LLM fusion logic
# ---------------------------------------------------------------------------
def _chat_completion(system_prompt: str, user_prompt: str) -> Optional[str]:
    """Send one system + user exchange to the LLM and return the reply text.

    Returns:
        The first choice's message content, or None when no client is
        configured or the request fails.
    """
    if llm_client is None:
        return None
    try:
        response = llm_client.chat.completions.create(
            model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            temperature=0.2,
        )
        return response.choices[0].message.content
    except Exception:
        # Best-effort by design: callers treat None as "LLM unavailable".
        # Log the failure so it is not silently indistinguishable from an
        # unconfigured client.
        logging.getLogger(__name__).exception("LLM chat completion failed")
        return None


def llm_explain_threat(text: str, slang_hits: Dict[str, str], risk: Dict[str, float]) -> Optional[str]:
    """Ask the LLM to interpret *text* given the slang and classifier signals.

    Args:
        text: The content under analysis; embedded verbatim in the prompt.
        slang_hits: Matched slang terms and their meanings.
        risk: Classifier output; ``risk_score`` and ``max_prob`` are read with
            defaults of 0 and 0.0.

    Returns:
        The LLM's reply, or None when no client is configured or the request
        fails.
    """
    if llm_client is None:
        return None
    slang_str = ", ".join(f"{k}: {v}" for k, v in slang_hits.items()) or "none"
    prompt = f"""
Analyze APJ underground market content with cultural nuance.
TEXT:
{text}
Signals:
- Risk bucket (1–5): {risk.get('risk_score', 0)}
- Classifier max probability: {risk.get('max_prob', 0.0):.3f}
- Detected slang: {slang_str}
Return:
- threat summary
- role of slang/idioms
- risk interpretation
"""
    return _chat_completion("You are a cyber threat intelligence analyst.", prompt)


def llm_translate_explain(text: str) -> Optional[str]:
    """Ask the LLM to translate and explain *text*.

    The user prompt is built by ``engine.translate_explain``; it is only
    built when a client is configured, and errors from the prompt builder
    are not caught here.

    Returns:
        The LLM's reply, or None when no client is configured or the request
        fails.
    """
    if llm_client is None:
        return None
    prompt = engine.translate_explain(text)
    return _chat_completion(
        "You are a bilingual Mandarin/Cantonese → English interpreter with cybercrime domain knowledge.",
        prompt,
    )
# ---------------------------------------------------------------------------
# PUBLIC: Threat analysis
# ---------------------------------------------------------------------------
def analyze_threat(text: str) -> str:
    """Run the threat pipeline on *text* and return a Markdown-style report.

    Steps, in order: request safety gate, slang lookup, transformer risk
    scoring, optional LLM interpretation, and an output safety gate on the
    LLM text. A blocked request returns the gate's reason; a blocked LLM
    reply returns only a warning line (the slang and risk sections are
    dropped in that case).
    """
    text = (text or "").strip()

    # Safety gate (incoming request)
    request_gate = safety_check(text)
    if request_gate["blocked"]:
        return request_gate["reason"]

    # Signals: lexicon hits, classifier bucket, optional LLM narrative.
    slang = detect_slang(text)
    risk = transformer_risk(text)
    interpretation = llm_explain_threat(text, slang, risk)

    report = []
    if slang:
        report.append("### Slang / Idioms Detected")
        report.extend(f"- {term}: {meaning}" for term, meaning in slang.items())
        report.append("")
    report += [
        f"### Risk Score: {risk['risk_score']} (1–5)",
        f"Classifier max probability: {risk['max_prob']:.3f}\n",
    ]

    if not interpretation:
        report += [
            "### LLM Interpretation Unavailable",
            "Only transformer-based scoring applied.",
        ]
        return "\n".join(report)

    # Safety gate (model output)
    output_gate = safety_check_output(interpretation)
    if output_gate["blocked"]:
        return "⚠️ LLM output blocked: " + output_gate["reason"]

    report.append("### LLM Interpretation\n" + interpretation)
    return "\n".join(report)
# ---------------------------------------------------------------------------
# PUBLIC: Translation mode
# ---------------------------------------------------------------------------
def translate_and_explain(text: str) -> str:
    """Translate and explain *text* via the LLM, gated by the safety checks.

    Returns:
        The safety gate's reason when the request is blocked; a warning line
        when the LLM reply is blocked; the LLM reply otherwise. When no reply
        is available, a fallback message followed by the raw text is
        returned. That message distinguishes an unconfigured client from a
        failed or empty request.
    """
    text = (text or "").strip()

    safety = safety_check(text)
    if safety["blocked"]:
        return safety["reason"]

    llm_result = llm_translate_explain(text)
    if not llm_result:
        # llm_translate_explain() returns None both when no client exists and
        # when the API call fails; only the former means "not configured".
        if llm_client is None:
            return f"LLM not configured.\nRaw text:\n{text}"
        return (
            "LLM translation unavailable (request failed or returned no content)."
            f"\nRaw text:\n{text}"
        )

    out_safety = safety_check_output(llm_result)
    if out_safety["blocked"]:
        return "⚠️ LLM translation blocked: " + out_safety["reason"]
    return llm_result