# -*- coding: utf-8 -*-
"""
GAIA Benchmark Agent using LangChain, Groq, Tavily, and various tools.
"""

# --- Core Libraries ---
import os
import sys
import subprocess
import time
import importlib
from pathlib import Path
from typing import List, Optional, Dict, Any

# --- Environment & Configuration ---
from dotenv import load_dotenv

# --- LangChain Imports ---
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import BaseTool, tool
# Using Pydantic v2 is recommended if your environment supports it fully
# from pydantic import BaseModel, Field # Pydantic v2
from pydantic import BaseModel, Field  # Pydantic v1 compatibility shim
from langchain.memory import ConversationBufferWindowMemory
from langchain.agents import AgentExecutor, create_openai_tools_agent  # Keep OpenAI Tools Agent

# --- Tool Specific Imports ---
# Search
from langchain_community.tools.tavily_search import TavilySearchResults
# Web Scraping
import requests
from bs4 import BeautifulSoup
# LLM
from langchain_groq import ChatGroq

# Audio/Video Transcription (Optional)
try:
    import openai
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False

# Excel Reading (Optional)
try:
    import pandas as pd
    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False

# YouTube Processing (Optional)
# PytubeError is imported from pytube.exceptions: importing it from the package
# root fails on pytube releases that do not re-export it, which would silently
# disable the YouTube tool even when pytube is installed.
try:
    from pytube import YouTube
    from pytube.exceptions import PytubeError
    PYTUBE_AVAILABLE = True
except ImportError:
    PYTUBE_AVAILABLE = False

# ==============================================================================
# 1. CONFIGURATION
# ==============================================================================
# Pull settings from a local .env file (if any) into the process environment.
load_dotenv()

# Every file-based tool is confined to this directory; it is created on import.
AGENT_WORKSPACE = Path("./gaia_agent_workspace")
AGENT_WORKSPACE.mkdir(parents=True, exist_ok=True)

MAX_ITERATIONS = 15       # Upper bound on agent tool-calling steps per task.
MEMORY_WINDOW_SIZE = 10   # Number of past exchanges kept in conversation memory.

GROQ_API_KEY = os.getenv("GROQ_API_KEY")
GROQ_MODEL_NAME = os.getenv("GROQ_MODEL_NAME", "meta-llama/llama-4-maverick-17b-128e-instruct")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
TAVILY_MAX_RESULTS = 3
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
WHISPER_MODEL = "whisper-1"

# The Groq key is mandatory: without it no LLM can be created, so abort early.
if not GROQ_API_KEY:
    print("ERROR: GROQ_API_KEY not set.")
    sys.exit(1)
if not TAVILY_API_KEY:
    print("Warning: TAVILY_API_KEY not set.")

# The OpenAI client is optional; it stays None when the package or key is missing,
# which in turn disables the transcription tools further down.
openai_client = None
if OPENAI_AVAILABLE and OPENAI_API_KEY:
    try:
        openai_client = openai.OpenAI(api_key=OPENAI_API_KEY)
        print("OpenAI client initialized.")
    except Exception as e:
        print(f"Warning: OpenAI client init failed: {e}")
        openai_client = None

if not PANDAS_AVAILABLE:
    print("Info: 'pandas' not installed. Excel tool disabled.")
if not PYTUBE_AVAILABLE:
    print("Info: 'pytube' not installed. YouTube tool disabled.")

# ==============================================================================
# 2. TOOL DEFINITIONS
# ==============================================================================

# --- Tool Input Schemas (Pydantic Models) ---
# NOTE: the Field descriptions below are sent to the LLM as argument help text;
# they are runtime strings, not documentation.

# Arguments for the `write_file` tool.
class FileWriteArgs(BaseModel):
    relative_path: str = Field(description="Relative path within the agent's workspace where the file should be written.")
    content: str = Field(description="The text content to write into the file.")


# Arguments for the `read_file` tool.
class FileReadArgs(BaseModel):
    relative_path: str = Field(description="Relative path within the agent's workspace of the file to read.")


