| from langchain.tools import DuckDuckGoSearchResults, WikipediaQueryRun
|
| from langchain.utilities import WikipediaAPIWrapper
|
| from PIL import Image
|
| import re
|
| import time
|
| import json
|
| import pandas as pd
|
| from pathlib import Path
|
| from typing import List, Dict, Optional, Union
|
| from tabulate import tabulate
|
| import whisper
|
|
|
| import numpy as np
|
| import os
|
| from langchain_groq import ChatGroq
|
| from youtube_transcript_api import YouTubeTranscriptApi
|
| import re
|
|
|
| from langchain_openai import ChatOpenAI
|
# Shared chat model used by the tools below for keyword and entity extraction.
llm = ChatOpenAI(
    model='gpt-4o',
    temperature=0,
)
|
|
|
|
|
class EnhancedSearchTool:
    """Web search helper that expands a question into several queries and ranks the hits.

    The tool wraps ``DuckDuckGoSearchResults``. Key terms are extracted with the
    module-level ``llm`` (falling back to a regex heuristic when that call
    fails), a few query variants are issued, and the combined hits are
    de-duplicated, scored against the key terms and rendered as plain text.
    """

    # Words ignored by the regex fallback in ``_simple_keyword_extraction``.
    _STOP_WORDS = frozenset({
        'what', 'how', 'when', 'where', 'why', 'who', 'which', 'the', 'is',
        'are', 'was', 'were', 'do', 'does', 'did', 'can', 'could', 'should',
        'would',
    })

    # (trigger substrings, number of key terms to use, suffix appended to the query)
    _QUERY_VARIANTS = (
        (('statistics', 'data', 'number', 'count'), 3, 'statistics data'),
        (('definition', 'what is', 'meaning'), 2, 'definition meaning'),
    )

    def __init__(self, max_results: int = 10):
        """Create the underlying DuckDuckGo tool.

        Args:
            max_results: Maximum number of hits requested per query and kept
                after ranking.
        """
        self.base_tool = DuckDuckGoSearchResults(num_results=max_results)
        self.max_results = max_results

    @staticmethod
    def _recent_years() -> List[str]:
        """Return the current year and the two before it, newest first, as strings."""
        year = time.localtime().tm_year
        return [str(year - offset) for offset in range(3)]

    def _extract_key_terms(self, question: str) -> List[str]:
        """Extract key search terms from the question using the LLM.

        Falls back to ``_simple_keyword_extraction`` when the LLM call raises
        or yields no usable term. Empty terms are never returned.
        """
        try:
            extract_prompt = f"""
            Extract the most important search terms from this question for web search:
            Question: {question}

            Return ONLY a comma-separated list of key terms, no explanations.
            Focus on: proper nouns, specific concepts, technical terms, dates, numbers.
            Avoid: common words like 'what', 'how', 'when', 'the', 'is', 'are'.

            Example: "What is the population of Tokyo in 2023?" -> "Tokyo population 2023"
            """
            response = llm.invoke(extract_prompt).content.strip()
            # Drop empty pieces (blank reply, trailing comma): an empty term
            # would match everywhere during ranking.
            terms = [term.strip() for term in response.split(',') if term.strip()]
            return terms or self._simple_keyword_extraction(question)
        except Exception:
            # Best effort: any LLM failure degrades to the regex heuristic.
            return self._simple_keyword_extraction(question)

    def _simple_keyword_extraction(self, question: str) -> List[str]:
        """Fallback keyword extraction: lowercase alphabetic words, minus stop words."""
        words = re.findall(r'\b[A-Za-z]+\b', question.lower())
        return [word for word in words if word not in self._STOP_WORDS and len(word) > 2]

    def _generate_search_queries(
        self, question: str, key_terms: Optional[List[str]] = None
    ) -> List[str]:
        """Build an ordered, de-duplicated list of query strings for ``question``.

        Args:
            question: The user's question.
            key_terms: Pre-computed key terms; extracted here when omitted.

        Returns:
            The punctuation-stripped question first, then key-term based
            variants. Variants are only added when key terms exist, so no
            query consists of a bare suffix.
        """
        if key_terms is None:
            key_terms = self._extract_key_terms(question)

        queries = []
        cleaned_question = re.sub(r'[^\w\s]', ' ', question).strip()
        if cleaned_question:
            queries.append(cleaned_question)

        if key_terms:
            queries.append(' '.join(key_terms[:5]))
            lowered = question.lower()

            if any(word in lowered for word in ('latest', 'recent', 'current', 'new')):
                current, previous = self._recent_years()[:2]
                queries.append(f"{' '.join(key_terms[:3])} {previous} {current}")

            for triggers, term_count, suffix in self._QUERY_VARIANTS:
                if any(word in lowered for word in triggers):
                    queries.append(f"{' '.join(key_terms[:term_count])} {suffix}")

        # dict.fromkeys removes duplicates while keeping first-seen order.
        return list(dict.fromkeys(queries))

    def _filter_and_rank_results(
        self,
        results: List[Dict],
        question: str,
        key_terms: Optional[List[str]] = None,
    ) -> List[Dict]:
        """Score hits by key-term frequency and return the best ``max_results``.

        Args:
            results: Hit dictionaries; ``snippet`` and ``title`` are read and
                may be missing or ``None``.
            question: The user's question (used only when ``key_terms`` is omitted).
            key_terms: Pre-computed key terms; extracted here when omitted.

        Returns:
            Hits sorted by descending score; ties keep their input order.
        """
        if not results:
            return results

        if key_terms is None:
            key_terms = self._extract_key_terms(question)
        # Blank terms are skipped: str.count('') returns len + 1.
        key_terms_lower = [term.lower() for term in key_terms if term and term.strip()]
        recent_years = self._recent_years()

        scored_results = []
        for result in results:
            snippet = str(result.get('snippet') or '')
            title = str(result.get('title') or '')
            text_content = f"{snippet} {title}".lower()

            score = sum(text_content.count(term) for term in key_terms_lower)
            # Small bonus for hits that mention a recent year.
            if any(year in text_content for year in recent_years):
                score += 2
            # Penalise very short snippets.
            if len(snippet) < 50:
                score -= 1

            scored_results.append((score, result))

        scored_results.sort(key=lambda pair: pair[0], reverse=True)
        return [result for _, result in scored_results[:self.max_results]]

    @staticmethod
    def _parse_results(raw) -> List[Dict]:
        """Normalise the base tool's output into a list of hit dictionaries."""
        if isinstance(raw, str):
            if not raw.strip():
                return []
            as_single_hit = [{'snippet': raw, 'title': 'Search Result'}]
            if not raw.startswith('['):
                return as_single_hit
            try:
                raw = json.loads(raw)
            except ValueError:  # json.JSONDecodeError is a ValueError subclass
                return as_single_hit
        if isinstance(raw, list):
            return [item for item in raw if isinstance(item, dict)]
        return []

    @staticmethod
    def _deduplicate(results: List[Dict]) -> List[Dict]:
        """Drop repeated hits (same link, or same title and snippet), keeping order."""
        seen = set()
        unique = []
        for result in results:
            key = str(result.get('link') or (result.get('title'), result.get('snippet')))
            if key not in seen:
                seen.add(key)
                unique.append(result)
        return unique

    def run(self, question: str) -> str:
        """Search the web for ``question`` and return up to five formatted hits.

        Returns:
            A text block starting with ``ENHANCED SEARCH RESULTS:``, the string
            ``"No search results found."``, or ``"Enhanced search error: ..."``
            when an unexpected exception occurs. This method does not raise.
        """
        try:
            # Extract the key terms once and share them with both helpers.
            key_terms = self._extract_key_terms(question)
            search_queries = self._generate_search_queries(question, key_terms)
            all_results = []

            for query in search_queries[:3]:
                try:
                    all_results.extend(self._parse_results(self.base_tool.run(query)))
                    time.sleep(0.5)  # pause between consecutive queries
                except Exception as e:
                    print(f"Search query failed: {query} - {e}")
                    continue

            all_results = self._deduplicate(all_results)
            if not all_results:
                return "No search results found."

            filtered_results = self._filter_and_rank_results(all_results, question, key_terms)

            formatted_results = []
            for i, result in enumerate(filtered_results[:5], 1):
                title = result.get('title', 'No title')
                snippet = result.get('snippet', 'No description')
                link = result.get('link', '')
                formatted_results.append(f"{i}. {title}\n {snippet}\n Source: {link}\n")

            return "ENHANCED SEARCH RESULTS:\n" + "\n".join(formatted_results)

        except Exception as e:
            return f"Enhanced search error: {str(e)}"
