analisis_logs / app.py
spjasper's picture
version2
1d16b92
raw
history blame
7.89 kB
import gradio as gr
import pandas as pd
import re
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
import matplotlib.pyplot as plt
import seaborn as sns
import traceback
# --- FUNCIÓN DE ANÁLISIS MEJORADA PARA ACEPTAR LA ELECCIÓN DEL MODELO ---
def analizar_logs_completo(archivo_log, model_name):
if archivo_log is None:
return "Por favor, sube un archivo de log para comenzar el análisis.", None, None, None, None, None
try:
# --- Lectura y Preparación de Datos ---
with open(archivo_log.name, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
df_logs = pd.DataFrame(lines, columns=['raw_log'])
df_logs['raw_log'] = df_logs['raw_log'].str.strip()
df_logs.dropna(subset=['raw_log'], inplace=True)
df_logs = df_logs[df_logs['raw_log'] != '']
if df_logs.empty:
return "El archivo de log está vacío o no se pudo leer.", None, None, None, None, None
def preprocesar_log(log_message):
message = str(log_message)
message = re.sub(r'^\[.*?\]\s*(INFO|ERROR|DEBUG|WARN|FATAL|WARNING):\s*', '', message, flags=re.IGNORECASE)
message = re.sub(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(,\d{3})?\s*', '', message, flags=re.IGNORECASE)
message = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', '<IP>', message)
message = re.sub(r'\b\d+\b', '<NUM>', message)
message = re.sub(r'[a-f0-9-]{8,}', '<HASH_ID>', message)
message = re.sub(r'/(?:[a-zA-Z0-9_.-]+/)*[a-zA-Z0-9_.-]+', '<PATH>', message)
return message.lower().strip()
df_logs['log_template'] = df_logs['raw_log'].apply(preprocesar_log)
vectorizer = TfidfVectorizer(max_features=5000, stop_words=None)
X = vectorizer.fit_transform(df_logs['log_template'])
X_array = X.toarray()
# --- Detección de Errores por Reglas ---
error_keywords = ['error', 'fatal', 'failed', 'exception', 'nullpointer', 'timeout', 'denied']
def detectar_error_por_regla(log_message):
return any(keyword in str(log_message).lower() for keyword in error_keywords)
df_logs['error_por_regla'] = df_logs['raw_log'].apply(detectar_error_por_regla)
# --- SELECCIÓN Y ENTRENAMIENTO DEL MODELO DE ML ---
model_info = f"Resultados generados con el modelo: **{model_name}**"
if model_name == "Isolation Forest":
model = IsolationForest(n_estimators=100, contamination='auto', random_state=42)
model.fit(X_array)
elif model_name == "Local Outlier Factor":
# Usamos novelty=True para que se comporte como los otros modelos (fit/predict)
model = LocalOutlierFactor(n_neighbors=20, contamination='auto', novelty=True)
model.fit(X_array)
elif model_name == "One-Class SVM":
model = OneClassSVM(nu=0.05, kernel="rbf", gamma='auto')
model.fit(X_array)
df_logs['anomaly_score'] = model.decision_function(X_array)
df_logs['anomaly_prediction'] = model.predict(X_array)
df_logs['anomaly_ml'] = df_logs['anomaly_prediction'] == -1
# --- GENERACIÓN DE SALIDAS ---
total_logs = len(df_logs)
errores_reglas = df_logs['error_por_regla'].sum()
anomalias_ml = df_logs['anomaly_ml'].sum()
summary_md = f"""
## 📊 Resumen General del Análisis
- **Total de Líneas de Log Analizadas:** {total_logs}
- **Errores Detectados por Reglas:** `{errores_reglas}`
- **Anomalías Detectadas por Machine Learning:** `{anomalias_ml}`
"""
top_templates = df_logs['log_template'].value_counts().head(10)
templates_md = "## 📋 Plantillas de Log Más Comunes\n\n" + top_templates.to_markdown()
top_anomalies = df_logs[df_logs['anomaly_ml']].sort_values(by='anomaly_score').head(10)
anomalies_md = f"## ❗ Ejemplos de Anomalías Detectadas (ML)\n*Ordenadas por la más anómala (score más bajo)*\n\n"
if not top_anomalies.empty:
anomalies_md += top_anomalies[['raw_log', 'anomaly_score']].to_markdown(index=False)
else:
anomalies_md += "No se detectaron anomalías con el modelo de Machine Learning."
fig1, ax1 = plt.subplots(figsize=(10, 5))
sns.histplot(df_logs['anomaly_score'], bins=50, kde=True, ax=ax1)
ax1.set_title(f'Distribución de Scores de Anomalía ({model_name})')
ax1.set_xlabel('Score (más bajo = más anómalo)')
ax1.set_ylabel('Frecuencia')
plt.tight_layout()
combined_detections = df_logs.groupby(['error_por_regla', 'anomaly_ml']).size().unstack(fill_value=0)
fig2, ax2 = plt.subplots(figsize=(8, 6))
if not combined_detections.empty:
combined_detections.plot(kind='bar', stacked=True, ax=ax2, colormap='viridis')
ax2.set_title('Comparación: Detección por Reglas vs. ML')
ax2.set_xlabel('Detectado como Error por Reglas')
ax2.set_ylabel('Número de Logs')
ax2.tick_params(axis='x', rotation=0)
ax2.legend(['ML: Normal', 'ML: Anomalía'])
else:
ax2.text(0.5, 0.5, 'No hay datos para graficar.', ha='center', va='center')
plt.tight_layout()
return model_info, summary_md, templates_md, anomalies_md, fig1, fig2
except Exception as e:
error_message = f"Ha ocurrido un error durante el análisis:\n\n{traceback.format_exc()}"
return error_message, None, None, None, None, None
# --- CONSTRUCCIÓN DE LA INTERFAZ DE GRADIO CON LA OPCIÓN DE ELEGIR MODELO ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# 🔍 Analizador Avanzado de Logs con Machine Learning")
gr.Markdown("Sube un archivo de log (.log, .txt) para realizar un análisis completo. Puedes elegir entre diferentes modelos de ML para la detección de anomalías.")
with gr.Row():
with gr.Column(scale=1):
log_file_input = gr.File(label="1. Sube tu archivo de log", file_types=[".log", ".txt"])
model_choice_input = gr.Radio(
["Isolation Forest", "Local Outlier Factor", "One-Class SVM"],
label="2. Elige el modelo de ML",
value="Isolation Forest"
)
analyze_button = gr.Button("🚀 Analizar Archivo", variant="primary")
with gr.Column(scale=3):
model_info_output = gr.Markdown()
summary_output = gr.Markdown(label="Resumen General")
with gr.Row():
plot_output_1 = gr.Plot(label="Distribución de Scores de Anomalía")
plot_output_2 = gr.Plot(label="Comparación de Detecciones")
templates_output = gr.Markdown(label="Plantillas Comunes")
anomalies_output = gr.Markdown(label="Anomalías Detectadas")
# Lista de salidas para no repetirla
outputs_list = [model_info_output, summary_output, templates_output, anomalies_output, plot_output_1, plot_output_2]
gr.Examples(
examples=[
["sample.log", "Isolation Forest"],
["sample_uniform.log", "Isolation Forest"],
["sample.log", "Local Outlier Factor"]
],
inputs=[log_file_input, model_choice_input],
outputs=outputs_list,
fn=analizar_logs_completo,
cache_examples=True,
label="O prueba con un archivo de ejemplo"
)
analyze_button.click(
fn=analizar_logs_completo,
inputs=[log_file_input, model_choice_input],
outputs=outputs_list
)
if __name__ == "__main__":
demo.launch()