# Arguments for the `list_directory` tool.
class ListDirectoryArgs(BaseModel):
    relative_path: str = Field(default=".", description="Relative path within the agent's workspace to list contents of. Use '.' for the root.")


# Arguments for the `run_python_code` tool.
class RunPythonCodeArgs(BaseModel):
    code: str = Field(description="The Python code to execute. Use 'print()' to output results. Code runs in isolation.")


# Arguments for the `scrape_webpage` tool.
class WebScrapeArgs(BaseModel):
    url: str = Field(description="The URL of the webpage to scrape.")
    query: Optional[str] = Field(default=None, description="Optional specific question to answer from the page content.")


# Arguments for the `read_excel_file` tool.
class ReadExcelArgs(BaseModel):
    relative_path: str = Field(description="Relative path within the agent's workspace of the Excel file (.xlsx or .xls).")
    sheet_name: Optional[str] = Field(default=None, description="Optional name of the specific sheet to read. Reads the first sheet if not specified.")
    max_rows_preview: int = Field(default=20, description="Maximum number of rows to include in the text preview.")


# Arguments for the `transcribe_audio_file` tool.
class TranscribeAudioArgs(BaseModel):
    relative_path: str = Field(description="Relative path within the agent's workspace of the audio file (e.g., .mp3, .wav, .m4a). Max 25MB.")


# Arguments for the `transcribe_youtube_video` tool.
class TranscribeYouTubeArgs(BaseModel):
    youtube_url: str = Field(description="The URL of the YouTube video to transcribe. Audio will be downloaded temporarily.")


# --- Helper Functions ---
def _resolve_path(relative_path: str) -> Optional[Path]:
    """Resolve a workspace-relative path and verify it stays inside the workspace.

    Args:
        relative_path: Path supplied by the agent, interpreted relative to
            ``AGENT_WORKSPACE``.

    Returns:
        The fully resolved absolute path when it is the workspace root or lies
        beneath it; ``None`` when the input is absolute, contains a ``..``
        component, resolves outside the workspace, or cannot be resolved.
        The reason for a rejection is printed.
    """
    try:
        normalized_relative_path = os.path.normpath(relative_path)
        # Prevent absolute paths or paths trying to escape the workspace
        if os.path.isabs(normalized_relative_path) or ".." in normalized_relative_path.split(os.sep):
            print(f"Error: Invalid path characters or attempt to escape workspace in '{relative_path}'.")
            return None

        workspace_root = AGENT_WORKSPACE.resolve()
        # resolve() follows symlinks, so a link inside the workspace that points
        # elsewhere is caught by the containment check below.
        full_path = (workspace_root / normalized_relative_path).resolve()
        if full_path == workspace_root or workspace_root in full_path.parents:
            return full_path

        # Deliberately no string-prefix fallback: "<workspace>_other/file" starts
        # with the workspace path string but is NOT inside the workspace.
        print(f"Error: Path '{relative_path}' resolved to '{full_path}' which is outside the allowed workspace '{workspace_root}'.")
        return None
    except Exception as e:
        print(f"Error resolving path '{relative_path}': {e}")
        return None


