| from langchain_core.tools import tool |
| import os |
| from dotenv import load_dotenv |
| from langchain_community.tools import DuckDuckGoSearchRun |
| from langchain_experimental.tools import PythonREPLTool |
| from langchain_core.messages import HumanMessage |
| from langchain_google_genai import ChatGoogleGenerativeAI |
| import requests |
| import base64 |
| import tempfile |
| import pypdf |
| import pandas |
| import zipfile |
| from pathlib import Path |
| import mimetypes |
| from typing import Optional |
| import whisper |
| import torch |
| import yt_dlp |
| import google.generativeai as genai |
| import time |
|
|
| load_dotenv() |
|
|
| vision_llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0) |
| |
| |
|
|
|
|
| @tool |
| def google_grounding_search(query: str) -> str: |
| """ |
| Search for current information using Google's grounded search. |
| |
| Use this tool when you need: |
| - Latest/current information (news, events, prices, etc.) |
| - Real-time data that might not be in your training |
| - Recent developments or updates |
| - Current facts to supplement your knowledge |
| |
| Args: |
| query: Search query (be specific and focused) |
| |
| Returns: |
| Current information from Google search with citations |
| |
| Example usage: |
| - google_grounding_search("latest AI news January 2025") |
| - google_grounding_search("current Tesla stock price") |
| - google_grounding_search("Manchester United new signings 2025") |
| """ |
| try: |
| |
| from google import genai |
| from google.genai import types |
| import os |
| |
| |
| api_key = os.getenv("GEMINI_API_KEY") |
| if not api_key: |
| return "Error: GEMINI_API_KEY not found in environment variables" |
| |
| |
| client = genai.Client(api_key=api_key) |
| grounding_tool = types.Tool(google_search=types.GoogleSearch()) |
| |
| |
| grounding_config = types.GenerateContentConfig( |
| tools=[grounding_tool] |
| ) |
| |
| |
| |
| |
| response = client.models.generate_content( |
| model="gemini-2.0-flash", |
| contents=f"Search for and provide current information about: {query}", |
| config=grounding_config |
| ) |
| |
| result = response.text.strip() |
| |
| if not result: |
| return "No results found from grounded search" |
| |
| return f"Current Information (via Google Search):\n{result}" |
| |
| except ImportError as e: |
| return f"Error: google-genai library not available. Import error: {str(e)}" |
| except Exception as e: |
| return f"Error performing grounded search: {str(e)}" |
| |
| @tool |
| def execute_python(code: str) -> str: |
| """Execute Python code for mathematical calculations, data analysis, and general computation. |
| |
| Args: |
| code: Valid Python code to execute |
| |
| Returns: |
| The output/result of the executed code |
| """ |
| try: |
| |
| if all(char in "0123456789+-*/.() " for char in code.strip()): |
| result = eval(code) |
| return str(result) |
| |
| |
| import io |
| import sys |
| from contextlib import redirect_stdout |
| |
| |
| captured_output = io.StringIO() |
| local_vars = {} |
| |
| with redirect_stdout(captured_output): |
| exec(code, {"__builtins__": __builtins__}, local_vars) |
| |
| output = captured_output.getvalue().strip() |
| |
| |
| if not output and local_vars: |
| |
| last_var = list(local_vars.values())[-1] if local_vars else None |
| if last_var is not None: |
| return str(last_var) |
| |
| return output if output else "Code executed successfully (no output)" |
| |
| except Exception as e: |
| return f"Error executing code: {str(e)}" |
| |
| @tool |
| def download_files_from_api(task_id: str, file_extension: str = None) -> str: |
| """Downloads a file (image, PDF, CSV, code, audio, Excel, etc.) associated with a task ID from the API. |
| The file is saved to a temporary location, and its local path is returned. |
| |
| Args: |
| task_id: The task ID for which to download the file. |
| file_extension: Optional. The expected file extension (e.g., ".py", ".csv", ".pdf"). |
| If provided, this will be used for the temporary file. |
| Otherwise, the extension will be inferred from the Content-Type header. |
| |
| Returns: |
| The absolute path to the downloaded file, or an error message. |
| """ |
| try: |
| api_url = "https://agents-course-unit4-scoring.hf.space" |
| response = requests.get(f"{api_url}/files/{task_id}", timeout=30) |
| response.raise_for_status() |
| |
| ext = file_extension |
| if not ext: |
| |
| content_type = response.headers.get('Content-Type', '') |
| if 'image/jpeg' in content_type: |
| ext = '.jpg' |
| elif 'image/png' in content_type: |
| ext = '.png' |
| elif 'application/pdf' in content_type: |
| ext = '.pdf' |
| elif 'text/csv' in content_type: |
| ext = '.csv' |
| elif 'text/x-python' in content_type or 'application/x-python-code' in content_type: |
| ext = '.py' |
| elif 'audio/mpeg' in content_type: |
| ext = '.mp3' |
| elif 'audio/wav' in content_type: |
| ext = '.wav' |
| elif 'application/vnd.ms-excel' in content_type: |
| ext = '.xls' |
| elif 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' in content_type: |
| ext = '.xlsx' |
| elif 'application/zip' in content_type: |
| ext = '.zip' |
| elif 'text/plain' in content_type: |
| ext = '.txt' |
| else: |
| ext = '.bin' |
| |
| |
| with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file: |
| temp_file.write(response.content) |
| file_path = temp_file.name |
| |
| print(f"Downloaded file for task {task_id} to: {file_path}") |
| return file_path |
| |
| except requests.exceptions.RequestException as e: |
| return f"Error downloading file from API: {str(e)}" |
| except Exception as e: |
| return f"An unexpected error occurred: {str(e)}" |
|
|
| @tool |
| def process_image(image_path: str) -> str: |
| """Analyze an image file from a local path - extract any text present and provide visual description. |
| This tool can handle various image formats like PNG, JPEG, GIF, etc. |
| |
| Args: |
| image_path: The absolute path to the local image file. |
| |
| Returns: |
| Extracted text (if any) and visual description of the image. |
| """ |
| try: |
| |
| import mimetypes |
| mime_type, _ = mimetypes.guess_type(image_path) |
| if mime_type is None: |
| |
| mime_type = "application/octet-stream" |
|
|
| with open(image_path, "rb") as image_file: |
| image_bytes = image_file.read() |
| image_base64 = base64.b64encode(image_bytes).decode("utf-8") |
| |
| |
| text_message = [ |
| HumanMessage( |
| content=[ |
| { |
| "type": "text", |
| "text": ( |
| "Extract all the text from this image. " |
| "Return only the extracted text, no explanations. " |
| "If no text is found, respond with 'No text found'." |
| ), |
| }, |
| { |
| "type": "image_url", |
| "image_url": { |
| "url": f"data:{mime_type};base64,{image_base64}" |
| }, |
| }, |
| ] |
| ) |
| ] |
| |
| text_response = vision_llm.invoke(text_message) |
| extracted_text = text_response.content.strip() |
| |
| |
| description_message = [ |
| HumanMessage( |
| content=[ |
| { |
| "type": "text", |
| "text": ( |
| "Describe what you see in this image in detail. " |
| "Be specific about objects, positions, colors, text, numbers, " |
| "and any other relevant visual information." |
| ), |
| }, |
| { |
| "type": "image_url", |
| "image_url": { |
| "url": f"data:{mime_type};base64,{image_base64}" |
| }, |
| }, |
| ] |
| ) |
| ] |
| |
| description_response = vision_llm.invoke(description_message) |
| description = description_response.content.strip() |
| |
| |
| result = f"TEXT EXTRACTED:\n{extracted_text}\n\nVISUAL DESCRIPTION:\n{description}" |
| |
| return result |
| |
| except FileNotFoundError: |
| return f"Error: Image file not found at {image_path}" |
| except Exception as e: |
| return f"Error processing image: {str(e)}" |
|
|
| @tool |
| def process_pdf(pdf_path: str) -> str: |
| """Extracts all text content from a PDF file. |
| Args: |
| pdf_path: The absolute path to the local PDF file. |
| Returns: |
| A string containing all extracted text from the PDF, or an error message. |
| """ |
| try: |
| reader = pypdf.PdfReader(pdf_path) |
| text = "" |
| for page in reader.pages: |
| text += page.extract_text() + "\n" |
| return text if text else "No text found in PDF." |
| except FileNotFoundError: |
| return f"Error: PDF file not found at {pdf_path}" |
| except Exception as e: |
| return f"Error processing PDF: {str(e)}" |
|
|
| @tool |
| def process_csv(csv_path: str, operation: str = "summary", params: dict = None) -> str: |
| """Processes a CSV file based on the specified operation. |
| Args: |
| csv_path: The absolute path to the local CSV file. |
| operation: The operation to perform. Supported operations: |
| "summary": Returns a summary of the CSV (head, columns, dtypes, shape). |
| "get_column": Returns the content of a specific column. Requires 'column_name' in params. |
| "filter": Filters rows based on a condition. Requires 'column', 'operator', 'value' in params. |
| Supported operators: "==", "!=", ">", "<", ">=", "<=". |
| "aggregate": Performs aggregation on a column. Requires 'agg_column', 'agg_function' in params. |
| Optional: 'group_by_column'. Supported functions: "sum", "mean", "count", "min", "max". |
| "describe": Returns descriptive statistics for numerical columns. |
| params: A dictionary of parameters for the chosen operation. |
| Returns: |
| A string containing the result of the operation, or an error message. |
| """ |
| if params is None: |
| params = {} |
|
|
| try: |
| df = pandas.read_csv(csv_path) |
|
|
| if operation == "summary": |
| summary = f"Shape: {df.shape}\n" |
| summary += f"Columns:\n{df.columns.tolist()}\n" |
| summary += f"Data Types:\n{df.dtypes}\n" |
| summary += f"First 5 rows:\n{df.head().to_string()}" |
| return summary |
|
|
| elif operation == "get_column": |
| column_name = params.get("column_name") |
| if column_name not in df.columns: |
| return f"Error: Column '{column_name}' not found." |
| return df[column_name].to_string() |
|
|
| elif operation == "filter": |
| column = params.get("column") |
| operator = params.get("operator") |
| value = params.get("value") |
|
|
| if not all([column, operator, value is not None]): |
| return "Error: 'column', 'operator', and 'value' are required for filter operation." |
| if column not in df.columns: |
| return f"Error: Column '{column}' not found." |
|
|
| if operator == "==": |
| filtered_df = df[df[column] == value] |
| elif operator == "!=": |
| filtered_df = df[df[column] != value] |
| elif operator == ">": |
| filtered_df = df[df[column] > value] |
| elif operator == "<": |
| filtered_df = df[df[column] < value] |
| elif operator == ">=": |
| filtered_df = df[df[column] >= value] |
| elif operator == "<=": |
| filtered_df = df[df[column] <= value] |
| else: |
| return f"Error: Unsupported operator '{operator}'." |
| return filtered_df.to_string() |
|
|
| elif operation == "aggregate": |
| agg_column = params.get("agg_column") |
| agg_function = params.get("agg_function") |
| group_by_column = params.get("group_by_column") |
|
|
| if not all([agg_column, agg_function]): |
| return "Error: 'agg_column' and 'agg_function' are required for aggregate operation." |
| if agg_column not in df.columns: |
| return f"Error: Column '{agg_column}' not found." |
| if group_by_column and group_by_column not in df.columns: |
| return f"Error: Group by column '{group_by_column}' not found." |
|
|
| if agg_function not in ["sum", "mean", "count", "min", "max"]: |
| return f"Error: Unsupported aggregation function '{agg_function}'." |
|
|
| if group_by_column: |
| result = df.groupby(group_by_column)[agg_column].agg(agg_function) |
| else: |
| result = df[agg_column].agg(agg_function) |
| return str(result) |
|
|
| elif operation == "describe": |
| return df.describe().to_string() |
|
|
| else: |
| return f"Error: Unsupported operation '{operation}'." |
|
|
| except FileNotFoundError: |
| return f"Error: CSV file not found at {csv_path}" |
| except Exception as e: |
| return f"Error processing CSV: {str(e)}" |
|
|
| @tool |
| def process_code_file(code_file_path: str) -> str: |
| """Reads and executes a code file, returning its output along with the full code. |
| Args: |
| code_file_path: The absolute path to the local code file. |
| Returns: |
| A string containing the full code and the output of the executed code, or an error message. |
| """ |
| try: |
| with open(code_file_path, "r") as f: |
| code_content = f.read() |
|
|
| if code_file_path.endswith(".py"): |
| execution_output = execute_python(code_content) |
| return f"--- FULL CODE ---\n{code_content}--- EXECUTION OUTPUT ---\n{execution_output}" |
| else: |
| return f"Error: Only Python (.py) files are supported for execution. Found: {code_file_path}" |
|
|
| except FileNotFoundError: |
| return f"Error: Code file not found at {code_file_path}" |
| except Exception as e: |
| return f"Error processing code file: {str(e)}" |
|
|
| @tool |
| def process_excel(excel_path: str, operation: str = "summary", params: dict = None) -> str: |
| """Processes an Excel file based on the specified operation. |
| Args: |
| excel_path: The absolute path to the local Excel file. |
| operation: The operation to perform. Supported operations: |
| "summary": Returns a summary of the Excel file (sheet names, columns, etc.). |
| "get_sheet": Returns the content of a specific sheet. Requires 'sheet_name' in params. |
| |
| Returns: |
| A string containing the result of the operation, or an error message. |
| """ |
| if params is None: |
| params = {} |
|
|
| try: |
| xls = pandas.ExcelFile(excel_path) |
|
|
| if operation == "summary": |
| sheet_names = xls.sheet_names |
| summary = f"Sheets: {sheet_names}\n" |
| for sheet in sheet_names: |
| df = pandas.read_excel(xls, sheet_name=sheet) |
| summary += f"\n--- Sheet: {sheet} ---\n" |
| summary += f"Shape: {df.shape}\n" |
| summary += f"Columns: {df.columns.tolist()}\n" |
| summary += f"First 5 rows:\n{df.head().to_string()}\n" |
| return summary |
|
|
| elif operation == "get_sheet": |
| sheet_name = params.get("sheet_name") |
| if sheet_name not in xls.sheet_names: |
| return f"Error: Sheet '{sheet_name}' not found." |
| df = pandas.read_excel(xls, sheet_name=sheet_name) |
| return df.to_string() |
|
|
| else: |
| return f"Error: Unsupported operation '{operation}'." |
|
|
| except FileNotFoundError: |
| return f"Error: Excel file not found at {excel_path}" |
| except Exception as e: |
| return f"Error processing Excel file: {str(e)}" |
|
|
| @tool |
| def process_archive(archive_path: str, operation: str = "list", extract_to: str = None) -> str: |
| """Processes a .zip archive file. |
| |
| Args: |
| archive_path: The absolute path to the local .zip file. |
| operation: The operation to perform. Supported operations: |
| "list": Lists the contents of the archive. |
| "extract": Extracts the entire archive. Requires 'extract_to' parameter. |
| extract_to: Optional. The directory to extract the files to. |
| If not provided, it will create a directory with the same name as the archive. |
| |
| Returns: |
| A string containing the result of the operation, or an error message. |
| """ |
| try: |
| if not zipfile.is_zipfile(archive_path): |
| return f"Error: File at {archive_path} is not a valid .zip file." |
| |
| with zipfile.ZipFile(archive_path, 'r') as zip_ref: |
| if operation == "list": |
| file_list = zip_ref.namelist() |
| return f"Files in archive: {file_list}" |
| |
| elif operation == "extract": |
| if extract_to is None: |
| |
| extract_to, _ = os.path.splitext(archive_path) |
| |
| os.makedirs(extract_to, exist_ok=True) |
| zip_ref.extractall(extract_to) |
| return f"Archive extracted successfully to: {extract_to}" |
| |
| else: |
| return f"Error: Unsupported operation '{operation}'." |
| |
| except FileNotFoundError: |
| return f"Error: Archive file not found at {archive_path}" |
| except Exception as e: |
| return f"Error processing archive: {str(e)}" |
|
|
| @tool |
| def read_text_file(file_path: str) -> str: |
| """Reads the entire content of a text file. |
| Args: |
| file_path: The absolute path to the local text file (.txt, .md, .json, etc.). |
| Returns: |
| A string containing the full content of the file, or an error message. |
| """ |
| try: |
| with open(file_path, "r", encoding='utf-8') as f: |
| content = f.read() |
| return content |
| except FileNotFoundError: |
| return f"Error: File not found at {file_path}" |
| except Exception as e: |
| return f"Error reading text file: {str(e)}" |
| |
|
|
| |
| _whisper_model = None |
|
|
| @tool |
| def process_audio(audio_path: str) -> str: |
| """Analyzes an audio file using local Whisper model for transcription. |
| |
| Args: |
| audio_path: The absolute path to the local audio file |
| |
| Returns: |
| A transcription and basic analysis of the audio content |
| """ |
| global _whisper_model |
| |
| try: |
| |
| if not os.path.exists(audio_path): |
| return f"Error: Audio file not found at {audio_path}" |
| |
| |
| file_size = os.path.getsize(audio_path) |
| if file_size > 100 * 1024 * 1024: |
| return f"Error: Audio file too large ({file_size / (1024*1024):.1f}MB)" |
| |
| |
| if _whisper_model is None: |
| try: |
| _whisper_model = whisper.load_model("base") |
| print("Whisper model loaded") |
| except Exception as e: |
| return f"Error loading Whisper model: {str(e)}\nTry: pip install openai-whisper" |
| |
| |
| result = _whisper_model.transcribe(audio_path) |
| transcription = result["text"].strip() |
| detected_language = result.get("language", "unknown") |
| |
| |
| word_count = len(transcription.split()) |
| |
| return f"""AUDIO TRANSCRIPTION: |
| File: {Path(audio_path).name} |
| Size: {file_size / (1024*1024):.1f}MB |
| Language: {detected_language} |
| Words: {word_count} |
| TRANSCRIPT: |
| {transcription} |
| """ |
| |
| except Exception as e: |
| return f"Error processing audio: {str(e)}" |
| |
| @tool |
| def process_youtube_video(url: str, question: str) -> str: |
| """ |
| REQUIRED for YouTube video analysis. Downloads and analyzes YouTube videos |
| to answer questions about visual content, count objects, identify details. |
| |
| Use this tool WHENEVER you see a YouTube URL in the question. |
| This is the ONLY way to analyze YouTube video content accurately. |
| |
| Args: |
| url: YouTube video URL (any youtube.com or youtu.be link) |
| question: The specific question about the video content |
| |
| Returns: |
| Detailed analysis of the actual video content |
| """ |
| try: |
| |
| import google.generativeai as genai |
| genai.configure(api_key=os.environ.get("GOOGLE_API_KEY")) |
| |
| |
| with tempfile.TemporaryDirectory() as temp_dir: |
| temp_path = Path(temp_dir) |
| |
| |
| ydl_opts = { |
| 'format': 'best[height<=720]', |
| 'outtmpl': str(temp_path / '%(title)s.%(ext)s'), |
| 'quiet': True, |
| 'no_warnings': True, |
| } |
| |
| print(f"Downloading video from: {url}") |
| |
| |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
| info = ydl.extract_info(url, download=True) |
| video_title = info.get('title', 'Unknown') |
| duration = info.get('duration', 0) |
| |
| |
| video_files = list(temp_path.glob('*')) |
| if not video_files: |
| return "Error: Failed to download video file" |
| |
| video_file = video_files[0] |
| file_size = video_file.stat().st_size / (1024 * 1024) |
| |
| print(f"Video downloaded: {video_title} ({duration}s, {file_size:.1f}MB)") |
| |
| |
| if file_size > 100: |
| return f"Error: Video too large ({file_size:.1f}MB). Maximum size is 100MB." |
| |
| |
| try: |
| |
| print("Uploading video to Gemini...") |
| video_file_obj = genai.upload_file(str(video_file)) |
| |
| |
| while video_file_obj.state.name == "PROCESSING": |
| print("Processing video...") |
| time.sleep(2) |
| video_file_obj = genai.get_file(video_file_obj.name) |
| |
| if video_file_obj.state.name == "FAILED": |
| return "Error: Video processing failed" |
| |
| |
| analysis_prompt = f"""Analyze this video carefully to answer the following question: {question} |
| Please examine the video content thoroughly and provide a detailed, accurate answer. Pay attention to visual details, timing, and any relevant information that helps answer the question. |
| Video title: {video_title} |
| Duration: {duration} seconds |
| Question: {question}""" |
| |
| |
| model = genai.GenerativeModel('gemini-2.0-flash') |
| response = model.generate_content([analysis_prompt, video_file_obj]) |
| |
| |
| try: |
| genai.delete_file(video_file_obj.name) |
| except: |
| pass |
| |
| return f"""VIDEO ANALYSIS: |
| Title: {video_title} |
| URL: {url} |
| Duration: {duration} seconds |
| Size: {file_size:.1f}MB |
| QUESTION: {question} |
| ANSWER: {response.text}""" |
|
|
| except Exception as processing_error: |
| return f"Error processing video with Gemini: {str(processing_error)}" |
| |
| except ImportError: |
| return "Error: google-generativeai library not installed. Run: pip install google-generativeai" |
| except Exception as e: |
| return f"Error downloading or processing video: {str(e)}" |