"""
intelligence_pipeline.py
Updated to integrate:
- safety_classifier (request + output gating)
- slang detection
- transformer threat scoring
- optional OpenAI LLM fusion
- translation + explanation logic
"""
import json
import os
from typing import Dict, List, Optional
from prompt_engine import PromptEngine
from model_inference import ThreatModel
from safety_classifier import (
    safety_check,
    safety_check_output,
)
try:
from openai import OpenAI # OpenAI Python SDK >= 1.0
except Exception:
OpenAI = None
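# If the SDK is unavailable, OpenAI stays None and every LLM-backed helper
# below silently degrades to transformer-only behaviour.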
# ---------------------------------------------------------------------------
# Global initialization
# ---------------------------------------------------------------------------
engine = PromptEngine()
SLANG_LEXICON_PATH = os.path.join(os.path.dirname(__file__), "slang_lexicon.json")
if os.path.exists(SLANG_LEXICON_PATH):
with open(SLANG_LEXICON_PATH, "r", encoding="utf-8") as f:
SLANG_LEXICON: Dict[str, str] = json.load(f)
else:
SLANG_LEXICON = {}
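# detect_slang() below depends on this flat {term: meaning} shape, so the
# JSON file must be a single string-to-string object.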
# Transformers model
MODEL_PATH = os.getenv("THREAT_MODEL_PATH", "bert-base-chinese")
try:
threat_model = ThreatModel(MODEL_PATH)
except Exception:
threat_model = None
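# A failed load (missing weights, bad path) leaves threat_model as None;
# transformer_risk() then reports zero risk instead of raising.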
# Optional LLM client
if OpenAI is not None and os.getenv("OPENAI_API_KEY"):
try:
llm_client = OpenAI()
except Exception:
llm_client = None
else:
llm_client = None
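# llm_client is only constructed when both the SDK import and the
# OPENAI_API_KEY environment variable are present.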
# ---------------------------------------------------------------------------
# Utility: slang detection
# ---------------------------------------------------------------------------
def detect_slang(text: str) -> Dict[str, str]:
    """Return every lexicon term found in `text` (case-sensitive substring match)."""
    hits = {}
    for term, meaning in SLANG_LEXICON.items():
        if term in text:
            hits[term] = meaning
    return hits
# ---------------------------------------------------------------------------
# Transformer risk scoring
# ---------------------------------------------------------------------------
def transformer_risk(text: str) -> Dict[str, float]:
    """Score `text` with the transformer model, mapping its top class
    probability onto a 1-5 risk bucket (0 when no model or no signal)."""
    if threat_model is None:
        return {"risk_score": 0.0, "max_prob": 0.0}
    probs: List[float] = threat_model.predict_proba(text)
    if not probs:
        return {"risk_score": 0.0, "max_prob": 0.0}
    max_prob = max(probs)
    # Scale the top probability to 1-5, never rounding a positive signal to 0.
    bucket = max(1, int(round(max_prob * 5))) if max_prob > 0 else 0
    return {"risk_score": float(bucket), "max_prob": float(max_prob)}
# ---------------------------------------------------------------------------
# LLM fusion logic
# ---------------------------------------------------------------------------
def llm_explain_threat(text: str, slang_hits: Dict[str, str], risk: Dict[str, float]) -> Optional[str]:
if llm_client is None:
return None
slang_str = ", ".join(f"{k}: {v}" for k, v in slang_hits.items()) or "none"
prompt = f"""
Analyze APJ underground market content with cultural nuance.
TEXT:
{text}
Signals:
- Risk bucket (1–5): {risk.get('risk_score', 0)}
- Classifier max probability: {risk.get('max_prob', 0.0):.3f}
- Detected slang: {slang_str}
Return:
- threat summary
- role of slang/idioms
- risk interpretation
"""
try:
response = llm_client.chat.completions.create(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
messages=[
{"role": "system", "content": "You are a cyber threat intelligence analyst."},
{"role": "user", "content": prompt},
],
temperature=0.2,
)
return response.choices[0].message.content
except Exception:
return None
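# llm_explain_threat and llm_translate_explain (below) both swallow API errors
# and return None, which callers treat as "LLM unavailable" rather than a failure.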
def llm_translate_explain(text: str) -> Optional[str]:
if llm_client is None:
return None
prompt = engine.translate_explain(text)
try:
response = llm_client.chat.completions.create(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
messages=[
{"role": "system", "content": "You are a bilingual Mandarin/Cantonese → English interpreter with cybercrime domain knowledge."},
{"role": "user", "content": prompt},
],
temperature=0.2,
)
return response.choices[0].message.content
except Exception:
return None
# ---------------------------------------------------------------------------
# PUBLIC: Threat analysis
# ---------------------------------------------------------------------------
def analyze_threat(text: str) -> str:
text = (text or "").strip()
# ---------------- Safety gate (incoming request) ----------------
safety = safety_check(text)
if safety["blocked"]:
return safety["reason"]
# ---------------- Slang detection ----------------
slang = detect_slang(text)
# ---------------- Transformer risk scoring ----------------
risk = transformer_risk(text)
# ---------------- LLM fusion ----------------
llm_result = llm_explain_threat(text, slang, risk)
# ---------------- Build output ----------------
out = []
if slang:
out.append("### Slang / Idioms Detected")
for k, v in slang.items():
out.append(f"- {k}: {v}")
out.append("")
out.append(f"### Risk Score: {risk['risk_score']} (1–5)")
out.append(f"Classifier max probability: {risk['max_prob']:.3f}\n")
if llm_result:
# Safety check on model output
out_safety = safety_check_output(llm_result)
if out_safety["blocked"]:
return "⚠️ LLM output blocked: " + out_safety["reason"]
out.append("### LLM Interpretation\n" + llm_result)
else:
out.append("### LLM Interpretation Unavailable")
out.append("Only transformer-based scoring applied.")
return "\n".join(out)
# ---------------------------------------------------------------------------
# PUBLIC: Translation mode
# ---------------------------------------------------------------------------
def translate_and_explain(text: str) -> str:
text = (text or "").strip()
safety = safety_check(text)
if safety["blocked"]:
return safety["reason"]
llm_result = llm_translate_explain(text)
if llm_result:
out_safety = safety_check_output(llm_result)
if out_safety["blocked"]:
return "⚠️ LLM translation blocked: " + out_safety["reason"]
return llm_result
return f"LLM not configured.\nRaw text:\n{text}"