def _transcribe_audio(file_path: Path, file_description: str) -> str:
    """Transcribe an audio file with OpenAI Whisper and return a text report.

    Args:
        file_path: Location of the audio file on disk.
        file_description: Human-readable label used in messages (a relative
            path or a video title).

    Returns:
        The transcription (cut to 10000 characters with a truncation marker
        when longer), or a string starting with "Error"/"OpenAI API Error"
        describing why transcription was not possible. Never raises.
    """
    if not openai_client:
        return "Error: OpenAI client not available for transcription."

    if not file_path.is_file():
        # Report the path relative to the workspace when possible. Both sides are
        # resolved first: comparing an absolute path against the relative
        # AGENT_WORKSPACE would always raise ValueError.
        try:
            rel_path_str = file_path.resolve().relative_to(AGENT_WORKSPACE.resolve())
        except (ValueError, OSError):
            rel_path_str = file_path
        return f"Error: Audio file not found at '{rel_path_str}'"

    try:
        file_size_mb = file_path.stat().st_size / (1024 * 1024)
        if file_size_mb > 25:
            return f"Error: Audio file '{file_description}' is too large ({file_size_mb:.2f} MB). Max 25 MB."

        print(f"Transcribing audio: {file_description}...")
        with open(file_path, "rb") as audio_file_handle:
            transcript = openai_client.audio.transcriptions.create(
                model=WHISPER_MODEL,
                file=audio_file_handle,
                response_format="text",
            )
        print("Transcription complete.")

        if not isinstance(transcript, str):
            return f"Transcription of '{file_description}' succeeded, but format was unexpected: {type(transcript)}"

        max_len = 10000
        if len(transcript) > max_len:
            transcript = transcript[:max_len] + "\n... [Transcription truncated]"
        return f"Transcription of '{file_description}':\n{transcript}"
    except openai.APIError as e:
        return f"OpenAI API Error during transcription of '{file_description}': {e}"
    except Exception as e:
        return f"Error transcribing '{file_description}': {e}"


# --- Tool Implementations ---
# NOTE: each tool's docstring is what the LLM sees as the tool description, so
# implementation notes live in `#` comments instead of the docstrings.

# Returns a status string; never raises (errors are reported in the string).
@tool("write_file", args_schema=FileWriteArgs)
def write_file(relative_path: str, content: str) -> str:
    """Writes text content to a file within the agent's workspace. Creates parent directories if needed."""
    full_path = _resolve_path(relative_path)
    if not full_path:
        return f"Error: Invalid or disallowed path '{relative_path}'."
    try:
        full_path.parent.mkdir(parents=True, exist_ok=True)
        # Context manager guarantees the handle is flushed and closed even if
        # write() fails part-way.
        with open(full_path, 'w', encoding='utf-8') as f:
            f.write(content)
        return f"Successfully wrote to file: {relative_path}"
    except Exception as e:
        return f"Error writing file '{relative_path}': {e}"


# Returns at most the first 10000 characters, with a marker when more exist.
@tool("read_file", args_schema=FileReadArgs)
def read_file(relative_path: str) -> str:
    """Reads the text content of a file from the agent's workspace. Limited read size."""
    full_path = _resolve_path(relative_path)
    if not full_path:
        return f"Error: Invalid or disallowed path '{relative_path}'."
    if not full_path.is_file():
        return f"Error: File not found at '{relative_path}'"
    try:
        with open(full_path, 'r', encoding='utf-8') as f:
            content = f.read(10000)
            # Probe one more character to distinguish "exactly at the limit"
            # from "longer than the limit".
            if f.read(1):
                content += "\n... [File truncated due to length]"
        return content
    except Exception as e:
        return f"Error reading file '{relative_path}': {e}"


# Returns a sorted listing; directories are marked with a trailing '/'.
@tool("list_directory", args_schema=ListDirectoryArgs)
def list_directory(relative_path: str = ".") -> str:
    """Lists the contents (files and directories) of a specified directory within the agent's workspace."""
    target_path = _resolve_path(relative_path)
    if not target_path:
        return f"Error: Invalid or disallowed path '{relative_path}'."
    if not target_path.is_dir():
        return f"Error: '{relative_path}' is not a valid directory."
    try:
        items = sorted(entry.name + ('/' if entry.is_dir() else '') for entry in target_path.iterdir())
        if not items:
            return f"Directory '{relative_path}' is empty."
        return f"Contents of '{relative_path}':\n" + "\n".join(items)
    except Exception as e:
        return f"Error listing directory '{relative_path}': {e}"