|
|
|
|
|
|
|
class EnhancedWikipediaTool:
    """Wikipedia lookup that searches per extracted entity and trims the article text.

    Entities are extracted from the question with the module-level ``llm``
    (falling back to a capitalised-word heuristic), each of the first three
    entities is looked up through ``WikipediaQueryRun``, and the returned text
    is reduced to the sections that mention those entities.
    """

    # Sections shorter than this many characters are not scored.
    # NOTE(review): this is large relative to doc_content_chars_max=3000 -
    # confirm the threshold is intended.
    MIN_SECTION_CHARS = 500

    def __init__(self):
        """Create the Wikipedia API wrapper and the query tool built on it."""
        self.base_wrapper = WikipediaAPIWrapper(
            top_k_results=3,
            doc_content_chars_max=3000,
            load_all_available_meta=True
        )
        self.base_tool = WikipediaQueryRun(api_wrapper=self.base_wrapper)

    def _extract_entities(self, question: str) -> List[str]:
        """Extract named entities for Wikipedia search.

        Returns the LLM's comma-separated entities longer than two characters;
        when the LLM call raises, falls back to ``_extract_capitalized_terms``.
        """
        try:
            entity_prompt = f"""
            Extract named entities (people, places, organizations, concepts) from this question for Wikipedia search:
            Question: {question}

            Return ONLY a comma-separated list of the most important entities.
            Focus on: proper nouns, specific names, places, organizations, historical events, scientific concepts.

            Example: "Tell me about Einstein's theory of relativity" -> "Albert Einstein, theory of relativity, relativity"
            """
            response = llm.invoke(entity_prompt).content.strip()
            entities = [entity.strip() for entity in response.split(',')]
            return [e for e in entities if len(e) > 2]
        except Exception:
            # Best effort: any LLM failure degrades to the regex heuristic.
            return self._extract_capitalized_terms(question)

    def _extract_capitalized_terms(self, question: str) -> List[str]:
        """Fallback: return runs of capitalised words plus any quoted phrases."""
        capitalized_words = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', question)

        quoted_terms = re.findall(r'"([^"]+)"', question)
        quoted_terms.extend(re.findall(r"'([^']+)'", question))

        return capitalized_words + quoted_terms

    def _search_multiple_terms(self, entities: List[str]) -> Dict[str, str]:
        """Query Wikipedia for the first three entities.

        Returns:
            Mapping of entity to result text, keeping only results that contain
            ``"Page:"`` and are longer than 100 characters. Failed lookups are
            reported on stdout and skipped.
        """
        results = {}

        for entity in entities[:3]:
            try:
                result = self.base_tool.run(entity)
                if result and "Page:" in result and len(result) > 100:
                    results[entity] = result
                time.sleep(0.5)  # pause between consecutive lookups
            except Exception as e:
                print(f"Wikipedia search failed for '{entity}': {e}")
                continue

        return results

    def _extract_relevant_sections(
        self, content: str, question: str, key_terms: Optional[List[str]] = None
    ) -> str:
        """Return the blank-line separated sections of ``content`` most relevant to the question.

        Args:
            content: Text returned by the Wikipedia tool.
            question: The user's question (used only when ``key_terms`` is omitted).
            key_terms: Pre-computed entities to score against. Passing them
                avoids one LLM call per article; extracted here when omitted.

        Returns:
            ``content`` unchanged when it is empty or under 200 characters.
            Otherwise up to seven positively scored sections joined by blank
            lines, or the first two sections when none scored.
        """
        if not content or len(content) < 200:
            return content

        sections = re.split(r'\n\s*\n', content)

        if key_terms is None:
            key_terms = self._extract_entities(question)
        # Blank terms are skipped: str.count('') returns len + 1.
        key_terms_lower = [term.lower() for term in key_terms if term and term.strip()]

        scored_sections = []
        for section in sections:
            if len(section.strip()) < self.MIN_SECTION_CHARS:
                continue

            section_lower = section.lower()
            score = sum(section_lower.count(term) for term in key_terms_lower)

            # Bonus for sections carrying a 19xx/20xx year or a quantity.
            if re.search(r'\b(19|20)\d{2}\b', section):
                score += 1
            if re.search(r'\b\d+([.,]\d+)?\s*(million|billion|thousand|percent|%)\b', section):
                score += 1

            scored_sections.append((score, section))

        scored_sections.sort(key=lambda pair: pair[0], reverse=True)
        top_sections = [section for score, section in scored_sections[:7] if score > 0]

        if not top_sections:
            # Nothing scored: fall back to the opening sections.
            top_sections = sections[:2]

        return '\n\n'.join(top_sections)

    def run(self, question: str) -> str:
        """Answer ``question`` from Wikipedia and return formatted text.

        Returns:
            A text block starting with ``ENHANCED WIKIPEDIA RESULTS:``, one of
            the "No ... found" messages, or an error string. This method does
            not raise.
        """
        try:
            entities = self._extract_entities(question)

            if not entities:
                # No entity found: search with the punctuation-stripped question.
                cleaned_question = re.sub(r'[^\w\s]', ' ', question).strip()
                try:
                    result = self.base_tool.run(cleaned_question)
                    if not result:
                        return "No Wikipedia results found."
                    return self._extract_relevant_sections(result, question, entities)
                except Exception as e:
                    return f"Wikipedia search error: {str(e)}"

            search_results = self._search_multiple_terms(entities)

            if not search_results:
                return "No relevant Wikipedia articles found."

            formatted_results = []
            for entity, content in search_results.items():
                # Reuse the entities extracted above instead of asking the LLM
                # again for every article.
                relevant_content = self._extract_relevant_sections(content, question, entities)
                if relevant_content:
                    formatted_results.append(f"=== {entity} ===\n{relevant_content}")

            if not formatted_results:
                return "No relevant information found in Wikipedia articles."

            return "ENHANCED WIKIPEDIA RESULTS:\n\n" + "\n\n".join(formatted_results)

        except Exception as e:
            return f"Enhanced Wikipedia error: {str(e)}"
