Joblib
ynuozhang
add inference
470021d
import os
import json
import joblib
import optuna
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Dict, Any, Tuple, Optional
from datasets import load_from_disk, DatasetDict
from sklearn.metrics import (
f1_score, roc_auc_score, average_precision_score,
precision_recall_curve, roc_curve
)
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC, LinearSVC
from sklearn.calibration import CalibratedClassifierCV
import torch
import time
import xgboost as xgb
from lightning.pytorch import seed_everything
import cupy as cp
from cuml.svm import SVC as cuSVC
from cuml.linear_model import LogisticRegression as cuLogReg
seed_everything(1986)
def to_gpu(X: np.ndarray):
if isinstance(X, cp.ndarray):
return X
return cp.asarray(X, dtype=cp.float32)
def to_cpu(x):
if isinstance(x, cp.ndarray):
return cp.asnumpy(x)
return np.asarray(x)
@dataclass
class SplitData:
X_train: np.ndarray
y_train: np.ndarray
seq_train: Optional[np.ndarray]
X_val: np.ndarray
y_val: np.ndarray
seq_val: Optional[np.ndarray]
def _stack_embeddings(col) -> np.ndarray:
arr = np.asarray(col, dtype=np.float32)
if arr.ndim != 2:
arr = np.stack(col).astype(np.float32)
return arr
def load_split_data(dataset_path: str) -> SplitData:
ds = load_from_disk(dataset_path)
# Case A: DatasetDict with train/val
if isinstance(ds, DatasetDict) and "train" in ds and "val" in ds:
train_ds, val_ds = ds["train"], ds["val"]
else:
# Case B: Single dataset with "split" column
if "split" not in ds.column_names:
raise ValueError(
"Dataset must be a DatasetDict(train/val) or have a 'split' column."
)
train_ds = ds.filter(lambda x: x["split"] == "train")
val_ds = ds.filter(lambda x: x["split"] == "val")
for required in ["embedding", "label"]:
if required not in train_ds.column_names:
raise ValueError(f"Missing column '{required}' in train split.")
if required not in val_ds.column_names:
raise ValueError(f"Missing column '{required}' in val split.")
X_train = _stack_embeddings(train_ds["embedding"])
y_train = np.asarray(train_ds["label"], dtype=np.int64)
X_val = _stack_embeddings(val_ds["embedding"])
y_val = np.asarray(val_ds["label"], dtype=np.int64)
seq_train = None
seq_val = None
if "sequence" in train_ds.column_names:
seq_train = np.asarray(train_ds["sequence"])
if "sequence" in val_ds.column_names:
seq_val = np.asarray(val_ds["sequence"])
return SplitData(X_train, y_train, seq_train, X_val, y_val, seq_val)
def best_f1_threshold(y_true: np.ndarray, y_prob: np.ndarray) -> Tuple[float, float]:
"""
Find threshold maximizing F1 on the given set.
Returns (best_threshold, best_f1).
"""
precision, recall, thresholds = precision_recall_curve(y_true, y_prob)
f1s = (2 * precision[:-1] * recall[:-1]) / (precision[:-1] + recall[:-1] + 1e-12)
best_idx = int(np.nanargmax(f1s))
return float(thresholds[best_idx]), float(f1s[best_idx])
def eval_binary(y_true: np.ndarray, y_prob: np.ndarray, threshold: float) -> Dict[str, float]:
y_pred = (y_prob >= threshold).astype(int)
return {
"f1": float(f1_score(y_true, y_pred)),
"auc": float(roc_auc_score(y_true, y_prob)),
"ap": float(average_precision_score(y_true, y_prob)),
"threshold": float(threshold),
}
# -----------------------------
# Model
# -----------------------------
def train_xgb(
X_train, y_train, X_val, y_val, params: Dict[str, Any]
) -> Tuple[xgb.Booster, np.ndarray, np.ndarray]:
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
num_boost_round = int(params.pop("num_boost_round"))
early_stopping_rounds = int(params.pop("early_stopping_rounds"))
booster = xgb.train(
params=params,
dtrain=dtrain,
num_boost_round=num_boost_round,
evals=[(dval, "val")],
early_stopping_rounds=early_stopping_rounds,
verbose_eval=False,
)
p_train = booster.predict(dtrain)
p_val = booster.predict(dval)
return booster, p_train, p_val
def train_cuml_svc(X_train, y_train, X_val, y_val, params):
Xtr = to_gpu(X_train)
Xva = to_gpu(X_val)
ytr = to_gpu(y_train).astype(cp.int32)
clf = cuSVC(
C=float(params["C"]),
kernel=params["kernel"],
gamma=params.get("gamma", "scale"),
class_weight=params.get("class_weight", None),
probability=bool(params.get("probability", True)),
random_state=1986,
max_iter=int(params.get("max_iter", 1000)),
tol=float(params.get("tol", 1e-4)),
)
clf.fit(Xtr, ytr)
p_train = to_cpu(clf.predict_proba(Xtr)[:, 1])
p_val = to_cpu(clf.predict_proba(Xva)[:, 1])
return clf, p_train, p_val
def train_cuml_elastic_net(X_train, y_train, X_val, y_val, params):
Xtr = to_gpu(X_train)
Xva = to_gpu(X_val)
ytr = to_gpu(y_train).astype(cp.int32)
clf = cuLogReg(
penalty="elasticnet",
C=float(params["C"]),
l1_ratio=float(params["l1_ratio"]),
class_weight=params.get("class_weight", None),
max_iter=int(params.get("max_iter", 1000)),
tol=float(params.get("tol", 1e-4)),
solver="qn",
fit_intercept=True,
)
clf.fit(Xtr, ytr)
p_train = to_cpu(clf.predict_proba(Xtr)[:, 1])
p_val = to_cpu(clf.predict_proba(Xva)[:, 1])
return clf, p_train, p_val
def train_svm(X_train, y_train, X_val, y_val, params):
"""
Kernel SVM via SVC. CPU only in sklearn.
probability=True enables predict_proba but is slower.
"""
clf = SVC(
C=float(params["C"]),
kernel=params["kernel"],
gamma=params.get("gamma", "scale"),
class_weight=params.get("class_weight", None),
probability=True,
random_state=1986,
)
clf.fit(X_train, y_train)
p_train = clf.predict_proba(X_train)[:, 1]
p_val = clf.predict_proba(X_val)[:, 1]
return clf, p_train, p_val
def train_linearsvm_calibrated(X_train, y_train, X_val, y_val, params):
"""
Fast linear SVM (LinearSVC) + probability calibration.
Usually much faster than SVC on large datasets.
"""
base = LinearSVC(
C=float(params["C"]),
class_weight=params.get("class_weight", None),
max_iter=int(params.get("max_iter", 5000)),
random_state=1986,
)
# calibration to get probabilities for PR/ROC + thresholding
clf = CalibratedClassifierCV(base, method="sigmoid", cv=3)
clf.fit(X_train, y_train)
p_train = clf.predict_proba(X_train)[:, 1]
p_val = clf.predict_proba(X_val)[:, 1]
return clf, p_train, p_val
# -----------------------------
# Saving artifacts
# -----------------------------
def save_predictions_csv(
out_dir: str,
split_name: str,
y_true: np.ndarray,
y_prob: np.ndarray,
threshold: float,
sequences: Optional[np.ndarray] = None,
):
os.makedirs(out_dir, exist_ok=True)
df = pd.DataFrame({
"y_true": y_true.astype(int),
"y_prob": y_prob.astype(float),
"y_pred": (y_prob >= threshold).astype(int),
})
if sequences is not None:
df.insert(0, "sequence", sequences)
df.to_csv(os.path.join(out_dir, f"{split_name}_predictions.csv"), index=False)
def plot_curves(out_dir: str, y_true: np.ndarray, y_prob: np.ndarray):
os.makedirs(out_dir, exist_ok=True)
# PR
precision, recall, _ = precision_recall_curve(y_true, y_prob)
plt.figure()
plt.plot(recall, precision)
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision-Recall Curve")
plt.tight_layout()
plt.savefig(os.path.join(out_dir, "pr_curve.png"))
plt.close()
# ROC
fpr, tpr, _ = roc_curve(y_true, y_prob)
plt.figure()
plt.plot(fpr, tpr)
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.tight_layout()
plt.savefig(os.path.join(out_dir, "roc_curve.png"))
plt.close()
# -----------------------------
# Optuna objectives
# -----------------------------
def make_objective(model_name: str, data: SplitData, out_dir: str):
Xtr, ytr, Xva, yva = data.X_train, data.y_train, data.X_val, data.y_val
def objective(trial: optuna.Trial) -> float:
if model_name == "xgb":
params = {
"objective": "binary:logistic",
"eval_metric": "logloss",
"lambda": trial.suggest_float("lambda", 1e-8, 50.0, log=True),
"alpha": trial.suggest_float("alpha", 1e-8, 50.0, log=True),
"colsample_bytree": trial.suggest_float("colsample_bytree", 0.3, 1.0),
"subsample": trial.suggest_float("subsample", 0.5, 1.0),
"learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
"max_depth": trial.suggest_int("max_depth", 2, 15),
"min_child_weight": trial.suggest_int("min_child_weight", 1, 500),
"gamma": trial.suggest_float("gamma", 0.0, 10.0),
"tree_method": "hist",
"device": "cuda",
}
params["num_boost_round"] = trial.suggest_int("num_boost_round", 50, 1500)
params["early_stopping_rounds"] = trial.suggest_int("early_stopping_rounds", 20, 200)
model, p_tr, p_va = train_xgb(Xtr, ytr, Xva, yva, params.copy())
elif model_name == "svm":
svm_kind = trial.suggest_categorical("svm_kind", ["svc", "linear_calibrated"])
if svm_kind == "svc":
params = {
"C": trial.suggest_float("C", 1e-3, 1e3, log=True),
"kernel": trial.suggest_categorical("kernel", ["rbf", "linear", "poly", "sigmoid"]),
"class_weight": trial.suggest_categorical("class_weight", [None, "balanced"]),
}
if params["kernel"] in ["rbf", "poly", "sigmoid"]:
params["gamma"] = trial.suggest_float("gamma", 1e-6, 10.0, log=True)
else:
params["gamma"] = "scale"
model, p_tr, p_va = train_svm(Xtr, ytr, Xva, yva, params)
else:
params = {
"C": trial.suggest_float("C", 1e-3, 1e3, log=True),
"class_weight": trial.suggest_categorical("class_weight", [None, "balanced"]),
"max_iter": trial.suggest_int("max_iter", 2000, 20000),
}
model, p_tr, p_va = train_linearsvm_calibrated(Xtr, ytr, Xva, yva, params)
elif model_name == "svm_gpu":
params = {
"C": trial.suggest_float("C", 1e-3, 1e3, log=True),
"kernel": trial.suggest_categorical("kernel", ["rbf", "linear", "poly", "sigmoid"]),
"class_weight": trial.suggest_categorical("class_weight", [None, "balanced"]),
"probability": True,
"max_iter": trial.suggest_int("max_iter", 200, 5000),
"tol": trial.suggest_float("tol", 1e-6, 1e-2, log=True),
}
if params["kernel"] in ["rbf", "poly", "sigmoid"]:
params["gamma"] = trial.suggest_float("gamma", 1e-6, 10.0, log=True)
else:
params["gamma"] = "scale"
model, p_tr, p_va = train_cuml_svc(Xtr, ytr, Xva, yva, params)
elif model_name == "enet_gpu":
params = {
"C": trial.suggest_float("C", 1e-4, 1e3, log=True),
"l1_ratio": trial.suggest_float("l1_ratio", 0.0, 1.0),
"class_weight": trial.suggest_categorical("class_weight", [None, "balanced"]),
"max_iter": trial.suggest_int("max_iter", 200, 5000),
"tol": trial.suggest_float("tol", 1e-6, 1e-2, log=True),
}
model, p_tr, p_va = train_cuml_elastic_net(Xtr, ytr, Xva, yva, params)
else:
raise ValueError(f"Unknown model_name={model_name}")
thr, f1_at_thr = best_f1_threshold(yva, p_va)
metrics = eval_binary(yva, p_va, thr)
trial.set_user_attr("threshold", thr)
trial.set_user_attr("auc", metrics["auc"])
trial.set_user_attr("ap", metrics["ap"])
return f1_at_thr
return objective
# -----------------------------
# Main
# -----------------------------
def run_optuna_and_refit(
dataset_path: str,
out_dir: str,
model_name: str,
n_trials: int = 200,
):
os.makedirs(out_dir, exist_ok=True)
data = load_split_data(dataset_path)
print(f"[Data] Train: {data.X_train.shape}, Val: {data.X_val.shape}")
study = optuna.create_study(direction="maximize", pruner=optuna.pruners.MedianPruner())
study.optimize(make_objective(model_name, data, out_dir), n_trials=n_trials)
trials_df = study.trials_dataframe()
trials_df.to_csv(os.path.join(out_dir, "study_trials.csv"), index=False)
best = study.best_trial
best_params = dict(best.params)
best_thr = float(best.user_attrs["threshold"])
best_auc = float(best.user_attrs["auc"])
best_ap = float(best.user_attrs["ap"])
best_f1 = float(best.value)
# Refit best model on train
if model_name == "xgb":
params = {
"objective": "binary:logistic",
"eval_metric": "logloss",
"lambda": best_params["lambda"],
"alpha": best_params["alpha"],
"colsample_bytree": best_params["colsample_bytree"],
"subsample": best_params["subsample"],
"learning_rate": best_params["learning_rate"],
"max_depth": best_params["max_depth"],
"min_child_weight": best_params["min_child_weight"],
"gamma": best_params["gamma"],
"tree_method": "hist",
"num_boost_round": best_params["num_boost_round"],
"early_stopping_rounds": best_params["early_stopping_rounds"],
}
model, p_tr, p_va = train_xgb(
data.X_train, data.y_train, data.X_val, data.y_val, params
)
model_path = os.path.join(out_dir, "best_model.json")
model.save_model(model_path)
elif model_name == "svm":
svm_kind = best_params["svm_kind"]
if svm_kind == "svc":
model, p_tr, p_va = train_svm(data.X_train, data.y_train, data.X_val, data.y_val, best_params)
else:
model, p_tr, p_va = train_linearsvm_calibrated(data.X_train, data.y_train, data.X_val, data.y_val, best_params)
model_path = os.path.join(out_dir, "best_model.joblib")
joblib.dump(model, model_path)
elif model_name == "svm_gpu":
model, p_tr, p_va = train_cuml_svc(
data.X_train, data.y_train, data.X_val, data.y_val, best_params
)
model_path = os.path.join(out_dir, "best_model_cuml_svc.joblib")
joblib.dump(model, model_path)
elif model_name == "enet_gpu":
model, p_tr, p_va = train_cuml_elastic_net(
data.X_train, data.y_train, data.X_val, data.y_val, best_params
)
model_path = os.path.join(out_dir, "best_model_cuml_enet.joblib")
joblib.dump(model, model_path)
else:
raise ValueError(model_name)
# Save predictions CSVs
save_predictions_csv(out_dir, "train", data.y_train, p_tr, best_thr, data.seq_train)
save_predictions_csv(out_dir, "val", data.y_val, p_va, best_thr, data.seq_val)
# Plots on val
plot_curves(out_dir, data.y_val, p_va)
summary = [
"=" * 72,
f"MODEL: {model_name}",
f"Best trial: {best.number}",
f"Best F1 (val @ best-threshold): {best_f1:.4f}",
f"Val AUC: {best_auc:.4f}",
f"Val AP: {best_ap:.4f}",
f"Best threshold (picked on val): {best_thr:.4f}",
f"Model saved to: {model_path}",
"Best params:",
json.dumps(best_params, indent=2),
"=" * 72,
]
with open(os.path.join(out_dir, "optimization_summary.txt"), "w") as f:
f.write("\n".join(summary))
print("\n".join(summary))
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--dataset_path", type=str, required=True)
parser.add_argument("--out_dir", type=str, required=True)
parser.add_argument("--model", type=str, choices=["xgb", "svm_gpu", "enet_gpu"], required=True)
parser.add_argument("--n_trials", type=int, default=200)
args = parser.parse_args()
run_optuna_and_refit(
dataset_path=args.dataset_path,
out_dir=args.out_dir,
model_name=args.model,
n_trials=args.n_trials,
)