import gradio as gr
import torch
from transformers import pipeline
import openai
import requests
import logging
import threading
from datetime import datetime

# Detailed logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ========== PROMPT GENERATOR ==========
class PromptGenerator:
    @staticmethod
    def detect_intent(prompt: str) -> dict:
        """Detects the user's intent from the prompt."""
        try:
            prompt_lower = prompt.lower().strip()
            # Code detection (Spanish and English keywords, since users write in Spanish)
            code_keywords = [
                'código', 'code', 'programa', 'function', 'def ', 'import ',
                'python', 'javascript', 'java', 'c++', 'html', 'css', 'sql',
                'algoritmo', 'loop', 'for ', 'while ', 'if ', 'else',
                'variable', 'clase', 'class ', 'función', 'method'
            ]
            is_code = any(keyword in prompt_lower for keyword in code_keywords)
            # Query-type detection
            if any(word in prompt_lower for word in ['explica', 'explicar', 'qué es', 'qué son', 'defin']):
                intent_type = 'explanation'
            elif any(word in prompt_lower for word in ['ejemplo', 'ejemplifica', 'muestra']):
                intent_type = 'example'
            elif any(word in prompt_lower for word in ['corrige', 'error', 'bug', 'problema']):
                intent_type = 'correction'
            else:
                intent_type = 'general'
            return {
                'is_code': is_code,
                'type': intent_type,
                'language': PromptGenerator._detect_language(prompt_lower)
            }
        except Exception as e:
            logger.error(f"Error in detect_intent: {e}")
            return {'is_code': False, 'type': 'general', 'language': 'unknown'}
    @staticmethod
    def _detect_language(prompt: str) -> str:
        """Detects the programming language mentioned in the prompt."""
        try:
            languages = {
                'python': ['python', 'py'],
                'javascript': ['javascript', 'js', 'node'],
                'java': ['java'],
                'html': ['html'],
                'css': ['css'],
                'sql': ['sql', 'mysql', 'postgresql'],
                'c++': ['c++', 'cpp'],
                'c#': ['c#', 'csharp']
            }
            for lang, keywords in languages.items():
                if any(keyword in prompt for keyword in keywords):
                    return lang
            return 'unknown'
        except Exception as e:
            logger.error(f"Error in _detect_language: {e}")
            return 'unknown'
    @staticmethod
    def enhance_prompt(original_prompt: str, intent: dict) -> str:
        """Improves the prompt based on the detected intent.

        The templates stay in Spanish because the app serves Spanish-speaking users.
        """
        try:
            if intent['is_code']:
                if intent['type'] == 'explanation':
                    return f'Explica detalladamente este código: {original_prompt}'
                elif intent['type'] == 'example':
                    lang = intent['language'] if intent['language'] != 'unknown' else 'Python'
                    return f'Da un ejemplo en {lang}: {original_prompt}'
                elif intent['type'] == 'correction':
                    return f'Corrige este código: {original_prompt}'
                else:
                    return f'Responde sobre programación: {original_prompt}'
            else:
                if intent['type'] == 'explanation':
                    return f'Explica claramente: {original_prompt}'
                elif intent['type'] == 'example':
                    return f'Proporciona ejemplos: {original_prompt}'
                else:
                    return original_prompt
        except Exception as e:
            logger.error(f"Error in enhance_prompt: {e}")
            return original_prompt
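# Usage sketch for PromptGenerator (illustrative only; the outputs shown
# follow from the keyword rules above, not from the original repo):
#   intent = PromptGenerator.detect_intent('dame un ejemplo de loop en python')
#   # -> {'is_code': True, 'type': 'example', 'language': 'python'}
#   PromptGenerator.enhance_prompt('dame un ejemplo de loop en python', intent)
#   # -> 'Da un ejemplo en python: dame un ejemplo de loop en python'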
# ========== MODEL MANAGER ==========
class ModelManager:
    def __init__(self):
        self.dialo_model = None
        self.code_model = None
        self.dialo_tokenizer = None
        self.code_tokenizer = None
        self.loaded = False
        self.config = {}
        self.load_attempted = False

    def load_models(self):
        """Loads the local models on CPU, tuned for HF Spaces."""
        if self.load_attempted:
            return self.loaded
        self.load_attempted = True
        try:
            logger.info('🚀 Loading local models on CPU...')
            # Load only DialoGPT to keep things simple and avoid memory problems
            logger.info('📥 Loading DialoGPT-small...')
            self.dialo_model = pipeline(
                "text-generation",
                model="microsoft/DialoGPT-small",
                device="cpu",
                torch_dtype=torch.float32,
                model_kwargs={"low_cpu_mem_usage": True}
            )
            self.loaded = True
            logger.info('✅ DialoGPT-small loaded successfully')
            return True
        except Exception as e:
            logger.error(f'❌ Error loading models: {e}')
            self.loaded = False
            return False
    def set_config(self, config):
        """Configuration for external APIs."""
        self.config = config or {}

    def generate_local_response(self, prompt, is_code=False, max_length=100):
        """Generates a response using the local model."""
        if not self.loaded:
            if not self.load_models():
                return '❌ Error: No se pudieron cargar los modelos locales. Intenta recargar la página.'
        try:
            logger.info(f'🤖 Generating local response for: {prompt[:50]}...')
            # Use DialoGPT for every response (simplest option).
            # max_new_tokens is used instead of max_length so a long prompt
            # cannot exhaust the generation budget on its own.
            result = self.dialo_model(
                prompt,
                max_new_tokens=max_length,
                num_return_sequences=1,
                temperature=0.7,
                do_sample=True,
                pad_token_id=self.dialo_model.tokenizer.eos_token_id
            )
            response = result[0]['generated_text']
            # Strip the echoed prompt from the generated text
            if response.startswith(prompt):
                response = response[len(prompt):].strip()
            logger.info(f'✅ Local response generated: {response[:50]}...')
            return response if response else '🤔 No pude generar una respuesta. Intenta reformular tu pregunta.'
        except Exception as e:
            logger.error(f'❌ Error in local generation: {e}')
            return f'⚠️ Error temporal en el modelo: {str(e)}'
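# Quick smoke test for the local path (a sketch, meant to be run outside the
# Space; download time depends on the HF Hub):
#   mm = ModelManager()
#   if mm.load_models():
#       print(mm.generate_local_response('Hola, ¿cómo estás?'))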
# ========== API AGENT ==========
class APIAgent:
    def __init__(self):
        self.config = {}

    def set_config(self, config):
        """Sets the API keys coming from the Gradio UI."""
        self.config = config or {}

    def call_openai(self, prompt: str, is_code: bool = False):
        """Tries to call the OpenAI API."""
        api_key = self.config.get('openai_key')
        if not api_key:
            return None
        try:
            logger.info('🔄 Trying the OpenAI API...')
            # openai>=1.0 removed the module-level openai.ChatCompletion;
            # use a client instance instead
            client = openai.OpenAI(api_key=api_key)
            model = 'gpt-3.5-turbo'
            max_tokens = self.config.get('max_tokens', 400)
            temperature = self.config.get('temperature', 0.7)
            response = client.chat.completions.create(
                model=model,
                messages=[{'role': 'user', 'content': prompt}],
                max_tokens=max_tokens,
                temperature=temperature,
                timeout=10
            )
            logger.info('✅ OpenAI API responded successfully')
            return response.choices[0].message.content.strip()
        except Exception as e:
            logger.error(f'❌ Error calling OpenAI: {e}')
            return None
    def call_deepseek(self, prompt: str, is_code: bool = False):
        """Tries to call the DeepSeek API."""
        api_key = self.config.get('deepseek_key')
        if not api_key:
            return None
        try:
            logger.info('🔄 Trying the DeepSeek API...')
            url = 'https://api.deepseek.com/v1/chat/completions'
            headers = {
                'Content-Type': 'application/json',
                'Authorization': f'Bearer {api_key}'
            }
            max_tokens = self.config.get('max_tokens', 400)
            temperature = self.config.get('temperature', 0.7)
            data = {
                'model': 'deepseek-chat',
                'messages': [{'role': 'user', 'content': prompt}],
                'max_tokens': max_tokens,
                'temperature': temperature,
                'stream': False
            }
            response = requests.post(url, json=data, headers=headers, timeout=15)
            response.raise_for_status()
            result = response.json()
            logger.info('✅ DeepSeek API responded successfully')
            return result['choices'][0]['message']['content'].strip()
        except Exception as e:
            logger.error(f'❌ Error calling DeepSeek: {e}')
            return None
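    # DeepSeek exposes an OpenAI-compatible chat-completions endpoint, so the
    # JSON parsed above should come back shaped roughly like (illustrative):
    #   {'choices': [{'message': {'role': 'assistant', 'content': '...'}}],
    #    'usage': {...}}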
    def generate_response(self, prompt: str, is_code: bool = False):
        """
        Tries to generate a response using the APIs in order of preference.
        Returns: dict with response and source.
        """
        # Check whether any keys are configured
        has_deepseek = bool(self.config.get('deepseek_key'))
        has_openai = bool(self.config.get('openai_key'))
        logger.info(f'🔍 Available APIs - DeepSeek: {has_deepseek}, OpenAI: {has_openai}')
        if has_deepseek:
            deepseek_response = self.call_deepseek(prompt, is_code)
            if deepseek_response:
                return {'response': deepseek_response, 'source': 'deepseek'}
        if has_openai:
            openai_response = self.call_openai(prompt, is_code)
            if openai_response:
                return {'response': openai_response, 'source': 'openai'}
        logger.info('🔍 No API available, falling back to the local model')
        return {'response': None, 'source': 'none'}
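# Fallback-order sketch (illustrative; keys are hypothetical placeholders):
#   agent = APIAgent()
#   agent.set_config({'openai_key': 'sk-...'})
#   agent.generate_response('Hola')    # -> {'response': ..., 'source': 'openai'}
#                                      #    assuming the API call succeeds
#   APIAgent().generate_response('Hola')  # -> {'response': None, 'source': 'none'}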
# ========== MAIN CHATBOT ==========
class BATUTOChatbot:
    def __init__(self):
        self.conversation_history = []
        self.config = {
            'deepseek_key': '',
            'openai_key': '',
            'max_tokens': 400,
            'temperature': 0.7
        }
        self.model_manager = ModelManager()
        self.api_agent = APIAgent()
        self.prompt_generator = PromptGenerator()

    def update_config(self, deepseek_key, openai_key, max_tokens, temperature):
        """Updates the configuration from the UI."""
        updated = False
        if deepseek_key:
            self.config['deepseek_key'] = deepseek_key
            updated = True
            logger.info('🔑 DeepSeek API key updated')
        if openai_key:
            self.config['openai_key'] = openai_key
            updated = True
            logger.info('🔑 OpenAI API key updated')
        if max_tokens:
            self.config['max_tokens'] = int(max_tokens)
            updated = True
        if temperature:
            self.config['temperature'] = float(temperature)
            updated = True
        # Propagate to the agents
        self.model_manager.set_config(self.config)
        self.api_agent.set_config(self.config)
        return '✅ Configuración actualizada' if updated else 'ℹ️ Sin cambios'
    def get_system_status(self):
        """Returns the system status as HTML."""
        has_deepseek = bool(self.config.get('deepseek_key'))
        has_openai = bool(self.config.get('openai_key'))
        models_loaded = self.model_manager.loaded
        status_html = f'''
        <div style='padding: 15px; border-radius: 10px; background: #f8f9fa; border: 2px solid #e9ecef;'>
            <h4 style='margin-top: 0;'>Estado del Sistema</h4>
            <p><strong>Modelos locales:</strong> {'✅ Cargados' if models_loaded else '🔄 Cargando...'}</p>
            <p><strong>DeepSeek API:</strong> {'✅ Configurada' if has_deepseek else '❌ No configurada'}</p>
            <p><strong>OpenAI API:</strong> {'✅ Configurada' if has_openai else '❌ No configurada'}</p>
            <p><strong>Mensajes en sesión:</strong> {len(self.conversation_history)}</p>
        </div>
        '''
        return status_html
    def chat_response(self, message, history):
        """Generates the chatbot response, yielding updated history for gr.Chatbot.

        handle_submit clears the textbox before this runs, so the pending user
        message is read from the last history entry rather than from `message`.
        Yielding plain strings here would break the Chatbot component, which
        expects a list of [user, bot] pairs.
        """
        history = history or []
        if history and history[-1][1] is None:
            user_message = history[-1][0]
        else:
            if not message.strip():
                yield history
                return
            history = history + [[message, None]]
            user_message = message
        # Typing indicator
        history[-1][1] = '🔄 Procesando...'
        yield history
        try:
            # Detect intent and enhance the prompt
            intent = self.prompt_generator.detect_intent(user_message)
            enhanced_prompt = self.prompt_generator.enhance_prompt(user_message, intent)
            # Try the external APIs first
            api_result = self.api_agent.generate_response(enhanced_prompt, intent['is_code'])
            if api_result['response']:
                response_text = api_result['response']
                source = api_result['source']
            else:
                # Fall back to the local model
                response_text = self.model_manager.generate_local_response(
                    enhanced_prompt,
                    intent['is_code'],
                    max_length=150
                )
                source = 'local'
            # Append metadata to the response
            metadata = f'\n\n---\n🔧 *Fuente: {source.upper()}*'
            if intent['is_code']:
                metadata += ' | 💻 *Tipo: Código*'
            else:
                metadata += ' | 💬 *Tipo: Conversación*'
            full_response = response_text + metadata
            # Save to the session history
            self.conversation_history.append({
                'timestamp': datetime.now().isoformat(),
                'user': user_message,
                'bot': response_text,
                'source': source,
                'intent': intent
            })
            history[-1][1] = full_response
            yield history
        except Exception as e:
            logger.error(f'Error in chat_response: {e}')
            history[-1][1] = f'❌ Error: {str(e)}'
            yield history
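    # Note: because chat_response is a generator, the Gradio queue must be
    # enabled (see demo.queue() at the bottom) for the intermediate
    # '🔄 Procesando...' frame to stream to the browser on Gradio 3.x.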
    def clear_conversation(self):
        """Clears the conversation."""
        self.conversation_history.clear()
        # Empty string for the textbox, empty list for the Chatbot component
        return '', []
# ========== GRADIO SETUP ==========
# Create the chatbot instance
chatbot = BATUTOChatbot()

# Load the models at startup, in the background
def load_models_async():
    logger.info('Loading models in the background...')
    ok = chatbot.model_manager.load_models()
    logger.info('Models loaded successfully' if ok else 'Model loading failed')

# Kick off model loading
model_loader = threading.Thread(target=load_models_async, daemon=True)
model_loader.start()
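# The daemon thread keeps Space startup responsive: the UI comes up while the
# DialoGPT-small weights (a few hundred MB) download; a request that arrives
# earlier simply triggers a blocking load inside generate_local_response.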
# Gradio interface configuration for HF Spaces
with gr.Blocks(
    title='BATUTO Chatbot - Asistente Educativo',
    theme=gr.themes.Soft()
) as demo:
    gr.Markdown('''
    # BATUTO Chatbot - Asistente Educativo
    **Sistema inteligente con modelos locales y APIs externas**
    *Desplegado en Hugging Face Spaces - Versión Optimizada*
    ''')
    with gr.Row():
        with gr.Column(scale=2):
            # Chat area
            gr.Markdown('### 💬 Conversación')
            chatbot_interface = gr.Chatbot(
                label='Chat con BATUTO',
                height=400,
                show_copy_button=True
            )
            msg = gr.Textbox(
                label='Escribe tu mensaje',
                placeholder='Pregunta sobre programación, explica conceptos, pide ejemplos...',
                lines=2
            )
            with gr.Row():
                submit_btn = gr.Button('🚀 Enviar', variant='primary')
                clear_btn = gr.Button('🧹 Limpiar', variant='secondary')
        with gr.Column(scale=1):
            # Status panel
            gr.Markdown('### 📊 Estado del Sistema')
            status_display = gr.HTML()
            # Quick configuration
            with gr.Accordion('⚙️ Configuración Rápida', open=False):
                deepseek_key = gr.Textbox(
                    label='DeepSeek API Key',
                    type='password',
                    placeholder='sk-...'
                )
                openai_key = gr.Textbox(
                    label='OpenAI API Key',
                    type='password',
                    placeholder='sk-...'
                )
                with gr.Row():
                    max_tokens = gr.Slider(
                        label='Tokens máx',
                        minimum=100,
                        maximum=800,
                        value=400,
                        step=50
                    )
                    temperature = gr.Slider(
                        label='Temperatura',
                        minimum=0.1,
                        maximum=1.0,
                        value=0.7,
                        step=0.1
                    )
                save_config_btn = gr.Button('💾 Guardar Config', size='sm')
                config_output = gr.Textbox(label='Estado', interactive=False)
            # Help section
            with gr.Accordion('ℹ️ Cómo usar', open=True):
                gr.Markdown('''
                **Ejemplos:**
                - Muéstrame una función Python para ordenar listas
                - Explica qué es machine learning
                - Corrige este código: [tu código]
                ''')
    # Event handlers
    def handle_submit(message, history):
        # Append the user turn with an empty bot slot and clear the textbox;
        # chat_response fills the bot slot in the chained .then() step.
        if not message.strip():
            return '', history
        return '', (history or []) + [[message, None]]

    # Wire up the send button
    submit_btn.click(
        handle_submit,
        inputs=[msg, chatbot_interface],
        outputs=[msg, chatbot_interface]
    ).then(
        chatbot.chat_response,
        inputs=[msg, chatbot_interface],
        outputs=[chatbot_interface]
    )
    # Enter also sends
    msg.submit(
        handle_submit,
        inputs=[msg, chatbot_interface],
        outputs=[msg, chatbot_interface]
    ).then(
        chatbot.chat_response,
        inputs=[msg, chatbot_interface],
        outputs=[chatbot_interface]
    )
    # Clear the chat
    clear_btn.click(
        chatbot.clear_conversation,
        outputs=[msg, chatbot_interface]
    )
    # Configuration
    save_config_btn.click(
        chatbot.update_config,
        inputs=[deepseek_key, openai_key, max_tokens, temperature],
        outputs=[config_output]
    ).then(
        chatbot.get_system_status,
        outputs=[status_display]
    )
    # Refresh the status panel on page load
    demo.load(
        chatbot.get_system_status,
        outputs=[status_display]
    )
if __name__ == '__main__':
    # queue() is required for generator handlers to stream on Gradio 3.x;
    # it is the default on 4.x and harmless there
    demo.queue().launch()
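# A plausible requirements.txt for this Space (an assumption, not taken from
# the original repo; pin versions as needed):
#   gradio
#   torch
#   transformers
#   openai>=1.0
#   requests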