|
|
|
|
|
def excel_to_markdown(excel_path: str, sheet_name: Optional[str] = None) -> str:
    """Summarise one sheet of an Excel workbook as text.

    Args:
        excel_path: Path to the workbook; ``~`` is expanded.
        sheet_name: Sheet name, or a string of digits that is used as a
            zero-based sheet index. Defaults to the first sheet.

    Returns:
        A report with file name, dimensions, column names, dtypes, basic
        statistics for numeric columns and the first ten rows as a table
        (Markdown when pandas' optional ``tabulate`` dependency is available,
        plain text otherwise). On failure an ``Error...`` string is returned;
        this function does not raise.
    """
    try:
        file_path = Path(excel_path).expanduser().resolve()
        if not file_path.is_file():
            return f"Error: Excel file not found at {file_path}"

        sheet: Union[str, int] = (
            int(sheet_name) if sheet_name and sheet_name.isdigit() else sheet_name or 0
        )
        df = pd.read_excel(file_path, sheet_name=sheet)
        # NOTE(review): the last column is dropped unconditionally, as in the
        # original code - confirm this is intended for all workbooks.
        df = df.iloc[:, :-1]

        lines = [
            "EXCEL FILE ANALYSIS:",
            f"File: {file_path.name}",
            f"Dimensions: {len(df)} rows × {len(df.columns)} columns",
            # Column labels may be numbers or dates, so stringify before joining.
            f"Columns: {', '.join(map(str, df.columns))}",
            f"Data types: {dict(df.dtypes)}",
        ]

        numeric_cols = df.select_dtypes(include=['number']).columns
        if len(numeric_cols) > 0:
            lines.append(f"Numeric columns: {list(numeric_cols)}")
            for col in numeric_cols:
                series = df[col]
                lines.append(
                    f" {col}: mean={series.mean():.2f}, min={series.min()}, "
                    f"max={series.max()}, sum={series.sum()}"
                )

        lines.append("\nSAMPLE DATA (first 10 rows):")
        sample = df.head(10)
        try:
            # to_markdown needs the optional 'tabulate' package.
            lines.append(sample.to_markdown(index=False))
        except ImportError:
            lines.append(sample.to_string(index=False))

        return "\n".join(lines) + "\n"

    except Exception as e:
        return f"Error reading Excel file: {str(e)}"
