import gradio as gr import pandas as pd import re import numpy as np from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.ensemble import IsolationForest from sklearn.neighbors import LocalOutlierFactor from sklearn.svm import OneClassSVM import matplotlib.pyplot as plt import seaborn as sns import traceback # --- FUNCIÓN DE ANÁLISIS MEJORADA PARA ACEPTAR LA ELECCIÓN DEL MODELO --- def analizar_logs_completo(archivo_log, model_name): if archivo_log is None: return "Por favor, sube un archivo de log para comenzar el análisis.", None, None, None, None, None try: # --- Lectura y Preparación de Datos --- with open(archivo_log.name, 'r', encoding='utf-8', errors='ignore') as f: lines = f.readlines() df_logs = pd.DataFrame(lines, columns=['raw_log']) df_logs['raw_log'] = df_logs['raw_log'].str.strip() df_logs.dropna(subset=['raw_log'], inplace=True) df_logs = df_logs[df_logs['raw_log'] != ''] if df_logs.empty: return "El archivo de log está vacío o no se pudo leer.", None, None, None, None, None def preprocesar_log(log_message): message = str(log_message) message = re.sub(r'^\[.*?\]\s*(INFO|ERROR|DEBUG|WARN|FATAL|WARNING):\s*', '', message, flags=re.IGNORECASE) message = re.sub(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(,\d{3})?\s*', '', message, flags=re.IGNORECASE) message = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', '', message) message = re.sub(r'\b\d+\b', '', message) message = re.sub(r'[a-f0-9-]{8,}', '', message) message = re.sub(r'/(?:[a-zA-Z0-9_.-]+/)*[a-zA-Z0-9_.-]+', '', message) return message.lower().strip() df_logs['log_template'] = df_logs['raw_log'].apply(preprocesar_log) vectorizer = TfidfVectorizer(max_features=5000, stop_words=None) X = vectorizer.fit_transform(df_logs['log_template']) X_array = X.toarray() # --- Detección de Errores por Reglas --- error_keywords = ['error', 'fatal', 'failed', 'exception', 'nullpointer', 'timeout', 'denied'] def detectar_error_por_regla(log_message): return any(keyword in str(log_message).lower() for keyword in error_keywords) df_logs['error_por_regla'] = df_logs['raw_log'].apply(detectar_error_por_regla) # --- SELECCIÓN Y ENTRENAMIENTO DEL MODELO DE ML --- model_info = f"Resultados generados con el modelo: **{model_name}**" if model_name == "Isolation Forest": model = IsolationForest(n_estimators=100, contamination='auto', random_state=42) model.fit(X_array) elif model_name == "Local Outlier Factor": # Usamos novelty=True para que se comporte como los otros modelos (fit/predict) model = LocalOutlierFactor(n_neighbors=20, contamination='auto', novelty=True) model.fit(X_array) elif model_name == "One-Class SVM": model = OneClassSVM(nu=0.05, kernel="rbf", gamma='auto') model.fit(X_array) df_logs['anomaly_score'] = model.decision_function(X_array) df_logs['anomaly_prediction'] = model.predict(X_array) df_logs['anomaly_ml'] = df_logs['anomaly_prediction'] == -1 # --- GENERACIÓN DE SALIDAS --- total_logs = len(df_logs) errores_reglas = df_logs['error_por_regla'].sum() anomalias_ml = df_logs['anomaly_ml'].sum() summary_md = f""" ## 📊 Resumen General del Análisis - **Total de Líneas de Log Analizadas:** {total_logs} - **Errores Detectados por Reglas:** `{errores_reglas}` - **Anomalías Detectadas por Machine Learning:** `{anomalias_ml}` """ top_templates = df_logs['log_template'].value_counts().head(10) templates_md = "## 📋 Plantillas de Log Más Comunes\n\n" + top_templates.to_markdown() top_anomalies = df_logs[df_logs['anomaly_ml']].sort_values(by='anomaly_score').head(10) anomalies_md = f"## ❗ Ejemplos de Anomalías Detectadas (ML)\n*Ordenadas por la más anómala (score más bajo)*\n\n" if not top_anomalies.empty: anomalies_md += top_anomalies[['raw_log', 'anomaly_score']].to_markdown(index=False) else: anomalies_md += "No se detectaron anomalías con el modelo de Machine Learning." fig1, ax1 = plt.subplots(figsize=(10, 5)) sns.histplot(df_logs['anomaly_score'], bins=50, kde=True, ax=ax1) ax1.set_title(f'Distribución de Scores de Anomalía ({model_name})') ax1.set_xlabel('Score (más bajo = más anómalo)') ax1.set_ylabel('Frecuencia') plt.tight_layout() combined_detections = df_logs.groupby(['error_por_regla', 'anomaly_ml']).size().unstack(fill_value=0) fig2, ax2 = plt.subplots(figsize=(8, 6)) if not combined_detections.empty: combined_detections.plot(kind='bar', stacked=True, ax=ax2, colormap='viridis') ax2.set_title('Comparación: Detección por Reglas vs. ML') ax2.set_xlabel('Detectado como Error por Reglas') ax2.set_ylabel('Número de Logs') ax2.tick_params(axis='x', rotation=0) ax2.legend(['ML: Normal', 'ML: Anomalía']) else: ax2.text(0.5, 0.5, 'No hay datos para graficar.', ha='center', va='center') plt.tight_layout() return model_info, summary_md, templates_md, anomalies_md, fig1, fig2 except Exception as e: error_message = f"Ha ocurrido un error durante el análisis:\n\n{traceback.format_exc()}" return error_message, None, None, None, None, None # --- CONSTRUCCIÓN DE LA INTERFAZ DE GRADIO CON LA OPCIÓN DE ELEGIR MODELO --- with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# 🔍 Analizador Avanzado de Logs con Machine Learning") gr.Markdown("Sube un archivo de log (.log, .txt) para realizar un análisis completo. Puedes elegir entre diferentes modelos de ML para la detección de anomalías.") with gr.Row(): with gr.Column(scale=1): log_file_input = gr.File(label="1. Sube tu archivo de log", file_types=[".log", ".txt"]) model_choice_input = gr.Radio( ["Isolation Forest", "Local Outlier Factor", "One-Class SVM"], label="2. Elige el modelo de ML", value="Isolation Forest" ) analyze_button = gr.Button("🚀 Analizar Archivo", variant="primary") with gr.Column(scale=3): model_info_output = gr.Markdown() summary_output = gr.Markdown(label="Resumen General") with gr.Row(): plot_output_1 = gr.Plot(label="Distribución de Scores de Anomalía") plot_output_2 = gr.Plot(label="Comparación de Detecciones") templates_output = gr.Markdown(label="Plantillas Comunes") anomalies_output = gr.Markdown(label="Anomalías Detectadas") # Lista de salidas para no repetirla outputs_list = [model_info_output, summary_output, templates_output, anomalies_output, plot_output_1, plot_output_2] gr.Examples( examples=[ ["sample.log", "Isolation Forest"], ["sample_uniform.log", "Isolation Forest"], ["sample.log", "Local Outlier Factor"] ], inputs=[log_file_input, model_choice_input], outputs=outputs_list, fn=analizar_logs_completo, cache_examples=True, label="O prueba con un archivo de ejemplo" ) analyze_button.click( fn=analizar_logs_completo, inputs=[log_file_input, model_choice_input], outputs=outputs_list ) if __name__ == "__main__": demo.launch()