#!/usr/bin/env python3
"""
Novita AI RAG Chat Application - Uses your dataset as context
No fine-tuning required!
"""
import os
import json
from difflib import SequenceMatcher

import requests

def load_system_prompt(default_text):
    """Load the system prompt from configs/system_prompt.md if available.

    Extracts the text between triple-quote markers (\"\"\" ... \"\"\");
    otherwise falls back to default_text.
    """
    try:
        base_dir = os.path.dirname(__file__)
        md_path = os.path.join(base_dir, 'configs', 'system_prompt.md')
        if not os.path.exists(md_path):
            return default_text
        with open(md_path, 'r', encoding='utf-8') as f:
            content = f.read()
        start = content.find('"""')
        end = content.rfind('"""')
        if start != -1 and end != -1 and end > start:
            return content[start + 3:end].strip()
        # Fallback: strip markdown headers
        lines = []
        for line in content.splitlines():
            if line.strip().startswith('#'):
                continue
            lines.append(line)
        cleaned = '\n'.join(lines).strip()
        return cleaned or default_text
    except Exception:
        return default_text
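
# Illustrative configs/system_prompt.md layout that load_system_prompt()
# expects (the file contents below are an assumption, not from this repo):
#
#   # System Prompt
#   """
#   You are Textilindo AI Assistant. Be concise, helpful, and use Indonesian.
#   """
#
# Everything between the two triple-quote markers becomes the system prompt;
# without them, markdown headers are stripped and the remaining text is used.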

class NovitaAIRAGChat:
    def __init__(self, api_key, dataset_path="data/textilindo_training_data.jsonl"):
        self.api_key = api_key
        self.base_url = "https://api.novita.ai/openai"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        # System prompt / persona
        self.system_prompt = os.getenv(
            'SYSTEM_PROMPT',
            load_system_prompt("You are Textilindo AI Assistant. Be concise, helpful, and use Indonesian.")
        )
        self.conversation_history = [
            {"role": "system", "content": self.system_prompt}
        ]
        self.current_model = "qwen/qwen3-235b-a22b-instruct-2507"  # High-quality model
        self.dataset = self.load_dataset(dataset_path)
        self.context_window = 5  # Number of most relevant examples to include

    def load_dataset(self, dataset_path):
        """Load the training dataset from a JSONL file."""
        print(f"📥 Loading dataset from {dataset_path}...")
        dataset = []
        if not os.path.exists(dataset_path):
            print(f"⚠️ Dataset not found: {dataset_path}")
            return dataset
        try:
            with open(dataset_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line:
                        data = json.loads(line)
                        dataset.append(data)
            print(f"✅ Loaded {len(dataset)} examples from dataset")
        except Exception as e:
            print(f"❌ Error loading dataset: {e}")
        return dataset
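
    # Illustrative shape of one JSONL record, inferred from the fields this
    # class reads ('instruction'/'output' below, 'metadata.topic' in
    # show_dataset_stats); the sample values are assumptions:
    #   {"instruction": "Berapa harga kain katun?",
    #    "output": "Harga kain katun mulai dari ...",
    #    "metadata": {"topic": "pricing"}}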

    def find_relevant_context(self, user_query, top_k=5):
        """Find the most relevant examples from the dataset."""
        if not self.dataset:
            return []
        # Simple similarity scoring
        scores = []
        query = user_query.lower()
        for i, example in enumerate(self.dataset):
            instruction = example.get('instruction', '').lower()
            output = example.get('output', '').lower()
            # Calculate similarity scores
            instruction_score = SequenceMatcher(None, query, instruction).ratio()
            output_score = SequenceMatcher(None, query, output).ratio()
            # Combined score (weight the instruction more heavily)
            combined_score = (instruction_score * 0.7) + (output_score * 0.3)
            scores.append((combined_score, i))
        # Sort by score and keep the top_k
        scores.sort(reverse=True)
        relevant_examples = []
        for score, idx in scores[:top_k]:
            if score > 0.1:  # Only include if similarity > 10%
                relevant_examples.append(self.dataset[idx])
        return relevant_examples
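
    # Illustrative scoring walk-through (numbers approximate): for the query
    # "harga kain" against a stored instruction "berapa harga kain?",
    # SequenceMatcher gives a ratio of about 0.71, so even with near-zero
    # output similarity the combined score (0.7 * 0.71 + 0.3 * output_score
    # ≈ 0.50) comfortably clears the 0.1 inclusion threshold.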

    def create_context_prompt(self, user_query, relevant_examples):
        """Create a prompt with relevant context."""
        if not relevant_examples:
            return user_query
        context_parts = []
        context_parts.append("Berikut adalah beberapa contoh pertanyaan dan jawaban tentang Textilindo:")
        context_parts.append("")
        for i, example in enumerate(relevant_examples, 1):
            instruction = example.get('instruction', '')
            output = example.get('output', '')
            context_parts.append(f"Contoh {i}:")
            context_parts.append(f"Pertanyaan: {instruction}")
            context_parts.append(f"Jawaban: {output}")
            context_parts.append("")
        context_parts.append("Berdasarkan contoh di atas, jawab pertanyaan berikut:")
        context_parts.append(f"Pertanyaan: {user_query}")
        context_parts.append("Jawaban:")
        return "\n".join(context_parts)

    def chat_completion(self, message, model=None):
        """Send a message to Novita AI with RAG context."""
        if model is None:
            model = self.current_model
        # Find relevant context
        relevant_examples = self.find_relevant_context(message, self.context_window)
        # Create a context-aware prompt
        if relevant_examples:
            enhanced_prompt = self.create_context_prompt(message, relevant_examples)
            print(f"🔍 Found {len(relevant_examples)} relevant examples from dataset")
        else:
            enhanced_prompt = message
            print("🔍 No relevant examples found, using direct query")
        # Ensure the system prompt comes first
        if not self.conversation_history or self.conversation_history[0].get("role") != "system":
            self.conversation_history.insert(0, {"role": "system", "content": self.system_prompt})
        # Add to conversation history
        self.conversation_history.append({"role": "user", "content": enhanced_prompt})
        # Prepare the request payload
        payload = {
            "model": model,
            "messages": self.conversation_history,
            "max_tokens": 500,
            "temperature": 0.7,
            "top_p": 0.9
        }
        try:
            print("🤖 Thinking...", end="", flush=True)
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers=self.headers,
                json=payload,
                timeout=60
            )
            if response.status_code == 200:
                result = response.json()
                assistant_message = result.get('choices', [{}])[0].get('message', {}).get('content', '')
                # Add the assistant response to history
                self.conversation_history.append({"role": "assistant", "content": assistant_message})
                print("\r" + " " * 20 + "\r", end="")  # Clear the "Thinking..." message
                return assistant_message
            else:
                print(f"\r❌ Error: {response.status_code} - {response.text}")
                return None
        except Exception as e:
            print(f"\r❌ Error: {e}")
            return None
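
    # Because every user turn stores the full RAG-enhanced prompt, the history
    # (and therefore each request payload) grows quickly. A minimal optional
    # sketch for capping it; the method name and default below are assumptions,
    # not part of the original script:
    def trim_history(self, max_turns=20):
        """Keep the system prompt plus only the most recent max_turns messages."""
        if len(self.conversation_history) > max_turns + 1:
            self.conversation_history = (
                self.conversation_history[:1] + self.conversation_history[-max_turns:]
            )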

    def change_model(self, model_id):
        """Change the current model."""
        self.current_model = model_id
        print(f"✅ Model changed to: {model_id}")

    def clear_history(self):
        """Clear the conversation history."""
        self.conversation_history = [
            {"role": "system", "content": self.system_prompt}
        ]
        print("✅ Conversation history cleared")

    def set_system_prompt(self, prompt_text):
        """Update the system prompt/persona and reset the conversation history."""
        prompt_text = (prompt_text or '').strip()
        if not prompt_text:
            print("❌ System prompt cannot be empty")
            return
        self.system_prompt = prompt_text
        self.clear_history()
        print("✅ System prompt updated")

    def show_models(self):
        """Show available models."""
        try:
            response = requests.get(f"{self.base_url}/models", headers=self.headers, timeout=10)
            if response.status_code == 200:
                models = response.json().get('data', [])
                print("\n📋 Available Models:")
                print("-" * 50)
                for i, model in enumerate(models[:20], 1):  # Show the first 20 models
                    model_id = model.get('id', 'Unknown')
                    print(f"{i:2d}. {model_id}")
                print("-" * 50)
                print(f"Current model: {self.current_model}")
            else:
                print("❌ Could not fetch models")
        except Exception as e:
            print(f"❌ Error: {e}")

    def show_dataset_stats(self):
        """Show dataset statistics."""
        if not self.dataset:
            print("❌ No dataset loaded")
            return
        print("\n📊 Dataset Statistics:")
        print(f"Total examples: {len(self.dataset)}")
        # Count examples by topic
        topics = {}
        for example in self.dataset:
            metadata = example.get('metadata', {})
            topic = metadata.get('topic', 'unknown')
            topics[topic] = topics.get(topic, 0) + 1
        print(f"Topics: {topics}")
        # Show sample questions
        print("\n📝 Sample questions:")
        for i, example in enumerate(self.dataset[:5], 1):
            instruction = example.get('instruction', '')
            print(f"{i}. {instruction}")

def main():
    print("🚀 Novita AI RAG Chat - Textilindo AI")
    print("=" * 60)
    print("This application uses your dataset as context with Novita AI models")
    print("No fine-tuning required - RAG approach!")
    print("=" * 60)
    # Check the API key
    api_key = os.getenv('NOVITA_API_KEY')
    if not api_key:
        print("❌ NOVITA_API_KEY not found")
        api_key = input("Enter your Novita AI API key: ").strip()
        if not api_key:
            print("❌ API key required")
            return
        os.environ['NOVITA_API_KEY'] = api_key
    # Initialize the RAG chat
    chat = NovitaAIRAGChat(api_key)
    # Test the connection
    print("🔍 Testing connection...")
    try:
        response = requests.get(f"{chat.base_url}/models", headers=chat.headers, timeout=10)
        if response.status_code != 200:
            print("❌ Could not connect to Novita AI")
            return
    except Exception as e:
        print(f"❌ Connection error: {e}")
        return
    print("✅ Connected to Novita AI!")
    # Show dataset stats
    chat.show_dataset_stats()
    # Show the current model
    print(f"\n🤖 Current model: {chat.current_model}")
    # Main chat loop
    print("\n💬 Start chatting! Type 'help' for commands, 'quit' to exit")
    print("-" * 60)
    while True:
        try:
            user_input = input("\n👤 You: ").strip()
            if not user_input:
                continue
            # Handle commands
            if user_input.lower() in ['quit', 'exit', 'q']:
                print("👋 Goodbye!")
                break
            elif user_input.lower() == 'help':
                print("\n📋 Available Commands:")
                print("  help          - Show this help")
                print("  models        - Show available models")
                print("  change <id>   - Change model (e.g., change 5)")
                print("  system <text> - Update the system prompt/persona")
                print("  clear         - Clear conversation history")
                print("  stats         - Show dataset statistics")
                print("  quit/exit/q   - Exit the application")
                print("  <any text>    - Send message to AI (with RAG context)")
                continue
            elif user_input.lower() == 'models':
                chat.show_models()
                continue
            elif user_input.lower() == 'clear':
                chat.clear_history()
                continue
            elif user_input.lower() == 'stats':
                chat.show_dataset_stats()
                continue
            elif user_input.lower().startswith('system '):
                # Update the system prompt/persona
                new_prompt = user_input[len('system '):].strip()
                chat.set_system_prompt(new_prompt)
                continue
            elif user_input.lower().startswith('change '):
                try:
                    model_num = int(user_input.split()[1])
                    # Selecting a model by number would require the fetched model list
                    print("⚠️ Model changing not implemented yet")
                except (ValueError, IndexError):
                    print("❌ Usage: change <number>")
                continue
            # Send the message to the AI with RAG context
            response = chat.chat_completion(user_input)
            if response:
                print(f"\n🤖 Assistant: {response}")
        except KeyboardInterrupt:
            print("\n👋 Goodbye!")
            break
        except Exception as e:
            print(f"❌ Error: {e}")


if __name__ == "__main__":
    main()
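
# Example invocation (the script filename is illustrative):
#   export NOVITA_API_KEY=...   # or paste the key when prompted
#   python novita_rag_chat.py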