# Runs the snippet with the current interpreter in a child process whose working
# directory is the workspace; stdout is capped at 2000 chars, stderr at 1000.
# SECURITY: this executes arbitrary model-generated code with the privileges of
# this process. It is not sandboxed beyond the 30 second timeout.
@tool("run_python_code", args_schema=RunPythonCodeArgs)
def run_python_code(code: str) -> str:
    """Executes Python code in a subprocess and returns the stdout/stderr. Use print() for output. WARNING: Executes arbitrary code."""
    print(f"Executing Python code:\n```python\n{code}\n```")
    try:
        process = subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True,
            text=True,
            timeout=30,
            cwd=AGENT_WORKSPACE,
            check=False,
        )
        output, error = process.stdout, process.stderr
        succeeded = process.returncode == 0

        if succeeded:
            result = "Execution successful.\n"
        else:
            result = f"Execution failed (Return Code: {process.returncode}).\n"

        if output:
            max_output = 2000
            if len(output) > max_output:
                output = output[:max_output] + "\n... [Output truncated]"
            result += f"Output:\n{output}\n"
        if error:
            max_error = 1000
            if len(error) > max_error:
                error = error[:max_error] + "\n... [Error truncated]"
            result += f"Error Output:\n{error}\n"
        if not output and not error:
            if succeeded:
                result += "No output produced."
            else:
                result += "No output or error message produced despite non-zero exit code."
        return result.strip()
    except subprocess.TimeoutExpired:
        return "Error: Code execution timed out after 30 seconds."
    except Exception as e:
        return f"Error executing Python code: {e}"


# Fetches the page, strips non-content tags, and returns up to 10000 characters
# of visible text. Only 'text/html' responses are accepted.
@tool("scrape_webpage", args_schema=WebScrapeArgs)
def scrape_webpage(url: str, query: Optional[str] = None) -> str:
    """Scrapes text content from a given URL using BeautifulSoup. If a query is provided, returns content for the agent to answer it."""
    print(f"Attempting to scrape URL: {url}")
    try:
        space_id = os.getenv("SPACE_ID", "YOUR_SPACE_ID")
        headers = {'User-Agent': f'Mozilla/5.0 (compatible; GAIA-Agent/1.0; +https://huggingface.co/spaces/{space_id})'}
        response = requests.get(url, headers=headers, timeout=20)
        response.raise_for_status()

        content_type = response.headers.get('content-type', '').lower()
        if 'text/html' not in content_type:
            return f"Error: Content type of URL {url} is '{content_type}', not HTML. Cannot scrape."

        # Parse the raw bytes rather than response.text: when the server sends no
        # charset, requests falls back to ISO-8859-1 for text/*, which garbles
        # UTF-8 pages. BeautifulSoup sniffs <meta> tags / byte patterns instead;
        # a charset that the server did declare is still honoured.
        declared_encoding = response.encoding if 'charset' in content_type else None
        soup = BeautifulSoup(response.content, 'html.parser', from_encoding=declared_encoding)

        # Drop boilerplate / non-visible elements before extracting text.
        for tag in soup(["script", "style", "nav", "footer", "aside", "header", "form", "button", "iframe", "noscript"]):
            tag.decompose()

        text_content = soup.get_text(separator='\n', strip=True)
        text_content = '\n'.join(line for line in text_content.splitlines() if line.strip())
        if not text_content:
            return f"Could not extract meaningful text content from {url} after cleaning."

        max_chars = 10000
        if len(text_content) > max_chars:
            text_content = text_content[:max_chars] + "\n... [Content truncated]"
        print(f"Scraping successful for {url}. Content length (approx): {len(text_content)}")

        if query:
            return f"Use the following content from {url} to answer the query '{query}':\n\n{text_content}"
        return f"Content scraped from {url}:\n\n{text_content}"
    except requests.exceptions.Timeout:
        return f"Error: Timeout occurred while trying to fetch URL {url}"
    except requests.exceptions.RequestException as e:
        return f"Error fetching or reading URL {url}: {e}"
    except Exception as e:
        return f"Error scraping URL {url}: {e}"


