analisis_logs / app.py
spjasper's picture
version inicial
7e1d35a
raw
history blame
6.84 kB
import gradio as gr
import pandas as pd
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import IsolationForest
import matplotlib.pyplot as plt
import seaborn as sns
# --- PASO 1: LA FUNCIÓN DE ANÁLISIS (CON LECTURA DE ARCHIVO CORREGIDA) ---
def analizar_logs_completo(archivo_log):
if archivo_log is None:
return "Por favor, sube un archivo de log para comenzar el análisis.", None, None, None, None
try:
# --- Lectura y Preparación de Datos (MÉTODO CORREGIDO) ---
# Leemos el archivo de la forma más robusta: línea por línea, ignorando errores de codificación.
with open(archivo_log.name, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
# Creamos el DataFrame a partir de la lista de líneas
df_logs = pd.DataFrame(lines, columns=['raw_log'])
# Limpiamos líneas vacías o con solo espacios en blanco que puedan causar problemas
df_logs['raw_log'] = df_logs['raw_log'].str.strip()
df_logs.dropna(subset=['raw_log'], inplace=True)
df_logs = df_logs[df_logs['raw_log'] != '']
if df_logs.empty:
return "El archivo de log está vacío o no se pudo leer correctamente.", None, None, None, None
def preprocesar_log(log_message):
message = str(log_message)
message = re.sub(r'^\[.*?\]\s*(INFO|ERROR|DEBUG|WARN|FATAL|WARNING):\s*', '', message, flags=re.IGNORECASE)
message = re.sub(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(,\d{3})?\s*(INFO|ERROR|DEBUG|WARN|FATAL|WARNING):\s*', '', message, flags=re.IGNORECASE)
message = re.sub(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', '<IP>', message)
message = re.sub(r'\b\d+\b', '<NUM>', message)
message = re.sub(r'[a-f0-9-]{8,}', '<HASH_ID>', message)
message = re.sub(r'/(?:[a-zA-Z0-9_.-]+/)*[a-zA-Z0-9_.-]+', '<PATH>', message)
return message.lower().strip()
df_logs['log_template'] = df_logs['raw_log'].apply(preprocesar_log)
vectorizer = TfidfVectorizer(max_features=5000, stop_words=None)
X = vectorizer.fit_transform(df_logs['log_template'])
error_keywords = ['error', 'fatal', 'failed', 'exception', 'nullpointer', 'timeout', 'denied']
def detectar_error_por_regla(log_message):
return any(keyword in str(log_message).lower() for keyword in error_keywords)
df_logs['error_por_regla'] = df_logs['raw_log'].apply(detectar_error_por_regla)
model = IsolationForest(n_estimators=100, contamination='auto', random_state=42)
model.fit(X.toarray())
df_logs['anomaly_score'] = model.decision_function(X.toarray())
df_logs['anomaly_ml'] = model.predict(X.toarray()) == -1
total_logs = len(df_logs)
errores_reglas = df_logs['error_por_regla'].sum()
anomalias_ml = df_logs['anomaly_ml'].sum()
summary_md = f"""
## 📊 Resumen General del Análisis
- **Total de Líneas de Log Analizadas:** {total_logs}
- **Errores Detectados por Reglas:** `{errores_reglas}`
- **Anomalías Detectadas por Machine Learning:** `{anomalias_ml}`
"""
top_templates = df_logs['log_template'].value_counts().head(10)
templates_md = "## 📋 Plantillas de Log Más Comunes\n\n"
templates_md += top_templates.to_markdown()
top_anomalies = df_logs[df_logs['anomaly_ml']].sort_values(by='anomaly_score').head(10)
anomalies_md = "## ❗ Ejemplos de Anomalías Detectadas (ML)\n*Ordenadas por la más anómala primero (score más bajo)*\n\n"
if not top_anomalies.empty:
anomalies_md += top_anomalies[['raw_log', 'anomaly_score']].to_markdown(index=False)
else:
anomalies_md += "No se detectaron anomalías con el modelo de Machine Learning."
fig1, ax1 = plt.subplots(figsize=(10, 5))
sns.histplot(df_logs['anomaly_score'], bins=50, kde=True, ax=ax1)
ax1.set_title('Distribución de Scores de Anomalía (Isolation Forest)')
ax1.set_xlabel('Score (más bajo = más anómalo)')
ax1.set_ylabel('Frecuencia')
plt.tight_layout()
combined_detections = df_logs.groupby(['error_por_regla', 'anomaly_ml']).size().unstack(fill_value=0)
fig2, ax2 = plt.subplots(figsize=(8, 6))
if not combined_detections.empty:
combined_detections.plot(kind='bar', stacked=True, ax=ax2, colormap='viridis')
ax2.set_title('Comparación: Detección por Reglas vs. ML')
ax2.set_xlabel('Detectado como Error por Reglas')
ax2.set_ylabel('Número de Logs')
ax2.tick_params(axis='x', rotation=0)
ax2.legend(['ML: Normal', 'ML: Anomalía'])
else:
ax2.text(0.5, 0.5, 'No hay datos para graficar.', ha='center', va='center')
plt.tight_layout()
return summary_md, templates_md, anomalies_md, fig1, fig2
except Exception as e:
import traceback
error_message = f"Ha ocurrido un error durante el análisis:\n\n{traceback.format_exc()}"
return error_message, None, None, None, None
# --- PASO 2: CONSTRUIR LA INTERFAZ DE GRADIO (SIN CAMBIOS) ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# 🔍 Analizador Avanzado de Logs con Machine Learning")
gr.Markdown("Sube un archivo de log (.log, .txt) para realizar un análisis completo que incluye detección de errores por reglas y detección de anomalías con un modelo de ML (Isolation Forest).")
with gr.Row():
with gr.Column(scale=1):
log_file_input = gr.File(label="Sube tu archivo de log", file_types=[".log", ".txt"])
analyze_button = gr.Button("🚀 Analizar Archivo", variant="primary")
with gr.Column(scale=3):
summary_output = gr.Markdown(label="Resumen General")
with gr.Row():
plot_output_1 = gr.Plot(label="Distribución de Scores de Anomalía")
plot_output_2 = gr.Plot(label="Comparación de Detecciones")
templates_output = gr.Markdown(label="Plantillas Comunes")
anomalies_output = gr.Markdown(label="Anomalías Detectadas")
gr.Examples(
examples=[["sample.log"]],
inputs=log_file_input,
outputs=[summary_output, templates_output, anomalies_output, plot_output_1, plot_output_2],
fn=analizar_logs_completo,
cache_examples=True,
label="O prueba con un archivo de ejemplo"
)
analyze_button.click(
fn=analizar_logs_completo,
inputs=log_file_input,
outputs=[summary_output, templates_output, anomalies_output, plot_output_1, plot_output_2]
)
if __name__ == "__main__":
demo.launch()