# NOTE: "Spaces: Running" page header removed — it was a Hugging Face Spaces
# status banner captured by the scrape, not part of the program.
import re
import traceback

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.ensemble import IsolationForest
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
# --- Analysis function: accepts the user's choice of ML model ---
def analizar_logs_completo(archivo_log, model_name):
    """Run the full log-analysis pipeline on an uploaded log file.

    Pipeline: read the file line by line, normalize each line into a
    template, vectorize with TF-IDF, flag errors with keyword rules,
    flag anomalies with the chosen unsupervised model, then build
    markdown summaries and two matplotlib figures.

    Args:
        archivo_log: Gradio file object exposing a ``.name`` path, or None.
        model_name: "Isolation Forest", "Local Outlier Factor" or
            "One-Class SVM" (the choices offered by the UI radio).

    Returns:
        Tuple ``(model_info, summary_md, templates_md, anomalies_md,
        fig_scores, fig_comparison)``. On any failure the first element
        carries the error text and the remaining five are None.
    """
    if archivo_log is None:
        return "Por favor, sube un archivo de log para comenzar el análisis.", None, None, None, None, None
    try:
        # --- Read and prepare the data ---
        with open(archivo_log.name, 'r', encoding='utf-8', errors='ignore') as f:
            lines = f.readlines()
        df_logs = pd.DataFrame(lines, columns=['raw_log'])
        df_logs['raw_log'] = df_logs['raw_log'].str.strip()
        df_logs.dropna(subset=['raw_log'], inplace=True)
        df_logs = df_logs[df_logs['raw_log'] != '']
        if df_logs.empty:
            return "El archivo de log está vacío o no se pudo leer.", None, None, None, None, None

        def preprocesar_log(log_message):
            # Collapse a raw line into a reusable template: drop level
            # prefixes and timestamps, then replace variable tokens
            # (IPs, numbers, hash-like ids, paths) with placeholders so
            # structurally identical lines vectorize identically.
            message = str(log_message)
            message = re.sub(r'^\[.*?\]\s*(INFO|ERROR|DEBUG|WARN|FATAL|WARNING):\s*', '', message, flags=re.IGNORECASE)
            message = re.sub(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(,\d{3})?\s*', '', message, flags=re.IGNORECASE)
            message = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', '<IP>', message)
            message = re.sub(r'\b\d+\b', '<NUM>', message)
            message = re.sub(r'[a-f0-9-]{8,}', '<HASH_ID>', message)
            message = re.sub(r'/(?:[a-zA-Z0-9_.-]+/)*[a-zA-Z0-9_.-]+', '<PATH>', message)
            return message.lower().strip()

        df_logs['log_template'] = df_logs['raw_log'].apply(preprocesar_log)
        vectorizer = TfidfVectorizer(max_features=5000, stop_words=None)
        X = vectorizer.fit_transform(df_logs['log_template'])
        # Dense array: LOF and OneClassSVM accept sparse input, but the
        # models below are all fed the same dense matrix for simplicity.
        X_array = X.toarray()

        # --- Rule-based error detection ---
        error_keywords = ['error', 'fatal', 'failed', 'exception', 'nullpointer', 'timeout', 'denied']

        def detectar_error_por_regla(log_message):
            return any(keyword in str(log_message).lower() for keyword in error_keywords)

        df_logs['error_por_regla'] = df_logs['raw_log'].apply(detectar_error_por_regla)

        # --- Select and train the ML model ---
        model_info = f"Resultados generados con el modelo: **{model_name}**"
        if model_name == "Isolation Forest":
            model = IsolationForest(n_estimators=100, contamination='auto', random_state=42)
        elif model_name == "Local Outlier Factor":
            # novelty=True gives LOF the same fit/predict/decision_function
            # interface as the other two models.
            model = LocalOutlierFactor(n_neighbors=20, contamination='auto', novelty=True)
        elif model_name == "One-Class SVM":
            model = OneClassSVM(nu=0.05, kernel="rbf", gamma='auto')
        else:
            # BUG FIX: previously an unrecognized model_name left `model`
            # unbound and crashed with an opaque NameError further down.
            raise ValueError(f"Modelo de ML no reconocido: {model_name}")
        # fit() is identical for all three choices, so it is hoisted out
        # of the branches instead of being repeated in each one.
        model.fit(X_array)
        # Convention shared by all three estimators: lower score = more
        # anomalous; predict() returns -1 for outliers, +1 for inliers.
        df_logs['anomaly_score'] = model.decision_function(X_array)
        df_logs['anomaly_prediction'] = model.predict(X_array)
        df_logs['anomaly_ml'] = df_logs['anomaly_prediction'] == -1

        # --- Build the outputs ---
        total_logs = len(df_logs)
        errores_reglas = df_logs['error_por_regla'].sum()
        anomalias_ml = df_logs['anomaly_ml'].sum()
        summary_md = f"""
## 📊 Resumen General del Análisis
- **Total de Líneas de Log Analizadas:** {total_logs}
- **Errores Detectados por Reglas:** `{errores_reglas}`
- **Anomalías Detectadas por Machine Learning:** `{anomalias_ml}`
"""
        top_templates = df_logs['log_template'].value_counts().head(10)
        templates_md = "## 📋 Plantillas de Log Más Comunes\n\n" + top_templates.to_markdown()
        # Most anomalous first: ascending sort because lower score = worse.
        top_anomalies = df_logs[df_logs['anomaly_ml']].sort_values(by='anomaly_score').head(10)
        anomalies_md = "## ❗ Ejemplos de Anomalías Detectadas (ML)\n*Ordenadas por la más anómala (score más bajo)*\n\n"
        if not top_anomalies.empty:
            anomalies_md += top_anomalies[['raw_log', 'anomaly_score']].to_markdown(index=False)
        else:
            anomalies_md += "No se detectaron anomalías con el modelo de Machine Learning."

        # Figure 1: distribution of anomaly scores.
        fig1, ax1 = plt.subplots(figsize=(10, 5))
        sns.histplot(df_logs['anomaly_score'], bins=50, kde=True, ax=ax1)
        ax1.set_title(f'Distribución de Scores de Anomalía ({model_name})')
        ax1.set_xlabel('Score (más bajo = más anómalo)')
        ax1.set_ylabel('Frecuencia')
        plt.tight_layout()

        # Figure 2: rule-based detections vs ML detections.
        combined_detections = df_logs.groupby(['error_por_regla', 'anomaly_ml']).size().unstack(fill_value=0)
        fig2, ax2 = plt.subplots(figsize=(8, 6))
        if not combined_detections.empty:
            combined_detections.plot(kind='bar', stacked=True, ax=ax2, colormap='viridis')
            ax2.set_title('Comparación: Detección por Reglas vs. ML')
            ax2.set_xlabel('Detectado como Error por Reglas')
            ax2.set_ylabel('Número de Logs')
            ax2.tick_params(axis='x', rotation=0)
            # NOTE(review): assumes both False/True anomaly_ml columns are
            # present; with a single column the labels may mismatch.
            ax2.legend(['ML: Normal', 'ML: Anomalía'])
        else:
            ax2.text(0.5, 0.5, 'No hay datos para graficar.', ha='center', va='center')
        plt.tight_layout()

        return model_info, summary_md, templates_md, anomalies_md, fig1, fig2
    except Exception as e:
        # Broad catch is deliberate: this is the UI boundary, so any
        # failure is surfaced to the user as text instead of crashing.
        error_message = f"Ha ocurrido un error durante el análisis:\n\n{traceback.format_exc()}"
        return error_message, None, None, None, None, None
# --- Gradio interface: upload a log file and pick the anomaly model ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🔍 Analizador Avanzado de Logs con Machine Learning")
    gr.Markdown("Sube un archivo de log (.log, .txt) para realizar un análisis completo. Puedes elegir entre diferentes modelos de ML para la detección de anomalías.")

    with gr.Row():
        # Left column: inputs (file, model choice, trigger button).
        with gr.Column(scale=1):
            log_file_input = gr.File(label="1. Sube tu archivo de log", file_types=[".log", ".txt"])
            model_choice_input = gr.Radio(
                ["Isolation Forest", "Local Outlier Factor", "One-Class SVM"],
                label="2. Elige el modelo de ML",
                value="Isolation Forest",
            )
            analyze_button = gr.Button("🚀 Analizar Archivo", variant="primary")

        # Right column: analysis results (markdown summaries + plots).
        with gr.Column(scale=3):
            model_info_output = gr.Markdown()
            summary_output = gr.Markdown(label="Resumen General")
            with gr.Row():
                plot_output_1 = gr.Plot(label="Distribución de Scores de Anomalía")
                plot_output_2 = gr.Plot(label="Comparación de Detecciones")
            templates_output = gr.Markdown(label="Plantillas Comunes")
            anomalies_output = gr.Markdown(label="Anomalías Detectadas")

    # Single output list shared by the button handler and the examples,
    # in the exact order the analysis function returns its values.
    outputs_list = [
        model_info_output,
        summary_output,
        templates_output,
        anomalies_output,
        plot_output_1,
        plot_output_2,
    ]

    gr.Examples(
        examples=[
            ["sample.log", "Isolation Forest"],
            ["sample_uniform.log", "Isolation Forest"],
            ["sample.log", "Local Outlier Factor"],
        ],
        inputs=[log_file_input, model_choice_input],
        outputs=outputs_list,
        fn=analizar_logs_completo,
        cache_examples=True,
        label="O prueba con un archivo de ejemplo",
    )

    analyze_button.click(
        fn=analizar_logs_completo,
        inputs=[log_file_input, model_choice_input],
        outputs=outputs_list,
    )

if __name__ == "__main__":
    demo.launch()