# The Excel tool only exists when pandas could be imported.
if PANDAS_AVAILABLE:
    # Returns a text preview (at most 5000 characters) of one sheet. Falls back
    # to the first sheet when `sheet_name` is missing or unknown.
    @tool("read_excel_file", args_schema=ReadExcelArgs)
    def read_excel_file(relative_path: str, sheet_name: Optional[str] = None, max_rows_preview: int = 20) -> str:
        """Reads data from an Excel file (.xlsx or .xls) within the workspace and returns a text preview."""
        full_path = _resolve_path(relative_path)
        if not full_path:
            return f"Error: Invalid or disallowed path '{relative_path}'."
        if not full_path.is_file():
            return f"Error: Excel file not found at '{relative_path}'"

        print(f"Reading Excel file: {relative_path}")
        try:
            # Open the workbook once and close it deterministically; the sheet is
            # parsed from the same handle instead of re-opening the file.
            with pd.ExcelFile(full_path) as excel_file:
                if not excel_file.sheet_names:
                    return f"Error: Excel file '{relative_path}' contains no sheets."

                if sheet_name and sheet_name in excel_file.sheet_names:
                    sheet_to_read = sheet_name
                else:
                    sheet_to_read = excel_file.sheet_names[0]
                    if sheet_name:
                        print(f"Warning: Sheet '{sheet_name}' not found, reading first sheet '{sheet_to_read}' instead.")

                print(f"Reading sheet '{sheet_to_read}' from {relative_path}")
                df = pd.read_excel(excel_file, sheet_name=sheet_to_read)

            if df.empty:
                return f"Sheet '{sheet_to_read}' in '{relative_path}' is empty."

            output = f"Preview of sheet '{sheet_to_read}' from '{relative_path}' ({df.shape[0]} rows, {df.shape[1]} columns):\n"
            output += df.to_string(max_rows=max_rows_preview, max_cols=15, line_width=120)

            max_output_len = 5000
            if len(output) > max_output_len:
                output = output[:max_output_len] + "\n... [Output truncated due to length]"
            return output
        except Exception as e:
            return f"Error reading Excel file '{relative_path}': {e}"


# Transcription tools only exist when an OpenAI client was created.
if OPENAI_AVAILABLE and openai_client:
    # Thin wrapper: validates the workspace path, then delegates to
    # _transcribe_audio, which performs the size check and the API call.
    @tool("transcribe_audio_file", args_schema=TranscribeAudioArgs)
    def transcribe_audio_file(relative_path: str) -> str:
        """Transcribes audio content from a file in the workspace using OpenAI Whisper (max 25MB)."""
        full_path = _resolve_path(relative_path)
        if not full_path:
            return f"Error: Invalid or disallowed path '{relative_path}'."
        return _transcribe_audio(full_path, relative_path)


if PYTUBE_AVAILABLE and OPENAI_AVAILABLE and openai_client:
    # Downloads the audio track into the workspace under a temporary name,
    # transcribes it, and always removes the temporary file afterwards.
    @tool("transcribe_youtube_video", args_schema=TranscribeYouTubeArgs)
    def transcribe_youtube_video(youtube_url: str) -> str:
        """Downloads audio from a YouTube URL, transcribes it using OpenAI Whisper, and returns the text."""
        temp_audio_path = None
        try:
            print(f"Processing YouTube URL: {youtube_url}")
            yt = YouTube(youtube_url, use_oauth=False, allow_oauth_cache=False)

            print("Fetching available streams...")
            # Preference order: best webm audio, then best mp4 audio, then
            # whatever pytube considers the default audio-only stream.
            audio_stream = (
                yt.streams.filter(only_audio=True, subtype='webm').order_by('abr').desc().first()
                or yt.streams.filter(only_audio=True, subtype='mp4').order_by('abr').desc().first()
                or yt.streams.get_audio_only()
            )
            if not audio_stream:
                return f"Error: No suitable audio stream found for YouTube video: {youtube_url}"
            print(f"Selected audio stream: Itag {audio_stream.itag}, ABR {audio_stream.abr}")

            # The video id only serves to build a unique temp filename, so any
            # failure to obtain it falls back to a timestamp. `Exception` rather
            # than a bare except so Ctrl-C / SystemExit are not swallowed.
            try:
                video_id = yt.video_id
            except Exception:
                video_id = f"vid_{int(time.time())}"

            temp_filename = f"temp_youtube_{video_id}.{audio_stream.subtype or 'mp4'}"
            temp_audio_path = AGENT_WORKSPACE / temp_filename
            print(f"Downloading audio to: {temp_audio_path}...")
            # pytube documents output_path as a string, so pass one explicitly.
            audio_stream.download(output_path=str(AGENT_WORKSPACE), filename=temp_filename)
            print("Download complete.")

            return _transcribe_audio(temp_audio_path, f"YouTube video '{yt.title}'")
        except PytubeError as e:
            return f"Error processing YouTube video {youtube_url} (PytubeError): {e}"
        except Exception as e:
            return f"Unexpected error during YouTube transcription {youtube_url}: {e}"
        finally:
            # Best-effort cleanup: a failure to delete must not mask the result.
            if temp_audio_path and temp_audio_path.exists():
                try:
                    temp_audio_path.unlink()
                    print(f"Cleaned up temporary file: {temp_audio_path}")
                except Exception as e:
                    print(f"Warning: Failed to delete temp file {temp_audio_path}: {e}")