|
|
|
|
|
| import os
|
| import mimetypes
|
| from pathlib import Path
|
|
|
def _gemini_new_sdk_answer(image_path: str, question: str) -> str:
    """Ask the question through the ``google.genai`` client, sending the raw image bytes."""
    try:
        from google import genai
        from google.genai import types

        client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))

        with open(image_path, "rb") as handle:
            payload = handle.read()

        guessed, _ = mimetypes.guess_type(image_path)
        if guessed is None or not guessed.startswith('image/'):
            # Unknown or non-image guess: decide from the extension, defaulting to JPEG.
            guessed = "image/png" if image_path.lower().endswith('.png') else "image/jpeg"

        answer = client.models.generate_content(
            model="gemini-1.5-flash",
            contents=[
                question,
                types.Part.from_bytes(data=payload, mime_type=guessed),
            ],
        )
        return answer.text

    except Exception as new_sdk_error:
        return f"Error with both SDKs. New SDK error: {new_sdk_error}"


def image_file_info(image_path: str, question: str) -> str:
    """Ask a Gemini model ``question`` about the image at ``image_path``.

    The ``google.generativeai`` SDK (with Pillow) is tried first; when that
    path raises ``ImportError`` the ``google.genai`` client is used instead.
    The API key is read from the ``GEMINI_API_KEY`` environment variable.

    Returns:
        The model's answer text, or an ``Error...`` string describing what
        failed. This function does not raise.
    """
    try:
        if not os.path.exists(image_path):
            return f"Error: Image file not found at {image_path}"

        try:
            import google.generativeai as genai
            from PIL import Image

            genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
            model = genai.GenerativeModel('gemini-1.5-flash')

            try:
                picture = Image.open(image_path)
                mode = picture.mode
                if mode in ('RGBA', 'LA'):
                    # Flatten transparency onto a white canvas, using the
                    # last band (alpha) as the paste mask.
                    canvas = Image.new('RGB', picture.size, (255, 255, 255))
                    canvas.paste(picture, mask=picture.split()[-1])
                    picture = canvas
                elif mode != 'RGB':
                    picture = picture.convert('RGB')
            except Exception as img_error:
                return f"Error opening image: {img_error}"

            return model.generate_content([question, picture]).text

        except ImportError:
            # Legacy SDK (or Pillow) unavailable: use the newer client.
            return _gemini_new_sdk_answer(image_path, question)

    except Exception as e:
        return f"Error during image analysis: {e}"
