import gradio as gr
import json
import os
import time
import random
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import f1_score, precision_score, recall_score
import pickle
from itertools import combinations

# Import the evaluation routine from the accompanying PAN'22 evaluator script
from pan22_verif_evaluator import evaluate_all


# Core scoring helpers (simplified versions)
def cosine_sim(a, b):
    # Cosine similarity of two dense vectors (NaN if either vector is all-zero)
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))


def rescale(value, orig_min, orig_max, new_min, new_max):
    # Linearly map value from [orig_min, orig_max] onto [new_min, new_max]
    orig_span = orig_max - orig_min
    new_span = new_max - new_min
    try:
        scaled_value = float(value - orig_min) / float(orig_span)
    except ZeroDivisionError:
        orig_span += 1e-6
        scaled_value = float(value - orig_min) / float(orig_span)
    return new_min + (scaled_value * new_span)


def correct_scores(scores, p1, p2):
    # Squash scores below p1 into [0, 0.49], scores above p2 into [0.51, 1],
    # and collapse the uncertain band (p1, p2) to 0.5, which the PAN evaluator
    # treats as a non-answer.
    for sc in scores:
        if sc <= p1:
            yield rescale(sc, 0, p1, 0, 0.49)
        elif p1 < sc < p2:
            yield 0.5
        else:
            yield rescale(sc, p2, 1, 0.51, 1)


# Main training function
def train_model(pairs_file, truths_file, vocab_size, ngram_size, num_iterations, dropout):
    # Ground-truth labels: one JSON object per line with "id" and "same"
    gold = {}
    with open(truths_file, encoding='utf8') as f:
        for line in f:
            d = json.loads(line.strip())
            gold[d['id']] = int(d['same'])

    # Optional truncation for development purposes (0 disables it)
    cutoff = 0
    if cutoff:
        gold = dict(random.sample(list(gold.items()), cutoff))

    # Collect all texts so the vectorizer is fit on the full corpus
    texts = []
    with open(pairs_file, encoding='utf8') as f:
        for line in f:
            d = json.loads(line.strip())
            if d['id'] in gold:
                texts.extend(d['pair'])

    # Fit a character n-gram TF-IDF vectorizer
    vectorizer = TfidfVectorizer(max_features=vocab_size, analyzer='char',
                                 ngram_range=(ngram_size, ngram_size))
    vectorizer.fit(texts)

    # With num_iterations > 0, pre-sample random feature subsets ("dropout"):
    # each iteration keeps a random fraction of the features
    rnd_feature_idxs = None
    if num_iterations:
        total_feats = len(vectorizer.get_feature_names_out())
        keep_feats = int(total_feats * dropout)
        rnd_feature_idxs = np.array([
            np.random.choice(total_feats, keep_feats, replace=False)
            for _ in range(num_iterations)
        ])

    # Score every pair: cosine similarity, optionally averaged over the
    # random feature subsets
    similarities, labels = [], []
    with open(pairs_file, encoding='utf8') as f:
        for line in f:
            d = json.loads(line.strip())
            if d['id'] in gold:
                x1, x2 = vectorizer.transform(d['pair']).toarray()
                if num_iterations:
                    similarities_ = [
                        cosine_sim(x1[rnd_feature_idxs[i, :]], x2[rnd_feature_idxs[i, :]])
                        for i in range(num_iterations)
                    ]
                    similarities.append(np.mean(similarities_))
                else:
                    similarities.append(cosine_sim(x1, x2))
                labels.append(gold[d['id']])

    similarities = np.array(similarities, dtype=np.float64)
    labels = np.array(labels, dtype=np.float64)

    # Grid search over all (p1, p2) pairs for the score correction,
    # maximizing the evaluator's overall score
    print('-> grid search p1/p2:')
    step_size = 0.01
    thresholds = np.arange(0.01, 0.99, step_size)
    combs = [(p1, p2) for (p1, p2) in combinations(thresholds, 2) if p1 < p2]

    params = {}
    for p1, p2 in combs:
        corrected_scores = np.array(list(correct_scores(similarities, p1=p1, p2=p2)))
        score = evaluate_all(pred_y=corrected_scores, true_y=labels)
        params[(p1, p2)] = score['overall']

    opt_p1, opt_p2 = max(params, key=params.get)
    print('optimal p1/p2:', opt_p1, opt_p2)

    corrected_scores = np.array(list(correct_scores(similarities, p1=opt_p1, p2=opt_p2)))
    evaluation_result = evaluate_all(pred_y=corrected_scores, true_y=labels)
    print('optimal score:', evaluation_result)

    # Sweep a binary decision threshold over the corrected scores
    print('-> determining optimal threshold')
    scores = []
    for th in np.linspace(0.25, 0.75, 1000):
        adjusted = (corrected_scores >= th) * 1
        scores.append((th,
                       f1_score(labels, adjusted),
                       precision_score(labels, adjusted),
                       recall_score(labels, adjusted)))
    thresholds, f1s, precisions, recalls = zip(*scores)
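    # NOTE: the F1-maximizing threshold below is printed for inspection only;
    # it is not stored in the pickled model, which relies on the p1/p2 score
    # correction (with 0.5 as the non-answer value) at prediction time.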
    max_idx = np.array(f1s).argmax()
    max_f1 = f1s[max_idx]
    max_th = thresholds[max_idx]
    print(f'Dev results -> F1={max_f1} at th={max_th}')

    # Persist everything needed at prediction time
    model = {
        'vectorizer': vectorizer,
        'opt_p1': opt_p1,
        'opt_p2': opt_p2,
        'rnd_feature_idxs': rnd_feature_idxs,
        'evaluation_result': evaluation_result,
    }
    pickle_path = os.path.join(os.getcwd(), 'model.pkl')
    with open(pickle_path, 'wb') as f:
        pickle.dump(model, f)

    return "Training complete. Model files saved.", opt_p1, opt_p2, evaluation_result, pickle_path


# Gradio callback: validate inputs, run training, and format the results
def gradio_interface(pairs_file, truths_file, vocab_size, ngram_size, num_iterations, dropout):
    if pairs_file is None or truths_file is None:
        return "Please upload both JSON files.", None, gr.Group(visible=False), None, None
    try:
        start_time = time.time()
        training_message, opt_p1, opt_p2, evaluation_result, pickle_path = train_model(
            pairs_file.name, truths_file.name, vocab_size, ngram_size, num_iterations, dropout
        )
        execution_time = time.time() - start_time

        # Tabulate the optimal thresholds and evaluation metrics for display
        data = {
            'Metric': ['p1', 'p2', 'AUC', 'c@1', 'f_05_u', 'F1', 'Brier', 'Overall', 'Execution Time (s)'],
            'Value': [
                opt_p1,
                opt_p2,
                evaluation_result['auc'],
                evaluation_result['c@1'],
                evaluation_result['f_05_u'],
                evaluation_result['F1'],
                evaluation_result['brier'],
                evaluation_result['overall'],
                round(execution_time, 2),
            ],
        }
        df = pd.DataFrame(data)
        return training_message, df, gr.Group(visible=True), pickle_path, pickle_path
    except Exception as e:
        return f"An error occurred: {str(e)}", None, gr.Group(visible=False), None, None


with gr.Blocks() as iface:
    gr.Markdown("# Character 4-grams Model")
    model_path = gr.State(None)

    with gr.Tab("Train"):
        gr.Markdown("Upload the pairs.json and truths.json files, adjust the parameters, "
                    "then click 'Train' to train and evaluate the model.")
        with gr.Row():
            pairs_file = gr.File(label="Upload pairs.json")
            truths_file = gr.File(label="Upload truths.json")
        with gr.Row():
            vocab_size = gr.Slider(minimum=1000, maximum=50000, step=100, value=3000, label="Vocabulary Size")
            ngram_size = gr.Slider(minimum=2, maximum=6, step=1, value=4, label="N-gram Size")
            num_iterations = gr.Slider(minimum=0, maximum=100, step=1, value=0, label="Number of Iterations")
            dropout = gr.Slider(minimum=0.1, maximum=0.9, step=0.1, value=0.5, label="Dropout")
        submit_btn = gr.Button("Train")
        status_box = gr.Textbox(label="Status")
        with gr.Group(visible=False) as output_group:
            gr.Markdown("## Evaluation Metrics")
            output_table = gr.DataFrame()
            download_button = gr.File(label="Download Model")

    with gr.Tab('Test'):
        gr.Markdown("Enter two texts to compare and click 'Predict' to estimate their similarity.")
        text1 = gr.Textbox(label="Text 1")
        text2 = gr.Textbox(label="Text 2")
        predict_btn = gr.Button("Predict")
        similarity_output = gr.Textbox(label="Similarity Result")

    def test_model(text1, text2, model_path):
        if model_path is None:
            return "Please train the model first."
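        # Reload the pickled artifacts from training and replay the same
        # scoring pipeline (vectorize -> cosine similarity -> p1/p2 correction)
        # on the two input texts.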
        with open(model_path, 'rb') as f:
            model = pickle.load(f)
        vectorizer = model['vectorizer']
        opt_p1 = model['opt_p1']
        opt_p2 = model['opt_p2']
        rnd_feature_idxs = model['rnd_feature_idxs']

        x1, x2 = vectorizer.transform([text1, text2]).toarray()
        if rnd_feature_idxs is not None:
            # Average the similarity over the same feature subsets used in training
            similarities_ = [
                cosine_sim(x1[rnd_feature_idxs[i, :]], x2[rnd_feature_idxs[i, :]])
                for i in range(len(rnd_feature_idxs))
            ]
            similarity = np.mean(similarities_)
        else:
            similarity = cosine_sim(x1, x2)

        similarity = next(correct_scores([similarity], p1=opt_p1, p2=opt_p2))
        return f"Similarity: {similarity:.4f}"

    submit_btn.click(
        gradio_interface,
        inputs=[pairs_file, truths_file, vocab_size, ngram_size, num_iterations, dropout],
        outputs=[status_box, output_table, output_group, download_button, model_path],
    )
    predict_btn.click(
        test_model,
        inputs=[text1, text2, model_path],
        outputs=[similarity_output],
    )

if __name__ == "__main__":
    iface.launch()
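# Usage notes (a sketch; the file format is inferred from the fields read
# above, not spelled out in the original code):
#   - Both uploads are JSON Lines files, one object per line:
#       pairs.json:  {"id": "<pair-id>", "pair": ["<text 1>", "<text 2>"]}
#       truths.json: {"id": "<pair-id>", "same": true}
#   - pan22_verif_evaluator.py (providing evaluate_all) must be importable
#     from the working directory.
#   - Run e.g. `python app.py` (assuming this file is saved as app.py) and
#     open the local URL that Gradio prints.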