# ==============================================================================
# 3. AGENT SETUP
# ==============================================================================

# --- Initialize LLM ---
try:
    llm = ChatGroq(temperature=0, model_name=GROQ_MODEL_NAME, groq_api_key=GROQ_API_KEY)
    print(f"Using Groq LLM: {GROQ_MODEL_NAME}")
except Exception as e:
    print(f"FATAL: Error initializing Groq LLM: {e}")
    sys.exit(1)

# --- Assemble Available Tools ---
# Optional tools are appended only when their dependency/credential is present,
# mirroring the conditions under which the tool functions were defined.
available_tools = []
if TAVILY_API_KEY:
    try:
        # NOTE(review): confirm that this TavilySearchResults version accepts an
        # `api_key` keyword; some releases only read TAVILY_API_KEY from the
        # environment or expect `tavily_api_key`. A rejection is caught below and
        # merely disables the tool.
        available_tools.append(TavilySearchResults(max_results=TAVILY_MAX_RESULTS, api_key=TAVILY_API_KEY))
    except Exception as e:
        print(f"Warning: Failed to initialize Tavily Search tool: {e}. Tool disabled.")
else:
    print("Warning: Tavily Search tool disabled (API key missing).")

available_tools.extend([write_file, read_file, list_directory, run_python_code, scrape_webpage])
if PANDAS_AVAILABLE:
    available_tools.append(read_excel_file)
if OPENAI_AVAILABLE and openai_client:
    available_tools.append(transcribe_audio_file)
if PYTUBE_AVAILABLE and OPENAI_AVAILABLE and openai_client:
    available_tools.append(transcribe_youtube_video)

# Loop variable is not called `tool` to avoid confusion with the imported decorator.
print(f"Agent initialized with tools: {[t.name for t in available_tools]}")

