#!/usr/bin/env python3
"""
RAG News Viewer
View and manage high-confidence news saved in Google Drive
"""

import json
import os
import webbrowser
from datetime import datetime

from rag_news_manager import initialize_rag_system, get_rag_stats, rag_manager

# NOTE(review): the emoji-like prefixes in the string literals below look like
# UTF-8 text that was decoded with the wrong code page at some point. They are
# kept exactly as found because the original characters cannot all be
# recovered from this file alone -- confirm against the upstream source.


def _format_timestamp(raw):
    """Format an ISO-8601 timestamp string as ``YYYY-MM-DD HH:MM:SS``.

    A ``Z`` suffix is rewritten to ``+00:00`` before parsing because
    ``datetime.fromisoformat`` only accepts ``Z`` on newer Python versions.

    Raises:
        ValueError: if ``raw`` is not a valid ISO-8601 timestamp.
    """
    parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
    return parsed.strftime("%Y-%m-%d %H:%M:%S")


def _drive_folder_url(folder_id):
    """Build the Google Drive web URL for a folder id."""
    return f"https://drive.google.com/drive/folders/{folder_id}"


def _display_entries(entries):
    """Print each entry in order, pausing for Enter between entries.

    No pause is requested after the last entry. ``KeyboardInterrupt`` and
    ``EOFError`` raised by the prompt propagate to the caller.
    """
    total = len(entries)
    for position, entry in enumerate(entries, 1):
        format_news_entry(entry, position)
        if position < total:
            input("\n⏸️ Press Enter to view next entry (or Ctrl+C to exit)...")


def format_news_entry(entry, index):
    """Print one news entry to stdout.

    Args:
        entry: Mapping describing the news item. The keys ``created_at``,
            ``prediction``, ``gemini_confidence``, ``id``, ``content_hash``,
            ``news_text`` and ``gemini_analysis`` are required; ``source``,
            ``verified``, ``distilbert_confidence`` and ``search_results``
            are optional.
        index: 1-based position shown in the entry header.

    Returns:
        Always ``True``.

    Raises:
        KeyError: if a required key is missing.
        ValueError: if ``created_at`` is not an ISO-8601 timestamp.
    """
    formatted_date = _format_timestamp(entry['created_at'])
    prediction_emoji = "βœ…" if entry['prediction'] == 'REAL' else "❌"
    confidence_color = "🟒" if entry['gemini_confidence'] > 0.95 else "🟑"

    print(f"\n{'='*80}")
    print(f"πŸ“° ENTRY #{index} - {prediction_emoji} {entry['prediction']} {confidence_color}")
    print(f"{'='*80}")
    print(f"πŸ†” ID: {entry['id']}")
    print(f"πŸ“… Created: {formatted_date}")
    print(f"πŸ“Š Confidence: {entry['gemini_confidence']:.1%}")
    print(f"πŸ”— Hash: {entry['content_hash'][:12]}...")
    print(f"πŸ“ Source: {entry.get('source', 'Unknown')}")
    print(f"βœ… Verified: {entry.get('verified', False)}")

    if entry.get('distilbert_confidence'):
        print(f"πŸ€– DistilBERT: {entry['distilbert_confidence']:.1%}")

    print("\nπŸ“° NEWS TEXT:")
    print(f"{'-'*40}")
    print(entry['news_text'])

    print("\n🧠 GEMINI ANALYSIS:")
    print(f"{'-'*40}")
    print(entry['gemini_analysis'])

    search_results = entry.get('search_results')
    if search_results:
        print(f"\nπŸ” SEARCH RESULTS ({len(search_results)} sources):")
        print(f"{'-'*40}")
        # Only the first three sources are shown to keep the entry readable.
        for i, result in enumerate(search_results[:3], 1):
            # A snippet stored as None (or empty) must not break the slice.
            snippet = result.get('snippet') or 'No snippet'
            print(f"{i}. {result.get('title', 'No title')}")
            print(f" {snippet[:100]}...")
            print(f" πŸ”— {result.get('link', 'No link')}")

    return True