|
|
|
# Loaded Whisper models keyed by model name, so repeated calls reuse the
# weights instead of reloading them on every transcription.
_WHISPER_MODEL_CACHE = {}


def audio_file_info(audio_path: str, model_name: str = "tiny") -> str:
    """Return only the transcription of an audio file.

    Args:
        audio_path: Path to the audio file handed to Whisper.
        model_name: Whisper model to load; defaults to ``"tiny"``.

    Returns:
        The transcribed text, or ``"Error transcribing audio: ..."`` when
        loading the model or transcribing fails. This function does not raise.
    """
    try:
        model = _WHISPER_MODEL_CACHE.get(model_name)
        if model is None:
            model = whisper.load_model(model_name)
            _WHISPER_MODEL_CACHE[model_name] = model
        result = model.transcribe(audio_path, fp16=False)
        return result['text']
    except Exception as e:
        return f"Error transcribing audio: {str(e)}"
|
|
|
def code_file_read(code_path: str) -> str:
    """Read a source file and return a short summary followed by its full text.

    The summary lists file name, extension and size. For ``.py`` files it also
    lists up to five import lines and the number of column-0 ``def`` and
    ``class`` statements.

    Returns:
        The report text, or ``"Error reading code file: ..."`` on failure.
    """
    try:
        with open(code_path, "r", encoding="utf-8") as handle:
            source = handle.read()

        path = Path(code_path)
        source_lines = source.splitlines()

        report = [
            "CODE FILE ANALYSIS:\n",
            f"File: {path.name}\n",
            f"Extension: {path.suffix}\n",
            f"Size: {len(source)} characters, {len(source_lines)} lines\n",
        ]

        if path.suffix == '.py':
            imports = [
                line for line in source_lines
                if line.strip().startswith(('import ', 'from '))
            ]
            if imports:
                report.append(f"Imports ({len(imports)}): {', '.join(imports[:5])}\n")

            # Only definitions starting at column 0 are counted.
            n_funcs = len(re.findall(r'^def\s+\w+', source, re.MULTILINE))
            n_classes = len(re.findall(r'^class\s+\w+', source, re.MULTILINE))
            report.append(f"Functions: {n_funcs}, Classes: {n_classes}\n")

        report.append(f"\nCODE CONTENT:\n{source}")
        return "".join(report)

    except Exception as e:
        return f"Error reading code file: {e}"
