Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| RAG News Viewer | |
| View and manage high-confidence news saved in Google Drive | |
| """ | |
| import json | |
| import os | |
| from datetime import datetime | |
| from rag_news_manager import initialize_rag_system, get_rag_stats, rag_manager | |
def format_news_entry(entry, index):
    """Print one stored news entry as a human-readable report.

    Args:
        entry: Mapping describing the entry. The keys ``created_at``,
            ``prediction``, ``gemini_confidence``, ``id``, ``content_hash``,
            ``news_text`` and ``gemini_analysis`` are read unconditionally;
            ``source``, ``verified``, ``distilbert_confidence`` and
            ``search_results`` are optional.
        index: Number shown in the entry header.

    Returns:
        Always ``True``.

    Raises:
        KeyError: If one of the unconditionally read keys is missing.
        ValueError: If ``created_at`` cannot be parsed as an ISO timestamp.
    """
    # Older Python versions reject a trailing "Z" in fromisoformat(), so it
    # is rewritten as an explicit UTC offset before parsing.
    created_date = datetime.fromisoformat(entry['created_at'].replace('Z', '+00:00'))
    formatted_date = created_date.strftime("%Y-%m-%d %H:%M:%S")
    prediction_emoji = "β " if entry['prediction'] == 'REAL' else "β"
    confidence_color = "π’" if entry['gemini_confidence'] > 0.95 else "π‘"

    print(f"\n{'='*80}")
    print(f"π° ENTRY #{index} - {prediction_emoji} {entry['prediction']} {confidence_color}")
    print(f"{'='*80}")
    print(f"π ID: {entry['id']}")
    print(f"π Created: {formatted_date}")
    print(f"π Confidence: {entry['gemini_confidence']:.1%}")
    print(f"π Hash: {entry['content_hash'][:12]}...")
    print(f"π Source: {entry.get('source', 'Unknown')}")
    print(f"β Verified: {entry.get('verified', False)}")

    # Compare against None rather than relying on truthiness: a score of
    # exactly 0.0 is a real value and must still be displayed.
    distilbert_confidence = entry.get('distilbert_confidence')
    if distilbert_confidence is not None:
        print(f"π€ DistilBERT: {distilbert_confidence:.1%}")

    print("\nπ° NEWS TEXT:")
    print(f"{'-'*40}")
    print(entry['news_text'])
    print("\nπ§ GEMINI ANALYSIS:")
    print(f"{'-'*40}")
    print(entry['gemini_analysis'])

    search_results = entry.get('search_results')
    if search_results:
        print(f"\nπ SEARCH RESULTS ({len(search_results)} sources):")
        print(f"{'-'*40}")
        # Only the first three sources are listed to keep the report short.
        for i, result in enumerate(search_results[:3], 1):
            # A snippet stored as an explicit null would make the slice
            # below raise TypeError; fall back to the placeholder instead.
            snippet = result.get('snippet')
            if snippet is None:
                snippet = 'No snippet'
            print(f"{i}. {result.get('title', 'No title')}")
            print(f" {snippet[:100]}...")
            print(f" π {result.get('link', 'No link')}")
    return True
def view_all_news():
    """Page through every news entry held by the RAG manager.

    The entries are loaded with ``rag_manager.load_rag_data()``, ordered in
    place by their ``created_at`` value in descending order, and printed one
    at a time via ``format_news_entry``; the user presses Enter between
    entries. Ctrl+C ends the listing early, and any other exception is
    reported on stdout instead of being propagated.
    """
    print("π VIEWING ALL RAG NEWS ENTRIES")
    print("=" * 60)

    def creation_key(item):
        return item['created_at']

    try:
        payload = rag_manager.load_rag_data()
        entries = payload.get('news_entries', [])
        if not entries:
            print("π No news entries found in RAG system")
            return

        total = len(entries)
        print(f"π Found {total} news entries")
        last_updated = payload.get('metadata', {}).get('last_updated', 'Unknown')
        print(f"π Last updated: {last_updated}")

        # Newest entries first.
        entries.sort(key=creation_key, reverse=True)

        for position, item in enumerate(entries, start=1):
            format_news_entry(item, position)
            # No pause after the final entry.
            if position != total:
                input("\nβΈοΈ Press Enter to view next entry (or Ctrl+C to exit)...")

        print(f"\nβ Displayed all {total} entries")
    except KeyboardInterrupt:
        print("\n\nπ Viewing interrupted by user")
    except Exception as e:
        print(f"β Error viewing news: {e}")
def view_recent_news(limit=5):
    """Page through the most recently created news entries.

    Args:
        limit: Maximum number of entries to show. A negative value is
            treated as 0; ``None`` shows every entry.

    Entries are loaded with ``rag_manager.load_rag_data()`` and printed via
    ``format_news_entry`` with an Enter prompt between them. Ctrl+C ends the
    listing early, and any other exception is reported on stdout instead of
    being propagated.
    """
    # A negative limit would turn the slice below into "everything except
    # the oldest N entries", which contradicts the header; clamp it to 0.
    if limit is not None and limit < 0:
        limit = 0

    print(f"π° VIEWING {limit} MOST RECENT NEWS ENTRIES")
    print("=" * 50)
    try:
        data = rag_manager.load_rag_data()
        entries = data.get('news_entries', [])
        if not entries:
            print("π No news entries found in RAG system")
            return

        # Newest first. sorted() builds a new list, so the list owned by the
        # loaded data is not reordered as a side effect of viewing it.
        newest_first = sorted(entries, key=lambda x: x['created_at'], reverse=True)
        recent_entries = newest_first[:limit]
        print(f"π Showing {len(recent_entries)} most recent entries")

        for i, entry in enumerate(recent_entries, 1):
            format_news_entry(entry, i)
            # No pause after the final entry.
            if i < len(recent_entries):
                input("\nβΈοΈ Press Enter to view next entry (or Ctrl+C to exit)...")
    except KeyboardInterrupt:
        print("\n\nπ Viewing interrupted by user")
    except Exception as e:
        print(f"β Error viewing recent news: {e}")
def view_by_prediction(prediction):
    """Page through the entries whose ``prediction`` equals *prediction*.

    Matching entries are shown in descending ``gemini_confidence`` order via
    ``format_news_entry``, with an Enter prompt between them. Ctrl+C ends the
    listing early, and any other exception is reported on stdout instead of
    being propagated.
    """
    print(f"π VIEWING {prediction} NEWS ENTRIES")
    print("=" * 50)

    def has_requested_prediction(item):
        return item['prediction'] == prediction

    def confidence_of(item):
        return item['gemini_confidence']

    try:
        payload = rag_manager.load_rag_data()
        matches = list(filter(has_requested_prediction, payload.get('news_entries', [])))

        if not matches:
            print(f"π No {prediction} news entries found")
            return

        match_count = len(matches)
        print(f"π Found {match_count} {prediction} entries")

        # Highest confidence first.
        matches = sorted(matches, key=confidence_of, reverse=True)

        for position, item in enumerate(matches, start=1):
            format_news_entry(item, position)
            # No pause after the final entry.
            if position != match_count:
                input("\nβΈοΈ Press Enter to view next entry (or Ctrl+C to exit)...")
    except KeyboardInterrupt:
        print("\n\nπ Viewing interrupted by user")
    except Exception as e:
        print(f"β Error viewing {prediction} news: {e}")
def search_news(query):
    """Run *query* through the RAG manager and page through the hits.

    At most 10 results are requested from ``rag_manager.search_rag_news``.
    Each hit is printed with ``format_news_entry``, with an Enter prompt
    between hits. Ctrl+C ends the listing early, and any other exception is
    reported on stdout instead of being propagated.
    """
    print(f"π SEARCHING FOR: '{query}'")
    print("=" * 50)
    try:
        hits = rag_manager.search_rag_news(query, limit=10)
        if hits:
            hit_count = len(hits)
            print(f"π Found {hit_count} matching entries")
            for position, hit in enumerate(hits, start=1):
                format_news_entry(hit, position)
                # No pause after the final hit.
                if position != hit_count:
                    input("\nβΈοΈ Press Enter to view next entry (or Ctrl+C to exit)...")
        else:
            print("π No matching news entries found")
    except KeyboardInterrupt:
        print("\n\nπ Search interrupted by user")
    except Exception as e:
        print(f"β Error searching news: {e}")
def show_statistics():
    """Print the summary returned by ``get_rag_stats()``.

    Shows the entry counts and average confidence, a short preview of the
    latest entry when one is present, and Google Drive URLs built from the
    ``folder_id`` / ``file_id`` values when they are set. Any exception is
    reported on stdout instead of being propagated.
    """
    print("π RAG SYSTEM STATISTICS")
    print("=" * 40)
    try:
        summary = get_rag_stats()
        if not summary:
            print("β Could not retrieve statistics")
            return

        print(f"π Total Entries: {summary['total_entries']}")
        print(f"β Real News: {summary['real_count']}")
        print(f"β Fake News: {summary['fake_count']}")
        print(f"π Average Confidence: {summary['avg_confidence']:.1%}")

        newest = summary['latest_entry']
        if newest:
            # Rewrite a trailing "Z" as an explicit offset before parsing.
            iso_text = newest['created_at'].replace('Z', '+00:00')
            stamp = datetime.fromisoformat(iso_text).strftime('%Y-%m-%d %H:%M:%S')
            print(f"π Latest Entry: {stamp}")
            preview = newest['news_text'][:80]
            print(f" π° {preview}...")
            print(f" π― {newest['prediction']} ({newest['gemini_confidence']:.1%})")

        print("\nπ Google Drive Links:")
        folder_id = summary['folder_id']
        if folder_id:
            print(f" π RAG Folder: https://drive.google.com/drive/folders/{folder_id}")
        file_id = summary['file_id']
        if file_id:
            print(f" π RAG File: https://drive.google.com/file/d/{file_id}/view")
    except Exception as e:
        print(f"β Error getting statistics: {e}")
def main_menu():
    """Run the interactive menu loop until the user chooses to leave.

    Each iteration prints the menu, reads a choice from stdin and dispatches
    to the matching viewer function. The loop ends on option 8, on Ctrl+C,
    or when stdin reaches end-of-file. Any other exception raised while
    handling a choice is reported on stdout and the menu is shown again.

    Returns:
        None.
    """
    while True:
        print("\n" + "="*60)
        print("π RAG NEWS VIEWER - Vietnamese Fake News Detection")
        print("="*60)
        print("1. π View Statistics")
        print("2. π° View Recent News (5 entries)")
        print("3. π View All News")
        print("4. β View Real News Only")
        print("5. β View Fake News Only")
        print("6. π Search News")
        print("7. π Open Google Drive")
        print("8. β Exit")
        print("="*60)
        try:
            choice = input("π Select option (1-8): ").strip()
            if choice == '1':
                show_statistics()
            elif choice == '2':
                view_recent_news(5)
            elif choice == '3':
                view_all_news()
            elif choice == '4':
                view_by_prediction('REAL')
            elif choice == '5':
                view_by_prediction('FAKE')
            elif choice == '6':
                query = input("π Enter search query: ").strip()
                if query:
                    search_news(query)
                else:
                    print("β Please enter a search query")
            elif choice == '7':
                stats = get_rag_stats()
                if stats and stats['folder_id']:
                    folder_url = f"https://drive.google.com/drive/folders/{stats['folder_id']}"
                    print(f"π Opening Google Drive: {folder_url}")
                    # Imported lazily: only this menu option needs it.
                    import webbrowser
                    webbrowser.open(folder_url)
                else:
                    print("β Google Drive folder not found")
            elif choice == '8':
                print("π Goodbye!")
                break
            else:
                print("β Invalid choice. Please select 1-8.")
        except (KeyboardInterrupt, EOFError):
            # input() raises EOFError once stdin is exhausted (Ctrl+D, piped
            # or closed input). If the generic handler below swallowed it,
            # the loop would re-prompt and fail forever, so treat it as a
            # request to quit, exactly like Ctrl+C.
            print("\n\nπ Goodbye!")
            break
        except Exception as e:
            print(f"β Error: {e}")
def main():
    """Start the viewer: initialise the RAG system, then open the menu.

    When ``initialize_rag_system()`` returns a falsy value, a hint to run
    the setup script is printed and the function returns without showing
    the menu. Otherwise the current statistics are printed once before the
    interactive menu takes over.
    """
    print("π RAG News Viewer")
    print("=" * 30)

    print("π§ Initializing RAG system...")
    system_ready = initialize_rag_system()
    if system_ready:
        print("β RAG system initialized successfully!")
        # Give the user an overview before the menu appears.
        show_statistics()
        main_menu()
        return

    print("β Failed to initialize RAG system")
    print("Please run setup_google_drive_rag.py first")
# Start the interactive viewer only when this file is run as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()