import asyncio import aiohttp import gradio as gr import json import re import time from datetime import datetime from typing import List, Dict, Optional, Tuple from urllib.parse import quote_plus, urljoin from dataclasses import dataclass import numpy as np from sklearn.metrics.pairwise import cosine_similarity from sklearn.feature_extraction.text import TfidfVectorizer import requests from bs4 import BeautifulSoup import newspaper from newspaper import Article import logging import warnings # Suppress warnings warnings.filterwarnings("ignore") logging.getLogger().setLevel(logging.ERROR) @dataclass class SearchResult: """Data class for search results""" title: str url: str snippet: str content: str = "" publication_date: Optional[str] = None relevance_score: float = 0.0 class QueryEnhancer: """Enhance user queries with search operators and entity quoting""" def __init__(self): # Common named entity patterns self.entity_patterns = [ r'\b[A-Z][a-z]+ [A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', # Proper names r'\b[A-Z]{2,}(?:\s+[A-Z][a-z]+)*\b', # Acronyms + words r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\s+(?:Inc|Corp|LLC|Ltd|Co|Company|Trust|Group|Holdings)\b' # Companies ] def enhance_query(self, query: str) -> str: """Enhance query by quoting named entities and adding operators""" enhanced = query # Find and quote named entities for pattern in self.entity_patterns: matches = re.findall(pattern, enhanced) for match in matches: if len(match.split()) > 1: # Only quote multi-word entities enhanced = enhanced.replace(match, f'"{match}"') return enhanced class SearchEngineInterface: """Interface for different search engines""" def __init__(self): self.session = None self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.9', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Sec-Fetch-User': '?1', 'Cache-Control': 'max-age=0', } async def get_session(self): """Get or create aiohttp session with better configuration""" if self.session is None or self.session.closed: connector = aiohttp.TCPConnector( limit=20, limit_per_host=5, ttl_dns_cache=300, use_dns_cache=True, keepalive_timeout=30, enable_cleanup_closed=True ) timeout = aiohttp.ClientTimeout(total=45, connect=15, sock_read=30) self.session = aiohttp.ClientSession( headers=self.headers, connector=connector, timeout=timeout, trust_env=True ) return self.session async def search_google(self, query: str, num_results: int = 10) -> List[SearchResult]: """Search Google and parse results""" try: session = await self.get_session() url = f"https://www.google.com/search?q={quote_plus(query)}&num={num_results}" async with session.get(url) as response: if response.status != 200: return [] html = await response.text() soup = BeautifulSoup(html, 'html.parser') results = [] # Parse Google search results for g in soup.find_all('div', class_='g')[:num_results]: try: title_elem = g.find('h3') if not title_elem: continue title = title_elem.get_text() # Get URL link_elem = g.find('a') if not link_elem or not link_elem.get('href'): continue url = link_elem['href'] # Get snippet snippet_elem = g.find('span', class_=['st', 'aCOpRe']) if not snippet_elem: snippet_elem = g.find('div', class_=['s', 'st']) snippet = snippet_elem.get_text() if snippet_elem else "" if title and url.startswith('http'): results.append(SearchResult(title=title, url=url, snippet=snippet)) except Exception as e: continue return results except Exception as e: print(f"Google search error: {e}") return [] async def search_bing(self, query: str, num_results: int = 10) -> List[SearchResult]: """Search Bing and parse results""" try: session = await self.get_session() url = f"https://www.bing.com/search?q={quote_plus(query)}&count={num_results}" async with session.get(url) as response: if response.status != 200: return [] html = await response.text() soup = BeautifulSoup(html, 'html.parser') results = [] # Parse Bing search results for result in soup.find_all('li', class_='b_algo')[:num_results]: try: title_elem = result.find('h2') if not title_elem: continue link_elem = title_elem.find('a') if not link_elem: continue title = link_elem.get_text() url = link_elem.get('href', '') snippet_elem = result.find('p', class_='b_paractl') or result.find('div', class_='b_caption') snippet = snippet_elem.get_text() if snippet_elem else "" if title and url.startswith('http'): results.append(SearchResult(title=title, url=url, snippet=snippet)) except Exception as e: continue return results except Exception as e: print(f"Bing search error: {e}") return [] async def search_yahoo(self, query: str, num_results: int = 10) -> List[SearchResult]: """Search Yahoo and parse results""" try: session = await self.get_session() url = f"https://search.yahoo.com/search?p={quote_plus(query)}&n={num_results}" async with session.get(url) as response: if response.status != 200: return [] html = await response.text() soup = BeautifulSoup(html, 'html.parser') results = [] # Parse Yahoo search results for result in soup.find_all('div', class_='dd')[:num_results]: try: title_elem = result.find('h3', class_='title') if not title_elem: continue link_elem = title_elem.find('a') if not link_elem: continue title = link_elem.get_text() url = link_elem.get('href', '') snippet_elem = result.find('div', class_='compText') snippet = snippet_elem.get_text() if snippet_elem else "" if title and url.startswith('http'): results.append(SearchResult(title=title, url=url, snippet=snippet)) except Exception as e: continue return results except Exception as e: print(f"Yahoo search error: {e}") return [] async def close(self): """Close the session safely""" if self.session and not self.session.closed: await self.session.close() # Wait a bit for the underlying connections to close await asyncio.sleep(0.1) class ContentScraper: """Scrape and parse article content using newspaper3k with robust error handling""" def __init__(self): self.session = None self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.9', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'cross-site', 'Sec-Fetch-User': '?1', 'Cache-Control': 'no-cache', 'Pragma': 'no-cache' } # Domains known to block scrapers - we'll handle these differently self.blocked_domains = { 'bloomberg.com', 'wsj.com', 'ft.com', 'nytimes.com', 'washingtonpost.com', 'economist.com', 'reuters.com' } async def get_session(self): """Get or create aiohttp session with robust configuration""" if self.session is None or self.session.closed: connector = aiohttp.TCPConnector( limit=30, limit_per_host=10, ttl_dns_cache=300, use_dns_cache=True, keepalive_timeout=60, enable_cleanup_closed=True, ssl=False # Disable SSL verification for problematic sites ) timeout = aiohttp.ClientTimeout(total=60, connect=20, sock_read=40) self.session = aiohttp.ClientSession( headers=self.headers, connector=connector, timeout=timeout, trust_env=True ) return self.session def is_blocked_domain(self, url: str) -> bool: """Check if domain is known to block scrapers""" from urllib.parse import urlparse try: domain = urlparse(url).netloc.lower() return any(blocked in domain for blocked in self.blocked_domains) except: return False async def scrape_article_fallback(self, url: str) -> Tuple[str, Optional[str]]: """Enhanced fallback scraping method using direct HTTP request""" try: session = await self.get_session() # Add random delay to avoid rate limiting await asyncio.sleep(0.2) async with session.get(url, allow_redirects=True) as response: if response.status != 200: return "", None html = await response.text() soup = BeautifulSoup(html, 'html.parser') # Remove unwanted elements for unwanted in soup(["script", "style", "nav", "header", "footer", "aside", "iframe", "noscript"]): unwanted.decompose() # Try multiple content extraction strategies content = "" # Strategy 1: Look for common article content containers content_selectors = [ # Generic selectors 'article', '[role="main"]', 'main', '.main-content', '.content', # News-specific selectors '.story-body', '.article-body', '.entry-content', '.post-content', '.article-content', '.story-content', '.news-content', # Site-specific selectors '[data-module="ArticleBody"]', '.RichTextStoryBody', '.InlineVideo', '.zone-content', '.field-name-body', '.story-text', # CNN specific '.zn-body__paragraph', '.zn-body-text', # Fox News specific '.article-body', '.article-text', # NBC specific '.articleText', '.inline-story-content', # AP News specific '.Article', '.RichTextStoryBody', # BBC specific '[data-component="text-block"]', '.ssrcss-1q0x1qg-Paragraph', # Generic fallbacks '.text', '.body', '[class*="content"]', '[class*="article"]', '[class*="story"]' ] for selector in content_selectors: try: elements = soup.select(selector) if elements: texts = [] for elem in elements: text = elem.get_text(separator=' ', strip=True) if len(text) > 50: # Only meaningful content texts.append(text) if texts: content = ' '.join(texts) if len(content) > 200: # Good content found break except: continue # Strategy 2: If no structured content, get all paragraphs if not content or len(content) < 100: paragraphs = soup.find_all('p') p_texts = [] for p in paragraphs: text = p.get_text(strip=True) # Filter out short paragraphs, likely navigation/ads if len(text) > 30 and not any(skip in text.lower() for skip in ['cookie', 'advertisement', 'subscribe', 'newsletter', 'follow us', 'social media', 'share this']): p_texts.append(text) if p_texts: content = ' '.join(p_texts) # Strategy 3: Extract from divs with text content if not content or len(content) < 100: divs = soup.find_all('div') div_texts = [] for div in divs: # Only direct text, not nested text = div.get_text(separator=' ', strip=True) if 100 < len(text) < 1000: # Reasonable paragraph length # Check if it's likely article content if any(word in text.lower() for word in ['said', 'according', 'reported', 'stated', 'announced']): div_texts.append(text) if div_texts: content = ' '.join(div_texts[:3]) # Take first 3 relevant divs # Try to extract publication date pub_date = None date_selectors = [ 'time[datetime]', '[datetime]', '.published-date', '.post-date', '.article-date', '.timestamp', '.date', '.publish-date', '[data-testid="timestamp"]', '.byline-timestamp', '.story-date', '.news-date' ] for selector in date_selectors: try: date_elem = soup.select_one(selector) if date_elem: pub_date = (date_elem.get('datetime') or date_elem.get('content') or date_elem.get_text(strip=True)) if pub_date: break except: continue # Clean and limit content if content: # Remove excessive whitespace content = ' '.join(content.split()) # Limit length content = content[:3000] return content, pub_date except Exception as e: print(f"Enhanced fallback scraping failed for {url}: {str(e)[:100]}...") return "", None async def scrape_article(self, url: str) -> Tuple[str, Optional[str]]: """Scrape article content with multiple fallback strategies""" content = "" pub_date = None # Method 1: Try newspaper3k first (simple approach) try: article = Article(url) article.download() article.parse() if article.text and len(article.text.strip()) > 100: content = article.text.strip()[:3000] pub_date = article.publish_date.isoformat() if article.publish_date else None return content, pub_date except Exception as e: print(f"Newspaper3k failed for {url}: {str(e)[:100]}...") # Method 2: Fallback to direct HTTP scraping try: content, pub_date = await self.scrape_article_fallback(url) if content and len(content.strip()) > 50: return content, pub_date except Exception as e: print(f"Fallback scraping failed for {url}: {str(e)[:100]}...") # Method 3: Last resort - try to get at least the title/snippet try: session = await self.get_session() async with session.get(url, allow_redirects=True) as response: if response.status == 200: html = await response.text() soup = BeautifulSoup(html, 'html.parser') # Get at least the title and meta description title = soup.find('title') title_text = title.get_text().strip() if title else "" meta_desc = soup.find('meta', attrs={'name': 'description'}) desc_text = meta_desc.get('content', '').strip() if meta_desc else "" if title_text or desc_text: content = f"{title_text}. {desc_text}".strip() return content, None except Exception as e: print(f"Last resort scraping failed for {url}: {str(e)[:100]}...") return "", None async def scrape_multiple(self, search_results: List[SearchResult], max_successful: int = None) -> List[SearchResult]: """Scrape multiple articles with robust error handling and retry logic""" if not search_results: return search_results max_successful = max_successful or len(search_results) successful_scraped = 0 semaphore = asyncio.Semaphore(5) # Limit concurrent requests async def scrape_with_semaphore(result: SearchResult) -> SearchResult: nonlocal successful_scraped if successful_scraped >= max_successful: return result async with semaphore: try: # Skip if already have enough successful results if successful_scraped >= max_successful: return result content, pub_date = await self.scrape_article(result.url) if content and len(content.strip()) > 50: result.content = content result.publication_date = pub_date successful_scraped += 1 print(f"āœ… Successfully scraped: {result.url[:60]}...") else: print(f"āš ļø No content extracted from: {result.url[:60]}...") except Exception as e: print(f"āŒ Failed to scrape {result.url[:60]}...: {e}") return result # Process all URLs but stop when we have enough successful results tasks = [] for result in search_results: if successful_scraped < max_successful: tasks.append(scrape_with_semaphore(result)) else: break if tasks: scraped_results = await asyncio.gather(*tasks, return_exceptions=True) # Filter out exceptions and return successful results valid_results = [] for result in scraped_results: if not isinstance(result, Exception): valid_results.append(result) else: valid_results = search_results # Return results with content first, then others results_with_content = [r for r in valid_results if r.content.strip()] results_without_content = [r for r in valid_results if not r.content.strip()] print(f"šŸ“Š Scraping summary: {len(results_with_content)} successful, {len(results_without_content)} failed") return results_with_content + results_without_content async def close(self): """Close the session""" if self.session: await self.session.close() class EmbeddingFilter: """Filter search results using embedding-based similarity""" def __init__(self): self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english') def filter_by_relevance(self, query: str, search_results: List[SearchResult], threshold: float = 0.1) -> List[SearchResult]: """Filter results by cosine similarity with query""" if not search_results: return search_results # Combine title, snippet, and content for each result result_texts = [] for result in search_results: combined_text = f"{result.title} {result.snippet} {result.content[:1000]}" result_texts.append(combined_text) if not result_texts: return search_results try: # Add query to the corpus for vectorization all_texts = [query] + result_texts # Vectorize texts tfidf_matrix = self.vectorizer.fit_transform(all_texts) # Calculate cosine similarity between query and each result query_vector = tfidf_matrix[0:1] result_vectors = tfidf_matrix[1:] similarities = cosine_similarity(query_vector, result_vectors)[0] # Add relevance scores and filter filtered_results = [] for i, result in enumerate(search_results): result.relevance_score = similarities[i] if similarities[i] >= threshold: filtered_results.append(result) # Sort by relevance score filtered_results.sort(key=lambda x: x.relevance_score, reverse=True) return filtered_results except Exception as e: print(f"Embedding filter error: {e}") return search_results class LLMSummarizer: """Improved summarizer with better content preparation and validation""" def __init__(self, groq_api_key: str = "", openrouter_api_key: str = ""): self.groq_api_key = groq_api_key self.openrouter_api_key = openrouter_api_key self.groq_model = "meta-llama/llama-4-maverick-17b-128e-instruct" self.openrouter_model = "deepseek/deepseek-r1:free" def create_system_prompt(self) -> str: """Create system prompt for summarization""" return """You are an expert research assistant. Your task is to analyze search results and provide a comprehensive, accurate summary that directly answers the user's query. CRITICAL INSTRUCTIONS: 1. Analyze ALL provided content carefully - even if it seems only tangentially related 2. Look for connections between the query and the content, even if not immediately obvious 3. If content is about a parent company/organization mentioned in the query, include relevant information 4. Extract and synthesize any information that could be relevant to answering the user's question 5. Include specific facts, dates, numbers, and quotes when present 6. If information is contradictory between sources, mention this 7. Cite sources by mentioning the publication or website name 8. Be thorough and detailed rather than dismissive ONLY state that results are not relevant if they are completely unrelated to any aspect of the query. If there is ANY connection (like parent company info, related business segments, etc.), include that information. Format your response as a comprehensive summary, not bullet points.""" def validate_content_quality(self, search_results: List[SearchResult], query: str) -> Tuple[List[SearchResult], str]: """Validate and filter content quality before summarization""" valid_results = [] validation_info = [] # More intelligent keyword extraction query_lower = query.lower() # Extract key entities and terms important_keywords = [] # Split query into words and extract meaningful terms words = query_lower.split() for word in words: if len(word) > 2 and word not in ['news', 'latest', 'recent', 'update', 'information', 'about']: important_keywords.append(word) # Also look for multi-word entities (like company names) # Extract potential company/entity names from query entity_patterns = [ r'\b[A-Z][a-z]+ [A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', # Proper names r'\b[A-Z]{2,}(?:\s+[A-Z][a-z]+)*\b', # Acronyms ] for pattern in entity_patterns: matches = re.findall(pattern, query) for match in matches: important_keywords.extend(match.lower().split()) # Remove duplicates important_keywords = list(set(important_keywords)) for result in search_results: if not result.content or len(result.content.strip()) < 50: # Lowered threshold validation_info.append(f"Skipped '{result.title}' - insufficient content") continue # Check if content contains query-relevant terms content_lower = result.content.lower() title_lower = result.title.lower() snippet_lower = result.snippet.lower() combined_text = f"{title_lower} {snippet_lower} {content_lower}" # More flexible relevance scoring relevant_score = 0 matched_keywords = [] for keyword in important_keywords: if keyword in combined_text: if keyword in content_lower: relevant_score += 2 matched_keywords.append(keyword) elif keyword in title_lower: relevant_score += 3 # Title matches are very important matched_keywords.append(keyword) elif keyword in snippet_lower: relevant_score += 1 matched_keywords.append(keyword) # Special handling for acronyms and company names # If query contains a company acronym (like KKR), be more lenient has_company_match = any(len(kw) <= 4 and kw.isupper() for kw in query.split()) if has_company_match: relevant_score += 1 # Boost score for company-related queries # Lower the threshold and accept more results if relevant_score >= 1 or len(matched_keywords) >= 1: valid_results.append(result) validation_info.append(f"āœ“ '{result.title}' - score: {relevant_score}, matched: {matched_keywords}") else: validation_info.append(f"Skipped '{result.title}' - no relevant keywords found") # If we filtered out too many results, be more lenient if len(valid_results) < len(search_results) * 0.3: # If we filtered out more than 70% validation_info.append("āš ļø Too many results filtered, being more lenient...") # Add back results that have any content for result in search_results: if result not in valid_results and result.content.strip(): valid_results.append(result) validation_info.append(f"āœ“ '{result.title}' - added back (lenient mode)") validation_summary = "\n".join(validation_info) return valid_results, validation_summary def prepare_content_for_llm(self, query: str, search_results: List[SearchResult]) -> str: """Prepare well-structured content for LLM""" # Validate content first valid_results, validation_info = self.validate_content_quality(search_results, query) if not valid_results: return f"""Query: "{query}" VALIDATION RESULTS: {validation_info} The search results did not pass the initial relevance filter, but this might be overly restrictive. Please analyze the raw content provided and extract any information that could be relevant to answering the user's query, even if the connection is not immediately obvious.""" content_parts = [f'User Query: "{query}"\n'] content_parts.append(f"Number of relevant sources found: {len(valid_results)}\n") for i, result in enumerate(valid_results, 1): content_parts.append(f"=== SOURCE {i} ===") content_parts.append(f"Title: {result.title}") content_parts.append(f"URL: {result.url}") if result.publication_date: content_parts.append(f"Date: {result.publication_date}") if result.relevance_score > 0: content_parts.append(f"Relevance Score: {result.relevance_score:.3f}") # Include snippet if it's different from content start if result.snippet and not result.content.startswith(result.snippet[:50]): content_parts.append(f"Snippet: {result.snippet}") # Intelligently truncate content while preserving meaning content = result.content.strip() if len(content) > 3000: # Try to find a good breaking point truncate_at = 3000 # Look for sentence endings near the truncation point for i in range(2800, 3200): if i < len(content) and content[i] in '.!?': truncate_at = i + 1 break content = content[:truncate_at] + "... [content truncated]" content_parts.append(f"Content: {content}") content_parts.append("") # Empty line between sources return "\n".join(content_parts) async def summarize_with_groq(self, query: str, search_results: List[SearchResult], temperature: float = 0.3, max_tokens: int = 2000) -> str: """Improved Groq summarization with better content preparation""" if not self.groq_api_key: return "Groq API key not provided" try: # Prepare well-structured content prepared_content = self.prepare_content_for_llm(query, search_results) # Debug output print(f"DEBUG - Sending {len(prepared_content)} characters to Groq AI") print(f"DEBUG - Results with content: {len([r for r in search_results if r.content])}") print(f"DEBUG - First 300 chars: {prepared_content[:300]}...") user_prompt = f"""Please analyze the following search results and provide a comprehensive summary that directly answers the user's query. {prepared_content} Instructions: - Focus ONLY on information relevant to the query: "{query}" - If the results don't contain relevant information, explicitly state this - Be specific and factual, include dates/numbers when available - Mention source publications when referencing information - Don't provide generic advice if specific information isn't found""" headers = { "Authorization": f"Bearer {self.groq_api_key}", "Content-Type": "application/json" } payload = { "model": self.groq_model, "messages": [ {"role": "system", "content": self.create_system_prompt()}, {"role": "user", "content": user_prompt} ], "temperature": temperature, "max_tokens": max_tokens, "stream": False } async with aiohttp.ClientSession() as session: async with session.post("https://api.groq.com/openai/v1/chat/completions", headers=headers, json=payload) as response: if response.status == 200: result = await response.json() summary = result["choices"][0]["message"]["content"] # Add debug info in development debug_info = f"\n\n[DEBUG - Content Sources: {len([r for r in search_results if r.content])} with content, {len(search_results)} total]" return summary + debug_info else: error_text = await response.text() return f"Groq API error: {response.status} - {error_text}" except Exception as e: return f"Error with Groq summarization: {str(e)}" async def summarize_with_openrouter(self, query: str, search_results: List[SearchResult], temperature: float = 0.3, max_tokens: int = 2000) -> str: """Improved OpenRouter summarization with better content preparation""" if not self.openrouter_api_key: return "OpenRouter API key not provided" try: # Prepare well-structured content prepared_content = self.prepare_content_for_llm(query, search_results) # Debug output print(f"DEBUG - Sending {len(prepared_content)} characters to OpenRouter AI") print(f"DEBUG - Results with content: {len([r for r in search_results if r.content])}") print(f"DEBUG - First 300 chars: {prepared_content[:300]}...") user_prompt = f"""Please analyze the following search results and provide a comprehensive summary that directly answers the user's query. {prepared_content} Instructions: - Focus ONLY on information relevant to the query: "{query}" - If the results don't contain relevant information, explicitly state this - Be specific and factual, include dates/numbers when available - Mention source publications when referencing information - Don't provide generic advice if specific information isn't found""" headers = { "Authorization": f"Bearer {self.openrouter_api_key}", "Content-Type": "application/json", "HTTP-Referer": "https://huggingface.co/spaces", "X-Title": "AI Search Engine" } payload = { "model": self.openrouter_model, "messages": [ {"role": "system", "content": self.create_system_prompt()}, {"role": "user", "content": user_prompt} ], "temperature": temperature, "max_tokens": max_tokens } async with aiohttp.ClientSession() as session: async with session.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload) as response: if response.status == 200: result = await response.json() summary = result["choices"][0]["message"]["content"] # Add debug info in development debug_info = f"\n\n[DEBUG - Content Sources: {len([r for r in search_results if r.content])} with content, {len(search_results)} total]" return summary + debug_info else: error_text = await response.text() return f"OpenRouter API error: {response.status} - {error_text}" except Exception as e: return f"Error with OpenRouter summarization: {str(e)}" class AISearchEngine: """Main AI-powered search engine class""" def __init__(self, groq_api_key: str = "", openrouter_api_key: str = ""): self.query_enhancer = QueryEnhancer() self.search_interface = SearchEngineInterface() self.content_scraper = ContentScraper() self.embedding_filter = EmbeddingFilter() self.llm_summarizer = LLMSummarizer(groq_api_key, openrouter_api_key) async def search_and_summarize(self, query: str, search_engines: List[str], model: str, use_embeddings: bool, temperature: float, max_results: int, max_tokens: int) -> Tuple[str, str]: """Main search and summarization pipeline with robust error handling""" start_time = time.time() status_updates = [] try: # Step 1: Query Enhancement status_updates.append("šŸ” Enhancing search query...") enhanced_query = self.query_enhancer.enhance_query(query) status_updates.append(f"Enhanced query: {enhanced_query}") # Step 2: Parallel Search across engines status_updates.append("🌐 Searching across multiple engines...") search_tasks = [] if "Google" in search_engines: search_tasks.append(self.search_interface.search_google(enhanced_query, max_results)) if "Bing" in search_engines: search_tasks.append(self.search_interface.search_bing(enhanced_query, max_results)) if "Yahoo" in search_engines: search_tasks.append(self.search_interface.search_yahoo(enhanced_query, max_results)) if not search_tasks: return "No search engines selected", "\n".join(status_updates) search_results_lists = await asyncio.gather(*search_tasks, return_exceptions=True) # Combine and deduplicate results, handling exceptions all_results = [] seen_urls = set() for results_list in search_results_lists: if not isinstance(results_list, Exception) and results_list: for result in results_list: if result.url not in seen_urls and result.url.startswith('http'): all_results.append(result) seen_urls.add(result.url) status_updates.append(f"Found {len(all_results)} unique results") if not all_results: return "No search results found. This might be due to rate limiting or network issues. Please try again.", "\n".join(status_updates) # Step 3: Content Scraping with intelligent retry and fallback status_updates.append("šŸ“„ Scraping article content...") # Prioritize results and scrape intelligently target_successful = min(max_results, len(all_results)) scraped_results = await self.content_scraper.scrape_multiple( all_results[:max_results * 2], # Try more URLs to ensure we get enough content max_successful=target_successful ) # Filter results with meaningful content results_with_content = [r for r in scraped_results if r.content.strip() and len(r.content.strip()) > 100] status_updates.append(f"Successfully scraped {len(results_with_content)} articles with meaningful content") # Debug: Show what content we actually got for i, result in enumerate(results_with_content[:3]): print(f"Result {i+1}: {result.title}") print(f"Content length: {len(result.content)}") print(f"Content preview: {result.content[:200]}...") print("---") # If we don't have enough content, try to get some from snippets if len(results_with_content) < 3: status_updates.append("Using search snippets as fallback content...") for result in scraped_results: if not result.content.strip() and result.snippet.strip(): result.content = result.snippet results_with_content.append(result) if len(results_with_content) >= 5: # Reasonable minimum break if not results_with_content: return "No article content could be extracted. This might be due to anti-bot protections. Please try a different query or try again later.", "\n".join(status_updates) # Step 4: Optional Embedding-based Filtering if use_embeddings and results_with_content: status_updates.append("🧠 Filtering results using embeddings...") try: filtered_results = self.embedding_filter.filter_by_relevance(query, results_with_content) if filtered_results: results_with_content = filtered_results status_updates.append(f"Filtered to {len(filtered_results)} most relevant results") else: status_updates.append("Embedding filter returned no results, using all scraped content") except Exception as e: status_updates.append(f"Embedding filtering failed, using all results: {str(e)}") if not results_with_content: return "No relevant results found after filtering", "\n".join(status_updates) # Step 5: LLM Summarization status_updates.append(f"šŸ¤– Generating summary using {model}...") try: if model.startswith("Groq"): summary = await self.llm_summarizer.summarize_with_groq( query, results_with_content, temperature, max_tokens ) else: # OpenRouter summary = await self.llm_summarizer.summarize_with_openrouter( query, results_with_content, temperature, max_tokens ) # Check if summarization failed if summary.startswith("Error") or summary.startswith("Groq API error") or summary.startswith("OpenRouter API error"): # Provide a basic summary from the content basic_summary = self.create_basic_summary(query, results_with_content) summary = f"AI summarization failed, but here's what I found:\n\n{basic_summary}\n\n---\nāš ļø Original error: {summary}" except Exception as e: # Fallback to basic summary basic_summary = self.create_basic_summary(query, results_with_content) summary = f"AI summarization encountered an error, but here's what I found:\n\n{basic_summary}\n\n---\nāš ļø Error: {str(e)}" # Add metadata end_time = time.time() processing_time = end_time - start_time metadata = f"\n\n---\n**Search Metadata:**\n" metadata += f"- Processing time: {processing_time:.2f} seconds\n" metadata += f"- Results found: {len(all_results)}\n" metadata += f"- Articles scraped: {len(results_with_content)}\n" metadata += f"- Search engines: {', '.join(search_engines)}\n" metadata += f"- Model: {model}\n" metadata += f"- Embeddings used: {use_embeddings}\n" final_summary = summary + metadata status_updates.append(f"āœ… Summary generated in {processing_time:.2f}s") return final_summary, "\n".join(status_updates) except Exception as e: error_msg = f"Error in search pipeline: {str(e)}" status_updates.append(f"āŒ {error_msg}") return error_msg, "\n".join(status_updates) finally: # Cleanup - but don't close sessions immediately to allow reuse try: # Don't close sessions here as they might be reused pass except Exception as e: print(f"Cleanup error: {e}") def create_basic_summary(self, query: str, results: List[SearchResult]) -> str: """Create a basic summary when AI summarization fails""" summary_parts = [f"Based on search results for: **{query}**\n"] for i, result in enumerate(results[:5], 1): content_preview = result.content[:300] + "..." if len(result.content) > 300 else result.content summary_parts.append(f"**{i}. {result.title}**") summary_parts.append(f"Source: {result.url}") if result.publication_date: summary_parts.append(f"Date: {result.publication_date}") summary_parts.append(f"Content: {content_preview}") summary_parts.append("") return "\n".join(summary_parts) # Global search engine instance search_engine = None async def initialize_search_engine(groq_key: str, openrouter_key: str): """Initialize the search engine with API keys""" global search_engine search_engine = AISearchEngine(groq_key, openrouter_key) return search_engine async def perform_search(query: str, search_engines: List[str], model: str, use_embeddings: bool, temperature: float, max_results: int, max_tokens: int, groq_key: str, openrouter_key: str): """Perform search with given parameters""" global search_engine if search_engine is None: search_engine = await initialize_search_engine(groq_key, openrouter_key) return await search_engine.search_and_summarize( query, search_engines, model, use_embeddings, temperature, max_results, max_tokens ) async def chat_inference(message, history, groq_key, openrouter_key, model_choice, search_engines, use_embeddings, temperature, max_results, max_tokens): """Main chat inference function for ChatInterface with additional inputs""" try: if not message.strip(): yield "Please enter a search query." return if not groq_key and not openrouter_key: yield "āŒ Please provide at least one API key (Groq or OpenRouter) to use the AI summarization features." return if not search_engines: yield "āŒ Please select at least one search engine." return # Initialize search engine global search_engine if search_engine is None: search_engine = await initialize_search_engine(groq_key, openrouter_key) else: # Update API keys if they changed search_engine.llm_summarizer.groq_api_key = groq_key search_engine.llm_summarizer.openrouter_api_key = openrouter_key # Start with status updates yield "šŸ” Enhancing query and searching across multiple engines..." # Small delay to show the initial status await asyncio.sleep(0.1) # Update status yield "🌐 Fetching results from search engines..." await asyncio.sleep(0.1) # Update status yield "šŸ“„ Scraping article content..." await asyncio.sleep(0.1) if use_embeddings: yield "🧠 Filtering results using embeddings..." await asyncio.sleep(0.1) yield "šŸ¤– Generating AI-powered summary..." await asyncio.sleep(0.1) # Perform the actual search and summarization summary, status = await search_engine.search_and_summarize( message, search_engines, model_choice, use_embeddings, temperature, max_results, max_tokens ) # Stream the final result yield summary except Exception as e: yield f"āŒ Search failed: {str(e)}\n\nPlease check your API keys and try again." def create_gradio_interface(): """Create the modern Gradio ChatInterface""" # Define additional inputs for the accordion additional_inputs = [ gr.Textbox( label="šŸ”‘ Groq API Key", type="password", placeholder="Enter your Groq API key (get from: https://console.groq.com/)", info="Required for Groq Llama-4 model" ), gr.Textbox( label="šŸ”‘ OpenRouter API Key", type="password", placeholder="Enter your OpenRouter API key (get from: https://openrouter.ai/)", info="Required for OpenRouter DeepSeek-R1 model" ), gr.Dropdown( choices=["Groq (Llama-4)", "OpenRouter (DeepSeek-R1)"], value="Groq (Llama-4)", label="šŸ¤– AI Model", info="Choose the AI model for summarization" ), gr.CheckboxGroup( choices=["Google", "Bing", "Yahoo"], value=["Google", "Bing"], label="šŸ” Search Engines", info="Select which search engines to use (multiple recommended)" ), gr.Checkbox( value=True, label="🧠 Use Embedding-based Filtering", info="Filter results by relevance using TF-IDF similarity (recommended)" ), gr.Slider( minimum=0.0, maximum=1.0, value=0.3, step=0.1, label="šŸŒ”ļø Temperature", info="Higher = more creative, Lower = more focused (0.1-0.3 recommended for factual queries)" ), gr.Slider( minimum=5, maximum=20, value=10, step=1, label="šŸ“Š Max Results per Engine", info="Number of search results to fetch from each engine" ), gr.Slider( minimum=500, maximum=4000, value=2000, step=100, label="šŸ“ Max Tokens", info="Maximum length of the AI-generated summary" ) ] # Create the main ChatInterface chat_interface = gr.ChatInterface( fn=chat_inference, additional_inputs=additional_inputs, additional_inputs_accordion=gr.Accordion("āš™ļø Configuration & Advanced Parameters", open=True), title="šŸ” AI-Powered Search Engine", description=""" **Search across Google, Bing, and Yahoo, then get AI-powered summaries!** ✨ **Features:** Multi-engine search • Query enhancement • Parallel scraping • AI summarization • Embedding filtering šŸ“‹ **Quick Start:** 1) Add your API key below 2) Select search engines 3) Ask any question! """, cache_examples=False, #retry_btn="šŸ”„ Retry", #undo_btn="ā†©ļø Undo", #clear_btn="šŸ—‘ļø Clear", submit_btn="šŸ” Search & Summarize", stop_btn="ā¹ļø Stop", chatbot=gr.Chatbot( show_copy_button=True, #likeable=True, layout="bubble", height=600, placeholder="šŸš€ Ready to search! Configure your settings below and ask me anything.", show_share_button=True ), theme=gr.themes.Soft(), analytics_enabled=False, type="messages" # Use the modern message format ) return chat_interface if __name__ == "__main__": demo = create_gradio_interface() demo.launch(share=True)