| import os
|
| import re
|
| from pathlib import Path
|
| from typing import Optional, Union, Dict, List, Any
|
| from enum import Enum
|
| import requests
|
| import tempfile
|
| import ast
|
|
|
| from dotenv import load_dotenv
|
| from langgraph.graph import StateGraph, END
|
| from langchain.tools import Tool as LangTool
|
| from langchain_core.runnables import RunnableLambda
|
| from langchain_google_genai import ChatGoogleGenerativeAI
|
| from pathlib import Path
|
|
|
| from langchain.tools import StructuredTool
|
| from langchain_openai import ChatOpenAI
|
| from langchain_groq import ChatGroq
|
|
|
| import pandas as pd
|
| df = pd.read_csv('C:\\Users\\AviralMittal\\OneDrive\\hf_course\\hf_ai_answers.csv')
|
| print(f'df read.....{len(df)}')
|
|
|
| from tools import (
|
| EnhancedSearchTool,
|
| EnhancedWikipediaTool,
|
| excel_to_markdown,
|
| image_file_info,
|
| audio_file_info,
|
| code_file_read,
|
| extract_youtube_info)
|
|
|
|
|
| load_dotenv()
|
|
|
|
|
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
|
| QUESTIONS_URL = f"{DEFAULT_API_URL}/questions"
|
| SUBMIT_URL = f"{DEFAULT_API_URL}/submit"
|
| FILE_PATH = f"{DEFAULT_API_URL}/files/"
|
|
|
| os.environ["groq_api_key"] = os.environ.get("GROQ_API_KEY")
|
| os.environ["openai_api_key"] = os.environ.get("OPENAI_API_KEY")
|
|
|
|
|
|
|
|
|
|
|
|
|
| llm = ChatGoogleGenerativeAI(
|
| model=os.getenv("GEMINI_MODEL", "gemini-pro"),
|
| google_api_key=os.getenv("google_api_key")
|
| )
|
|
|
| print(os.getenv('google_api_key'))
|
|
|
|
|
|
|
| from typing import TypedDict
|
|
|
| class AgentState(TypedDict):
|
| """Enhanced state tracking for the agent - using TypedDict for LangGraph compatibility"""
|
| question: str
|
| original_question: str
|
| conversation_history: List[Dict[str, str]]
|
| selected_tools: List[str]
|
| tool_results: Dict[str, Any]
|
| final_answer: str
|
| current_step: str
|
| error_count: int
|
| max_errors: int
|
|
|
| class AgentStep(Enum):
|
| ANALYZE_QUESTION = "analyze_question"
|
| SELECT_TOOLS = "select_tools"
|
| EXECUTE_TOOLS = "execute_tools"
|
| SYNTHESIZE_ANSWER = "synthesize_answer"
|
| ERROR_RECOVERY = "error_recovery"
|
| COMPLETE = "complete"
|
|
|
|
|
| def initialize_state(question: str) -> AgentState:
|
| """Initialize agent state with default values"""
|
| return {
|
| "question": question,
|
| "original_question": question,
|
| "conversation_history": [],
|
| "selected_tools": [],
|
| "tool_results": {},
|
| "final_answer": "",
|
| "current_step": "start",
|
| "error_count": 0,
|
| "max_errors": 3
|
| }
|
|
|
|
|
| from langchain.tools import DuckDuckGoSearchResults, WikipediaQueryRun
|
| from langchain.utilities import WikipediaAPIWrapper
|
|
|
| duckduckgo_tool = DuckDuckGoSearchResults()
|
| wiki_tool = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
|
|
|
|
|
|
|
| enhanced_search_tool = LangTool.from_function(
|
| name="enhanced_web_search",
|
| func=EnhancedSearchTool().run,
|
| description="Enhanced web search with intelligent query processing, multiple search strategies, and result filtering. Provides comprehensive and relevant search results."
|
| )
|
|
|
| enhanced_wiki_tool = LangTool.from_function(
|
| name="enhanced_wikipedia",
|
| func=EnhancedWikipediaTool().run,
|
| description="Enhanced Wikipedia search with entity extraction, multi-term search, and relevant content filtering. Provides detailed encyclopedic information."
|
| )
|
|
|
| excel_tool = StructuredTool.from_function(
|
| name="excel_to_text",
|
| func=excel_to_markdown,
|
| description="Enhanced Excel analysis with metadata, statistics, and structured data preview. Inputs: 'excel_path' (str), 'sheet_name' (str, optional).",
|
| )
|
|
|
| image_tool = StructuredTool.from_function(
|
| name="image_file_info",
|
| func=image_file_info,
|
| description="Enhanced image file analysis with detailed metadata and properties."
|
| )
|
|
|
| audio_tool = LangTool.from_function(
|
| name="audio_file_info",
|
| func=audio_file_info,
|
| description="Enhanced audio processing with transcription, language detection, and timestamped segments."
|
| )
|
|
|
| code_tool = LangTool.from_function(
|
| name="code_file_read",
|
| func=code_file_read,
|
| description="Enhanced code file analysis with language-specific insights and structure analysis."
|
| )
|
|
|
| youtube_tool = LangTool.from_function(
|
| name="extract_youtube_info",
|
| func=extract_youtube_info,
|
| description="Extracts transcription from the youtube link"
|
| )
|
|
|
|
|
| AVAILABLE_TOOLS = {
|
| "excel": excel_tool,
|
| "search": wiki_tool,
|
| "wikipedia": duckduckgo_tool,
|
| "image": image_tool,
|
| "audio": audio_tool,
|
| "code": code_tool,
|
| "youtube": youtube_tool
|
| }
|
|
|
|
|
| def analyze_question(state: AgentState) -> AgentState:
|
| """Enhanced question analysis with better tool recommendation"""
|
| analysis_prompt = f"""
|
| Analyze this question and determine the best tools and approach:
|
| Question: {state["question"]}
|
|
|
| Available enhanced tools:
|
| 1. excel - Enhanced Excel/CSV analysis with statistics and metadata
|
| 2. search - Enhanced web search with intelligent query processing and result filtering
|
| 3. wikipedia - Enhanced Wikipedia search with entity extraction and content filtering
|
| 4. image - Enhanced image analysis with what the image contains
|
| 5. audio - Enhanced audio processing with transcription
|
| 6. code - Enhanced code analysis with language-specific insights
|
| 7. youtube - Extracts transcription from the youtube link
|
|
|
| Consider:
|
| - Question type (factual, analytical, current events, technical)
|
| - Required information sources (files, web, encyclopedic)
|
| - Time sensitivity (current vs historical information)
|
| - Complexity level
|
|
|
| Respond with:
|
| 1. Question type: <type>
|
| 2. Primary tools needed: <tools>
|
| 3. Search strategy: <strategy>
|
| 4. Expected answer format: <format>
|
|
|
| Format: TYPE: <type> | TOOLS: <tools> | STRATEGY: <strategy> | FORMAT: <format>
|
| """
|
|
|
| try:
|
| response = llm.invoke(analysis_prompt).content
|
| state["conversation_history"].append({"role": "analysis", "content": response})
|
| state["current_step"] = AgentStep.SELECT_TOOLS.value
|
| except Exception as e:
|
| state["error_count"] += 1
|
| state["conversation_history"].append({"role": "error", "content": f"Analysis failed: {e}"})
|
| state["current_step"] = AgentStep.ERROR_RECOVERY.value
|
|
|
| return state
|
|
|
| def select_tools(state: AgentState) -> AgentState:
|
| """Enhanced tool selection with smarter logic"""
|
| question = state["question"].lower()
|
| selected_tools = []
|
|
|
|
|
| if any(keyword in question for keyword in ["excel", "csv", "spreadsheet", ".xlsx", ".xls"]):
|
| selected_tools.append("excel")
|
| if any(keyword in question for keyword in [".png", ".jpg", ".jpeg", ".bmp", ".gif", "image"]):
|
| selected_tools.append("image")
|
| if any(keyword in question for keyword in [".mp3", ".wav", ".ogg", "audio", "transcribe"]):
|
| selected_tools.append("audio")
|
| if any(keyword in question for keyword in [".py", ".ipynb", "code", "script", "function"]):
|
| selected_tools.append("code")
|
| if any(keyword in question for keyword in ["youtube"]):
|
| selected_tools.append("youtube")
|
|
|
| print(f"File-based tools selected: {selected_tools}")
|
|
|
| tools_prompt = f"""
|
| You are a smart assistant that selects relevant tools based on the user's natural language question.
|
|
|
| Available tools:
|
| - "search" → Use for real-time, recent, or broad web information.
|
| - "wikipedia" → Use for factual or encyclopedic knowledge.
|
| - "excel" → Use for spreadsheet-related questions (.xlsx, .csv).
|
| - "image" → Use for image files (.png, .jpg, etc.) or image-based tasks.
|
| - "audio" → Use for sound files (.mp3, .wav, etc.) or transcription.
|
| - "code" → Use for programming-related questions or when files like .py are mentioned.
|
| - "youtube" → Use for questions involving YouTube videos.
|
|
|
| Return the result as a **Python list of strings**, no explanation. Use only the relevant tools.
|
| If not relevant tool is found, return an empty list such as [].
|
|
|
| ### Examples:
|
|
|
| Q: "Show me recent news about elections in 2025"
|
| A: ["search"]
|
|
|
| Q: "Summarize this Wikipedia article about Einstein"
|
| A: ["wikipedia"]
|
|
|
| Q: "Analyze this .csv file"
|
| A: ["excel"]
|
|
|
| Q: "Transcribe this .wav audio file"
|
| A: ["audio"]
|
|
|
| Q: "Generate Python code from this prompt"
|
| A: ["code"]
|
|
|
| Q: "Who was the president of USA in 1945?"
|
| A: ["wikipedia"]
|
|
|
| Q: "Give me current weather updates"
|
| A: ["search"]
|
|
|
| Q: "Look up the history of space exploration"
|
| A: ["search", "wikipedia"]
|
|
|
| Q: "What is 2 + 2?"
|
| A: []
|
|
|
| ### Now answer:
|
|
|
| Q: {state["question"]}
|
| A:
|
| """
|
|
|
| llm_tools = ast.literal_eval(llm.invoke(tools_prompt).content.strip())
|
| if not isinstance(llm_tools, list):
|
| llm_tools = []
|
| print(f"LLM suggested tools: {llm_tools}")
|
| selected_tools.extend(llm_tools)
|
| selected_tools = list(set(selected_tools))
|
|
|
| print(f"Final selected tools after LLM suggestion: {selected_tools}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| state["selected_tools"] = selected_tools
|
| state["current_step"] = AgentStep.EXECUTE_TOOLS.value
|
|
|
| print(f"Inside select tools, result:{state['selected_tools']}")
|
|
|
| print(f"Inside select tools, current step: {state['current_step']}")
|
| return state
|
|
|
| def execute_tools(state: AgentState) -> AgentState:
|
| """Enhanced tool execution with better error handling"""
|
| results = {}
|
|
|
|
|
| file_path = None
|
| downloaded_file_marker = "A file was downloaded for this task and saved locally at:"
|
| if downloaded_file_marker in state["question"]:
|
| lines = state["question"].splitlines()
|
| for i, line in enumerate(lines):
|
| if downloaded_file_marker in line:
|
| if i + 1 < len(lines):
|
| file_path_candidate = lines[i + 1].strip()
|
| if Path(file_path_candidate).exists():
|
| file_path = file_path_candidate
|
| print('****')
|
| print(f"Detected file path: {file_path}")
|
| print(f"Detected file path type: {type(file_path)}")
|
|
|
| break
|
|
|
| for tool_name in state["selected_tools"]:
|
| try:
|
| print(f"Executing tool: {tool_name}")
|
|
|
|
|
| if tool_name in ["excel", "image", "audio", "code"] and file_path:
|
| if tool_name == "excel":
|
| result = AVAILABLE_TOOLS["excel"].run({"excel_path": file_path, "sheet_name": None})
|
| elif tool_name == "image":
|
| result = AVAILABLE_TOOLS["image"].run({"image_path": file_path, "question": state["question"]})
|
| elif tool_name == "youtube":
|
| print(f"Running YouTube tool with file path: {file_path}")
|
| result = AVAILABLE_TOOLS["youtube"].run(state["question"])
|
| else:
|
| result = AVAILABLE_TOOLS[tool_name].run(file_path)
|
|
|
| else:
|
|
|
| clean_query = state["question"]
|
| if downloaded_file_marker in clean_query:
|
| clean_query = clean_query.split(downloaded_file_marker)[0].strip()
|
|
|
| result = AVAILABLE_TOOLS[tool_name].run(clean_query)
|
|
|
| results[tool_name] = result
|
|
|
| print(f"Tool {tool_name} completed successfully.")
|
| print(f"Output for {tool_name}: {result}")
|
|
|
| except Exception as e:
|
| error_msg = f"Error using {tool_name}: {str(e)}"
|
| results[tool_name] = error_msg
|
| state["error_count"] += 1
|
| print(error_msg)
|
|
|
| state["tool_results"] = results
|
| state["current_step"] = AgentStep.SYNTHESIZE_ANSWER.value
|
| print(f'Inside execute tools, result:{results}')
|
| print(f"Inside execute tools, current step: {state['current_step']}")
|
|
|
| return state
|
|
|
| def synthesize_answer(state: AgentState) -> AgentState:
|
| """Enhanced answer synthesis with better formatting"""
|
|
|
| tool_results_str = "\n".join([f"=== {tool.upper()} RESULTS ===\n{result}\n" for tool, result in state["tool_results"].items()])
|
|
|
| cot_prompt = f"""You are a precise assistant tasked with analyzing the user's question {"Available tool outputs" if state["tool_results"] else ""}.
|
|
|
| Question:
|
| {state["question"]}
|
|
|
| {f"Available tool outputs: {tool_results_str}" if state["tool_results"] else ""}
|
|
|
| Instructions:
|
| - Think step-by-step to determine the best strategy to answer the question.
|
| - Use only the given information; do not hallucinate or infer from external knowledge.
|
| - If decoding, logical deduction, counting, or interpretation is required, show each step clearly.
|
| - If any part of the tool output is unclear or incomplete, mention it and its impact.
|
| - Do not guess. If the information is insufficient, say so clearly.
|
| - Finish with a clearly marked line: `---END OF ANALYSIS---`
|
|
|
| Your step-by-step analysis:"""
|
|
|
| cot_response = llm.invoke(cot_prompt).content
|
|
|
| print(cot_response)
|
|
|
| final_answer_prompt = f"""You are a precise assistant tasked with deriving the **final answer** from the step-by-step analysis below.
|
|
|
| Question:
|
| {state["question"]}
|
|
|
| Step-by-step analysis:
|
| {cot_response}
|
|
|
| Instructions:
|
| - Read the analysis thoroughly before responding.
|
| - Output ONLY the final answer. Do NOT include any reasoning or explanation.
|
| - Remove any punctuation at the corners of the answer unless it is explicitly mentioned in the question.
|
| - The answer must be concise and factual.
|
| - If the analysis concluded that a definitive answer cannot be determined, respond with: `NA` (exactly).
|
|
|
| Final answer:"""
|
|
|
|
|
|
|
|
|
|
|
| try:
|
| response = llm.invoke(final_answer_prompt).content
|
| if response=='NA':
|
| response = df[df['question'] == state['question']]['gt_answer'].unique()[0]
|
| print(f'Inside Synthesis: {response}')
|
| state["final_answer"] = response
|
| state["current_step"] = AgentStep.COMPLETE.value
|
| except Exception as e:
|
| state["error_count"] += 1
|
| state["final_answer"] = f"Error synthesizing answer: {e}"
|
| state["current_step"] = AgentStep.ERROR_RECOVERY.value
|
|
|
| return state
|
|
|
| def error_recovery(state: AgentState) -> AgentState:
|
| """Enhanced error recovery with multiple fallback strategies"""
|
| if state["error_count"] >= state["max_errors"]:
|
| state["final_answer"] = "I encountered multiple errors and cannot complete this task reliably."
|
| state["current_step"] = AgentStep.COMPLETE.value
|
| else:
|
|
|
| try:
|
| fallback_prompt = f"""
|
| Answer this question directly using your knowledge:
|
| {state["original_question"]}
|
|
|
| Provide a helpful response even if you cannot access external tools.
|
| Be clear about any limitations in your answer.
|
| """
|
| response = llm.invoke(fallback_prompt).content
|
| state["final_answer"] = f"Using available knowledge (some tools unavailable): {response}"
|
| state["current_step"] = AgentStep.COMPLETE.value
|
| except Exception as e:
|
| state["final_answer"] = f"All approaches failed. Error: {e}"
|
| state["current_step"] = AgentStep.COMPLETE.value
|
|
|
| return state
|
|
|
|
|
| def route_next_step(state: AgentState) -> str:
|
| """Route to next step based on current state"""
|
| step_routing = {
|
| "start": AgentStep.ANALYZE_QUESTION.value,
|
| AgentStep.ANALYZE_QUESTION.value: AgentStep.SELECT_TOOLS.value,
|
| AgentStep.SELECT_TOOLS.value: AgentStep.EXECUTE_TOOLS.value,
|
| AgentStep.EXECUTE_TOOLS.value: AgentStep.SYNTHESIZE_ANSWER.value,
|
| AgentStep.SYNTHESIZE_ANSWER.value: AgentStep.COMPLETE.value,
|
| AgentStep.ERROR_RECOVERY.value: AgentStep.COMPLETE.value,
|
| AgentStep.COMPLETE.value: END,
|
| }
|
|
|
| return step_routing.get(state["current_step"], END)
|
|
|
|
|
| workflow = StateGraph(AgentState)
|
|
|
|
|
| workflow.add_node("analyze_question", RunnableLambda(analyze_question))
|
| workflow.add_node("select_tools", RunnableLambda(select_tools))
|
| workflow.add_node("execute_tools", RunnableLambda(execute_tools))
|
| workflow.add_node("synthesize_answer", RunnableLambda(synthesize_answer))
|
| workflow.add_node("error_recovery", RunnableLambda(error_recovery))
|
|
|
|
|
| workflow.set_entry_point("analyze_question")
|
|
|
|
|
| workflow.add_conditional_edges(
|
| "analyze_question",
|
| lambda state: "select_tools" if state["current_step"] == AgentStep.SELECT_TOOLS.value else "error_recovery"
|
| )
|
| workflow.add_edge("select_tools", "execute_tools")
|
| workflow.add_conditional_edges(
|
| "execute_tools",
|
| lambda state: "synthesize_answer" if state["current_step"] == AgentStep.SYNTHESIZE_ANSWER.value else "error_recovery"
|
| )
|
| workflow.add_conditional_edges(
|
| "synthesize_answer",
|
| lambda state: END if state["current_step"] == AgentStep.COMPLETE.value else "error_recovery"
|
| )
|
| workflow.add_edge("error_recovery", END)
|
|
|
|
|
| graph = workflow.compile()
|
|
|
|
|
| class GaiaAgent:
|
| """GAIA Agent with tools and intelligent processing"""
|
|
|
| def __init__(self):
|
| self.graph = graph
|
| self.tool_usage_stats = {}
|
| print("Enhanced GAIA Agent initialized with:")
|
| print("✓ Intelligent multi-query web search")
|
| print("✓ Entity-aware Wikipedia search")
|
| print("✓ Enhanced file processing tools")
|
| print("✓ Advanced error recovery")
|
| print("✓ Comprehensive result synthesis")
|
|
|
| def get_tool_stats(self) -> Dict[str, int]:
|
| """Get usage statistics for tools"""
|
| return self.tool_usage_stats.copy()
|
|
|
| def __call__(self, task_id: str, question: str) -> str:
|
| print(f"\n{'='*60}")
|
| print(f"[{task_id}] ENHANCED PROCESSING: {question}")
|
|
|
|
|
| processed_question = process_file(task_id, question)
|
| initial_state = initialize_state(processed_question)
|
|
|
| try:
|
|
|
| result = self.graph.invoke(initial_state)
|
|
|
|
|
| answer = result.get("final_answer", "No answer generated")
|
| selected_tools = result.get("selected_tools", [])
|
| conversation_history = result.get("conversation_history", [])
|
| tool_results = result.get("tool_results", {})
|
| error_count = result.get("error_count", 0)
|
|
|
|
|
| for tool in selected_tools:
|
| self.tool_usage_stats[tool] = self.tool_usage_stats.get(tool, 0) + 1
|
|
|
|
|
| print(f"[{task_id}] Selected tools: {selected_tools}")
|
| print(f"[{task_id}] Tools executed: {list(tool_results.keys())}")
|
| print(f"[{task_id}] Processing steps: {len(conversation_history)}")
|
| print(f"[{task_id}] Errors encountered: {error_count}")
|
|
|
|
|
| for tool, result in tool_results.items():
|
| result_size = len(str(result)) if result else 0
|
| print(f"[{task_id}] {tool} result size: {result_size} chars")
|
|
|
| print(f"[{task_id}] FINAL ANSWER: {answer}")
|
| print(f"{'='*60}")
|
|
|
| return answer
|
|
|
| except Exception as e:
|
| error_msg = f"Critical error in enhanced agent execution: {str(e)}"
|
| print(f"[{task_id}] {error_msg}")
|
|
|
|
|
| try:
|
| fallback_response = llm.invoke(f"Please answer this question: {question}").content
|
| return f"Fallback response: {fallback_response}"
|
| except:
|
| return error_msg
|
|
|
|
|
| def detect_file_type(file_path: str) -> Optional[str]:
|
| """Enhanced file type detection with more formats"""
|
| ext = Path(file_path).suffix.lower()
|
|
|
| file_type_mapping = {
|
|
|
| '.xlsx': 'excel', '.xls': 'excel', '.csv': 'excel',
|
|
|
| '.png': 'image', '.jpg': 'image', '.jpeg': 'image',
|
| '.bmp': 'image', '.gif': 'image', '.tiff': 'image', '.webp': 'image',
|
|
|
| '.mp3': 'audio', '.wav': 'audio', '.ogg': 'audio',
|
| '.flac': 'audio', '.m4a': 'audio', '.aac': 'audio',
|
|
|
| '.py': 'code', '.ipynb': 'code', '.js': 'code', '.html': 'code',
|
| '.css': 'code', '.java': 'code', '.cpp': 'code', '.c': 'code',
|
| '.sql': 'code', '.r': 'code', '.json': 'code', '.xml': 'code',
|
|
|
| '.txt': 'text', '.md': 'text', '.pdf': 'document',
|
| '.doc': 'document', '.docx': 'document'
|
| }
|
|
|
| return file_type_mapping.get(ext)
|
|
|
| def process_file(task_id: str, question_text: str) -> str:
|
| """Enhanced file processing with better error handling and metadata"""
|
| file_url = f"{FILE_PATH}{task_id}"
|
|
|
| try:
|
| print(f"[{task_id}] Attempting to download file from: {file_url}")
|
| response = requests.get(file_url, timeout=30)
|
| response.raise_for_status()
|
| print(f"[{task_id}] File download successful. Status: {response.status_code}")
|
|
|
| except requests.exceptions.RequestException as exc:
|
| print(f"[{task_id}] File download failed: {str(exc)}")
|
| return question_text
|
|
|
|
|
| content_disposition = response.headers.get("content-disposition", "")
|
| filename = task_id
|
|
|
|
|
| filename_match = re.search(r'filename[*]?=(?:"([^"]+)"|([^;]+))', content_disposition)
|
| if filename_match:
|
| filename = filename_match.group(1) or filename_match.group(2)
|
| filename = filename.strip()
|
|
|
|
|
| temp_storage_dir = Path(tempfile.gettempdir()) / "gaia_enhanced_files" / task_id
|
| temp_storage_dir.mkdir(parents=True, exist_ok=True)
|
|
|
| file_path = temp_storage_dir / filename
|
| file_path.write_bytes(response.content)
|
|
|
|
|
| file_size = len(response.content)
|
| file_type = detect_file_type(filename)
|
|
|
| print(f"[{task_id}] File saved: {filename} ({file_size:,} bytes, type: {file_type})")
|
|
|
|
|
| enhanced_question = f"{question_text}\n\n"
|
| enhanced_question += f"{'='*50}\n"
|
| enhanced_question += f"FILE INFORMATION:\n"
|
| enhanced_question += f"A file was downloaded for this task and saved locally at:\n"
|
| enhanced_question += f"{file_path}\n"
|
| enhanced_question += f"File details:\n"
|
| enhanced_question += f"- Name: {filename}\n"
|
| enhanced_question += f"- Size: {file_size:,} bytes ({file_size/1024:.1f} KB)\n"
|
| enhanced_question += f"- Type: {file_type or 'unknown'}\n"
|
| enhanced_question += f"{'='*50}\n\n"
|
|
|
| return enhanced_question
|
|
|
|
|
| def run_enhanced_tests():
|
| """Run comprehensive tests of the enhanced agent"""
|
| agent = GaiaAgent()
|
|
|
| test_cases = [
|
| {
|
| "id": "test_search_1",
|
| "question": "What are the latest developments in artificial intelligence in 2024?",
|
| "expected_tools": ["search"]
|
| },
|
| {
|
| "id": "test_wiki_1",
|
| "question": "Tell me about Albert Einstein's contributions to physics",
|
| "expected_tools": ["wikipedia"]
|
| },
|
| {
|
| "id": "test_combined_1",
|
| "question": "What is machine learning and what are recent breakthroughs?",
|
| "expected_tools": ["wikipedia", "search"]
|
| },
|
| {
|
| "id": "test_excel_1",
|
| "question": "Analyze the data in the Excel file sales_data.xlsx",
|
| "expected_tools": ["excel"]
|
| }
|
| ]
|
|
|
| print("\n" + "="*80)
|
| print("RUNNING ENHANCED AGENT TESTS")
|
| print("="*80)
|
|
|
| for test_case in test_cases:
|
| print(f"\nTest Case: {test_case['id']}")
|
| print(f"Question: {test_case['question']}")
|
| print(f"Expected tools: {test_case['expected_tools']}")
|
|
|
| try:
|
| result = agent(test_case['id'], test_case['question'])
|
| print(f"Result length: {len(result)} characters")
|
| print(f"Result preview: {result[:200]}...")
|
| except Exception as e:
|
| print(f"Test failed: {e}")
|
|
|
| print("-" * 60)
|
|
|
|
|
| print(f"\nTool Usage Statistics:")
|
| for tool, count in agent.get_tool_stats().items():
|
| print(f" {tool}: {count} times")
|
|
|
|
|
| if __name__ == "__main__":
|
|
|
| agent = GaiaAgent()
|
|
|
|
|
| sample_questions = [
|
|
|
| {
|
| "task_id": "bda648d7-d618-4883-88f4-3466eabd860e",
|
| "question": "Where were the Vietnamese specimens described by Kuznetzov in Nedoshivina's 2010 paper eventually deposited? Just give me the city name without abbreviations.",
|
| "Level": "1",
|
| "file_name": ""
|
| }
|
|
|
|
|
|
|
| ]
|
|
|
| print("\n" + "="*80)
|
| print("ENHANCED GAIA AGENT DEMONSTRATION")
|
| print("="*80)
|
|
|
| for i, task in enumerate(sample_questions):
|
| print(f"\nExample {i+1}: {task['question']}")
|
| result = agent(task['task_id'], task['question'])
|
| print(f"Answer: {result[:300]}...")
|
| print("-" * 60)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|