# --- Define System Prompt ---
# Contains {tools} and {agent_workspace} placeholders.
SYSTEM_PROMPT_TEMPLATE = """You are a highly capable AI assistant designed to solve complex problems step-by-step, mimicking human-like reasoning and actions. Your goal is to accurately answer the user's request based on the GAIA benchmark philosophy.

**Workspace:**
You have access to a local workspace directory: '{agent_workspace}'. You can ONLY interact with files inside this directory using the provided tools. Always use relative paths for file operations.

**Available Tools:**
You have access to the following tools:
{tools}

**Reasoning Process:**
1. **Understand:** Analyze the request. Identify objectives, constraints, and required information (text, web search, file content, Excel data, audio/video transcription, calculations).
2. **Plan:** Break down the problem into logical steps. Choose the *most appropriate* tool for each step.
3. **Execute:** Perform actions step-by-step using ONE tool at a time. Provide valid arguments for the chosen tool.
4. **Observe:** Analyze the results (observations) from each tool execution. Note errors or unexpected output.
5. **Reflect & Adjust:** If a step fails or results are insufficient, analyze the error, refine your plan, and try a different approach or tool. If a file isn't found, consider using `list_directory`. If web search results aren't specific enough, refine your query. If scraping fails, the site might be dynamic or blocking; note this limitation.
6. **Synthesize:** Once all necessary information is gathered and actions performed, combine the findings to formulate the final answer.
7. **Final Answer:** Provide ONLY the final answer in the precise format requested by the task. Do not include explanations, commentary, or conversational text unless explicitly asked for. If the task requires creating a file, use `write_file` and state the relative path if needed as the final answer.

**Important Guidelines:**
* Think step-by-step. Be methodical.
* Use file/audio/excel tools ONLY for the designated workspace: {agent_workspace}. Use relative paths.
* Check file existence with `list_directory` before attempting to read if unsure.
* Use `read_excel_file` for `.xlsx` or `.xls` files.
* Use `transcribe_audio_file` for local audio files (e.g., .mp3, .wav). Max 25MB.
* Use `transcribe_youtube_video` for YouTube URLs. Max 25MB audio download.
* Use `run_python_code` for calculations or data manipulation not covered by other tools. Use `print()` for output.
* Use `tavily_search_results_json` for web searches. Use `scrape_webpage` to get content from a specific URL found in search or given in the prompt.
* Adhere strictly to the requested final answer format.
"""

# --- Create Prompt Template ---
# Pre-format the system prompt string fully before creating the template
try:
    # Format the tool descriptions manually using the render_text_description utility
    from langchain.tools.render import render_text_description

    tool_descriptions = render_text_description(available_tools)

    # Format the entire system prompt string
    formatted_system_prompt = SYSTEM_PROMPT_TEMPLATE.format(
        agent_workspace=str(AGENT_WORKSPACE.resolve()),
        tools=tool_descriptions,
    )

    # ChatPromptTemplate treats the system string as a template again. Any brace
    # that survived formatting (from a tool description or the workspace path)
    # would be read as a template variable, so double them to keep them literal.
    escaped_system_prompt = formatted_system_prompt.replace("{", "{{").replace("}", "}}")

    # Create the template from the fully formatted string
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", escaped_system_prompt),  # Use the pre-formatted string
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),  # Still needed by the agent type
        ]
    )
except Exception as e:
    print(f"FATAL: Error creating ChatPromptTemplate: {e}")
    sys.exit(1)

# --- Setup Memory ---
# Window memory feeding the "chat_history" placeholder of the prompt.
memory = ConversationBufferWindowMemory(
    k=MEMORY_WINDOW_SIZE,
    memory_key="chat_history",
    return_messages=True,
)

# --- Create Agent ---
# Using create_openai_tools_agent
try:
    agent = create_openai_tools_agent(llm, available_tools, prompt)