def view_all_news():
    """Show every stored news entry, newest first, one page per entry."""
    print("πŸ“š VIEWING ALL RAG NEWS ENTRIES")
    print("=" * 60)

    try:
        data = rag_manager.load_rag_data()
        entries = data.get('news_entries', [])

        if not entries:
            print("πŸ“­ No news entries found in RAG system")
            return

        print(f"πŸ“Š Found {len(entries)} news entries")
        print(f"πŸ“… Last updated: {data.get('metadata', {}).get('last_updated', 'Unknown')}")

        # Sort a copy (newest first) so the list inside the loaded data is
        # not reordered as a side effect of viewing it.
        ordered = sorted(entries, key=lambda item: item['created_at'], reverse=True)
        _display_entries(ordered)

        print(f"\nβœ… Displayed all {len(ordered)} entries")

    except (KeyboardInterrupt, EOFError):
        # EOF on stdin is treated like Ctrl+C: stop paging quietly.
        print("\n\nπŸ‘‹ Viewing interrupted by user")
    except Exception as exc:
        print(f"❌ Error viewing news: {exc}")


def view_recent_news(limit=5):
    """Show the ``limit`` most recently created news entries.

    Args:
        limit: Maximum number of entries to display (default 5).
    """
    print(f"πŸ“° VIEWING {limit} MOST RECENT NEWS ENTRIES")
    print("=" * 50)

    try:
        data = rag_manager.load_rag_data()
        entries = data.get('news_entries', [])

        if not entries:
            print("πŸ“­ No news entries found in RAG system")
            return

        # Newest first, on a copy so the loaded data keeps its own order.
        ordered = sorted(entries, key=lambda item: item['created_at'], reverse=True)
        recent_entries = ordered[:limit]

        print(f"πŸ“Š Showing {len(recent_entries)} most recent entries")
        _display_entries(recent_entries)

    except (KeyboardInterrupt, EOFError):
        print("\n\nπŸ‘‹ Viewing interrupted by user")
    except Exception as exc:
        print(f"❌ Error viewing recent news: {exc}")


def view_by_prediction(prediction):
    """Show entries whose ``prediction`` equals the given label.

    Matching entries are listed by ``gemini_confidence``, highest first.

    Args:
        prediction: Label to filter on; the menu passes ``'REAL'`` or
            ``'FAKE'``.
    """
    print(f"πŸ” VIEWING {prediction} NEWS ENTRIES")
    print("=" * 50)

    try:
        data = rag_manager.load_rag_data()
        entries = data.get('news_entries', [])

        filtered_entries = [
            entry for entry in entries if entry.get('prediction') == prediction
        ]

        if not filtered_entries:
            print(f"πŸ“­ No {prediction} news entries found")
            return

        print(f"πŸ“Š Found {len(filtered_entries)} {prediction} entries")

        by_confidence = sorted(
            filtered_entries,
            key=lambda item: item['gemini_confidence'],
            reverse=True,
        )
        _display_entries(by_confidence)

    except (KeyboardInterrupt, EOFError):
        print("\n\nπŸ‘‹ Viewing interrupted by user")
    except Exception as exc:
        print(f"❌ Error viewing {prediction} news: {exc}")


def search_news(query):
    """Search stored news via ``rag_manager.search_rag_news`` and show hits.

    At most 10 results are requested from the manager.

    Args:
        query: Search text forwarded unchanged to the manager.
    """
    print(f"πŸ” SEARCHING FOR: '{query}'")
    print("=" * 50)

    try:
        results = rag_manager.search_rag_news(query, limit=10)

        if not results:
            print("πŸ“­ No matching news entries found")
            return

        print(f"πŸ“Š Found {len(results)} matching entries")
        _display_entries(results)

    except (KeyboardInterrupt, EOFError):
        print("\n\nπŸ‘‹ Search interrupted by user")
    except Exception as exc:
        print(f"❌ Error searching news: {exc}")


