"""
intelligence_pipeline.py

Updated to integrate:
- safety_classifier (request + output gating)
- slang detection
- transformer threat scoring
- optional OpenAI LLM fusion
- translation + explanation logic
"""

import json
import os
from typing import Dict, List, Optional

from prompt_engine import PromptEngine
from model_inference import ThreatModel
from safety_classifier import safety_check, safety_check_output

try:
    from openai import OpenAI  # OpenAI Python SDK >= 1.0
except Exception:  # SDK missing or incompatible; LLM stages are skipped
    OpenAI = None


# ---------------------------------------------------------------------------
# Global initialization
# ---------------------------------------------------------------------------

engine = PromptEngine()

SLANG_LEXICON_PATH = os.path.join(os.path.dirname(__file__), "slang_lexicon.json")
if os.path.exists(SLANG_LEXICON_PATH):
    with open(SLANG_LEXICON_PATH, "r", encoding="utf-8") as f:
        SLANG_LEXICON: Dict[str, str] = json.load(f)
else:
    SLANG_LEXICON = {}
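
# Illustrative lexicon shape (hypothetical entries; the deployed
# slang_lexicon.json is maintained separately):
#
#     {"跑分": "laundering funds through rented payment accounts",
#      "四件套": "full identity kit: bank card, SIM, ID copy, USB key"}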

# Transformer threat classifier (local path or Hugging Face model id)
MODEL_PATH = os.getenv("THREAT_MODEL_PATH", "bert-base-chinese")

try:
    threat_model = ThreatModel(MODEL_PATH)
except Exception:  # weights or dependencies unavailable; scoring degrades to 0
    threat_model = None

# Optional LLM client; requires both the SDK and an API key
if OpenAI is not None and os.getenv("OPENAI_API_KEY"):
    try:
        llm_client = OpenAI()
    except Exception:  # client construction failed; run without LLM fusion
        llm_client = None
else:
    llm_client = None


# ---------------------------------------------------------------------------
# Utility: slang detection
# ---------------------------------------------------------------------------

def detect_slang(text: str) -> Dict[str, str]:
    """Return every lexicon term found in `text`, mapped to its meaning.

    Plain substring matching is the right primitive for CJK text,
    which has no word boundaries to anchor on.
    """
    hits: Dict[str, str] = {}
    for term, meaning in SLANG_LEXICON.items():
        if term in text:
            hits[term] = meaning
    return hits
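
# Example: with the hypothetical lexicon entries sketched above,
#   detect_slang("出四件套，可跑分")
# would return both terms mapped to their meanings; text with no
# lexicon hits returns an empty dict.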


# ---------------------------------------------------------------------------
# Transformer risk scoring
# ---------------------------------------------------------------------------

def transformer_risk(text: str) -> Dict[str, float]:
    """Score `text` with the transformer classifier.

    Returns `risk_score` (a 1-5 bucket, or 0 when the model is
    unavailable) and `max_prob` (the highest class probability).
    """
    if threat_model is None:
        return {"risk_score": 0.0, "max_prob": 0.0}

    probs: List[float] = threat_model.predict_proba(text)
    if not probs:
        return {"risk_score": 0.0, "max_prob": 0.0}

    max_prob = max(probs)
    # Map the top probability onto a 1-5 bucket; clamp at 1 so any
    # non-zero signal registers, and reserve 0 for "no signal".
    bucket = max(1, int(round(max_prob * 5))) if max_prob > 0 else 0

    return {"risk_score": float(bucket), "max_prob": float(max_prob)}


# ---------------------------------------------------------------------------
# LLM fusion logic
# ---------------------------------------------------------------------------

def llm_explain_threat(text: str, slang_hits: Dict[str, str], risk: Dict[str, float]) -> Optional[str]:
    """Fuse slang and classifier signals into an LLM threat explanation.

    Returns None when no LLM client is configured or the call fails,
    so callers can fall back to transformer-only output.
    """
    if llm_client is None:
        return None

    slang_str = ", ".join(f"{k}: {v}" for k, v in slang_hits.items()) or "none"
    prompt = f"""
Analyze APJ underground market content with cultural nuance.

TEXT:
{text}

Signals:
- Risk bucket (1–5, 0 = unavailable): {risk.get('risk_score', 0)}
- Classifier max probability: {risk.get('max_prob', 0.0):.3f}
- Detected slang: {slang_str}

Return:
- threat summary
- role of slang/idioms
- risk interpretation
"""

    try:
        response = llm_client.chat.completions.create(
            model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
            messages=[
                {"role": "system", "content": "You are a cyber threat intelligence analyst."},
                {"role": "user", "content": prompt},
            ],
            temperature=0.2,
        )
        return response.choices[0].message.content
    except Exception:
        return None


def llm_translate_explain(text: str) -> Optional[str]:
    """Translate and gloss `text` via the LLM; None if unconfigured or failed."""
    if llm_client is None:
        return None

    prompt = engine.translate_explain(text)

    try:
        response = llm_client.chat.completions.create(
            model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
            messages=[
                {"role": "system", "content": "You are a bilingual Mandarin/Cantonese → English interpreter with cybercrime domain knowledge."},
                {"role": "user", "content": prompt},
            ],
            temperature=0.2,
        )
        return response.choices[0].message.content
    except Exception:
        return None


# ---------------------------------------------------------------------------
# PUBLIC: Threat analysis
# ---------------------------------------------------------------------------

def analyze_threat(text: str) -> str:
    """Run the full gated pipeline on `text` and return a Markdown report."""
    text = (text or "").strip()

    # ---------------- Safety gate (incoming request) ----------------
    safety = safety_check(text)
    if safety["blocked"]:
        return safety["reason"]

    # ---------------- Slang detection ----------------
    slang = detect_slang(text)

    # ---------------- Transformer risk scoring ----------------
    risk = transformer_risk(text)

    # ---------------- LLM fusion ----------------
    llm_result = llm_explain_threat(text, slang, risk)

    # ---------------- Build output ----------------
    out = []

    if slang:
        out.append("### Slang / Idioms Detected")
        for k, v in slang.items():
            out.append(f"- {k}: {v}")
        out.append("")

    out.append(f"### Risk Score: {risk['risk_score']} (1–5)")
    out.append(f"Classifier max probability: {risk['max_prob']:.3f}\n")

    if llm_result:
        # Safety check on model output
        out_safety = safety_check_output(llm_result)
        if out_safety["blocked"]:
            return "⚠️ LLM output blocked: " + out_safety["reason"]

        out.append("### LLM Interpretation\n" + llm_result)
    else:
        out.append("### LLM Interpretation Unavailable")
        out.append("Only transformer-based scoring applied.")

    return "\n".join(out)


# ---------------------------------------------------------------------------
# PUBLIC: Translation mode
# ---------------------------------------------------------------------------

def translate_and_explain(text: str) -> str:
    """Translate `text` to English with safety gating on request and output."""
    text = (text or "").strip()

    safety = safety_check(text)
    if safety["blocked"]:
        return safety["reason"]

    llm_result = llm_translate_explain(text)
    if llm_result:
        out_safety = safety_check_output(llm_result)
        if out_safety["blocked"]:
            return "⚠️ LLM translation blocked: " + out_safety["reason"]
        return llm_result

    return f"LLM not configured.\nRaw text:\n{text}"