except Exception as e:
    print(f"FATAL: Error creating agent with create_openai_tools_agent: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)

# --- Create Agent Executor ---
try:
    agent_executor = AgentExecutor(
        agent=agent,
        tools=available_tools,
        memory=memory,
        verbose=True,
        max_iterations=MAX_ITERATIONS,
        handle_parsing_errors=True,
    )
except Exception as e:
    print(f"FATAL: Error creating AgentExecutor: {e}")
    sys.exit(1)


# ==============================================================================
# 4. EXECUTION FUNCTION (Exported for app.py)
# ==============================================================================
def run_gaia_task(task_description: str) -> str:
    """Run the GAIA agent on a single task. This is the main entry point.

    Conversation memory is cleared first so tasks do not leak context into
    each other.

    Args:
        task_description: The full task/question text handed to the agent.

    Returns:
        The agent's final output as a string, or a message starting with
        "Error:" / "Agent failed with error:" when the run could not complete.
        Exceptions from the agent are caught, logged with a traceback, and
        reported in the return value rather than raised.
    """
    separator = "=" * 50
    # Only show an ellipsis when the description was actually shortened.
    task_preview = task_description[:150] + ("..." if len(task_description) > 150 else "")
    print(
        "\n" + separator
        + f"\n🚀 Running GAIA Task\n📝 Task: {task_preview}\n📁 Workspace: {AGENT_WORKSPACE.resolve()}"
        + f"\n🛠️ Tools: {[t.name for t in available_tools]}\n"
        + separator + "\n"
    )
    try:
        memory.clear()  # Reset memory for the task
        if 'agent_executor' not in globals() or agent_executor is None:
            return "Error: Agent Executor not initialized."

        result = agent_executor.invoke({"input": task_description})
        final_output = result.get('output', 'Agent finished but produced no output.')
        print("\n" + separator + f"\n✅ Agent Execution Finished\n🏁 Final Output:\n{final_output}\n" + separator + "\n")
        return str(final_output)
    except Exception as e:
        print(f"\n{separator}\n❌ Agent Execution Error during task run\nAn error occurred: {e}\n{separator}\n")
        import traceback
        traceback.print_exc()  # Print full traceback for debugging
        return f"Agent failed with error: {e}"


# ==============================================================================
# 5. EXAMPLE USAGE (Local Testing)
# ==============================================================================
if __name__ == "__main__":
    print("\n" + "*"*30 + " LOCAL TEST RUN " + "*"*30)
    print("--- Setting up example files (if needed) ---")

    # Fixture 1: a small workbook for the Excel example (needs pandas).
    if PANDAS_AVAILABLE:
        try:
            dummy_excel_path = AGENT_WORKSPACE / "sample_data.xlsx"
            if not dummy_excel_path.exists():
                pd.DataFrame({'ID': [1, 2, 3], 'Product': ['Widget', 'Gadget', 'Thingamajig']}).to_excel(dummy_excel_path, index=False)
                print(f"Created dummy Excel: {dummy_excel_path}")
        except Exception as e:
            print(f"Could not create dummy Excel: {e}")

    # Fixture 2: a text file of numbers for the Python-sum example.
    try:
        dummy_text_path = AGENT_WORKSPACE / "numbers.txt"
        if not dummy_text_path.exists():
            with open(dummy_text_path, "w", encoding="utf-8") as f:
                f.write("15\n-3\n42.5\n100\n")
            print(f"Created dummy text file: {dummy_text_path}")
    except Exception as e:
        print(f"Could not create dummy text file: {e}")

    # Fixture 3: audio cannot be generated here, so only print a hint.
    dummy_audio_path = AGENT_WORKSPACE / "sample_audio.mp3"
    if not dummy_audio_path.exists() and OPENAI_AVAILABLE and openai_client:
        print(f"INFO: To test audio transcription, place an MP3 file at: {dummy_audio_path}")
    print("--- Example setup complete ---")

    example_tasks = [
        {
            "id": "local_excel_read",
            "description": "Read the file 'sample_data.xlsx' in the workspace. What is the 'Product' where 'ID' is 2? Final answer should be just the product name.",
        },
        {
            "id": "local_python_sum",
            "description": "Read the numbers from 'numbers.txt' in the workspace (one per line). Calculate their sum using python code. Write the sum into 'sum_result.txt'. Final answer should be the relative path 'sum_result.txt'.",
        },
        {
            "id": "local_search_scrape_write",
            "description": "Search the web for the official website of the Python Software Foundation. Scrape the main title from the homepage of that website. Write the title into 'psf_title.txt'. Final answer is 'psf_title.txt'.",
        },
    ]

    if example_tasks:
        task_to_run = example_tasks[0]  # Change index to test different tasks
        print(f"\n>>> Running local test task: {task_to_run['id']} <<<")
        final_answer = run_gaia_task(task_to_run['description'])
        print(f">>> Local test task {task_to_run['id']} completed. Agent Output: {final_answer} <<<")
    else:
        print("No example tasks defined for local testing.")

    print("\n" + "*"*30 + " LOCAL TEST RUN COMPLETE " + "*"*30)