|
|
|
|
|
| import yt_dlp
|
| from pathlib import Path
|
|
|
# Matches watch (with v= anywhere in the query string), embed, shorts and
# youtu.be links; group 1 is the 11-character video id.
_YOUTUBE_VIDEO_RE = re.compile(
    r"https?://(?:www\.|m\.)?"
    r"(?:youtube\.com/(?:watch\?(?:[^\s#]*?&)?v=|embed/|shorts/)|youtu\.be/)"
    r"([A-Za-z0-9_-]{11})"
)


def extract_youtube_info(question: str) -> Optional[str]:
    """Return the transcript of the first YouTube video linked in ``question``.

    Args:
        question: Free text that may contain a YouTube URL.

    Returns:
        The transcript as one whitespace-normalised string, or ``None`` when
        no YouTube URL is found or the transcript cannot be fetched. The
        reason is printed to stdout.
    """
    match = _YOUTUBE_VIDEO_RE.search(question or "")
    youtube_url = match.group(0) if match else None

    print(f"Extracting YouTube URL: {youtube_url}")
    if match is None:
        print("Error getting transcript: no YouTube URL found in the question")
        return None

    video_id = match.group(1)
    try:
        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            # Older youtube-transcript-api releases: static helper returning dicts.
            transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
        else:
            # Newer releases removed get_transcript in favour of instance.fetch().
            transcript_list = YouTubeTranscriptApi().fetch(video_id).to_raw_data()

        full_transcript = ' '.join(entry['text'] for entry in transcript_list)
        # Collapse the line breaks and repeated spaces found in caption entries.
        return re.sub(r'\s+', ' ', full_transcript).strip()

    except Exception as e:
        print(f"Error getting transcript: {e}")
        return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|