def show_statistics():
    """Print summary statistics and Google Drive links for the RAG store.

    Reads the mapping returned by ``get_rag_stats()``. Errors are reported on
    stdout instead of being raised.
    """
    print("πŸ“Š RAG SYSTEM STATISTICS")
    print("=" * 40)

    try:
        stats = get_rag_stats()

        if not stats:
            print("❌ Could not retrieve statistics")
            return

        print(f"πŸ“ˆ Total Entries: {stats['total_entries']}")
        print(f"βœ… Real News: {stats['real_count']}")
        print(f"❌ Fake News: {stats['fake_count']}")
        print(f"πŸ“Š Average Confidence: {stats['avg_confidence']:.1%}")

        latest = stats.get('latest_entry')
        if latest:
            print(f"πŸ•’ Latest Entry: {_format_timestamp(latest['created_at'])}")
            print(f" πŸ“° {latest['news_text'][:80]}...")
            print(f" 🎯 {latest['prediction']} ({latest['gemini_confidence']:.1%})")

        print("\nπŸ”— Google Drive Links:")
        # The ids are optional: a missing key simply omits the link.
        folder_id = stats.get('folder_id')
        if folder_id:
            folder_url = _drive_folder_url(folder_id)
            print(f" πŸ“ RAG Folder: {folder_url}")

        file_id = stats.get('file_id')
        if file_id:
            file_url = f"https://drive.google.com/file/d/{file_id}/view"
            print(f" πŸ“„ RAG File: {file_url}")

    except Exception as exc:
        print(f"❌ Error getting statistics: {exc}")


def main_menu():
    """Run the interactive menu loop until the user exits.

    The loop ends on option 8, on Ctrl+C, or when stdin reaches EOF. Any
    other exception raised by an action is printed and the menu is shown
    again.
    """
    while True:
        print("\n" + "="*60)
        print("πŸ” RAG NEWS VIEWER - Vietnamese Fake News Detection")
        print("="*60)
        print("1. πŸ“Š View Statistics")
        print("2. πŸ“° View Recent News (5 entries)")
        print("3. πŸ“š View All News")
        print("4. βœ… View Real News Only")
        print("5. ❌ View Fake News Only")
        print("6. πŸ” Search News")
        print("7. πŸ”— Open Google Drive")
        print("8. ❌ Exit")
        print("="*60)

        try:
            choice = input("πŸ‘‰ Select option (1-8): ").strip()

            if choice == '1':
                show_statistics()
            elif choice == '2':
                view_recent_news(5)
            elif choice == '3':
                view_all_news()
            elif choice == '4':
                view_by_prediction('REAL')
            elif choice == '5':
                view_by_prediction('FAKE')
            elif choice == '6':
                query = input("πŸ” Enter search query: ").strip()
                if query:
                    search_news(query)
                else:
                    print("❌ Please enter a search query")
            elif choice == '7':
                stats = get_rag_stats()
                folder_id = stats.get('folder_id') if stats else None
                if folder_id:
                    folder_url = _drive_folder_url(folder_id)
                    print(f"πŸ”— Opening Google Drive: {folder_url}")
                    webbrowser.open(folder_url)
                else:
                    print("❌ Google Drive folder not found")
            elif choice == '8':
                print("πŸ‘‹ Goodbye!")
                break
            else:
                print("❌ Invalid choice. Please select 1-8.")

        except (KeyboardInterrupt, EOFError):
            # EOFError must end the loop: once stdin is closed every further
            # input() fails again, so swallowing it would spin forever.
            print("\n\nπŸ‘‹ Goodbye!")
            break
        except Exception as exc:
            print(f"❌ Error: {exc}")


def main():
    """Initialise the RAG system, print statistics, then start the menu."""
    print("πŸš€ RAG News Viewer")
    print("=" * 30)

    # Nothing else works without a successful initialisation.
    print("πŸ”§ Initializing RAG system...")
    if not initialize_rag_system():
        print("❌ Failed to initialize RAG system")
        print("Please run setup_google_drive_rag.py first")
        return

    print("βœ… RAG system initialized successfully!")

    show_statistics()
    main_menu()


if __name__ == "__main__":
    main()