| import json |
| import os |
| import re |
| import time |
|
|
| import yaml |
| from duckduckgo_search.exceptions import DuckDuckGoSearchException |
| from smolagents import FinalAnswerTool, Tool, OpenAIServerModel, CodeAgent |
|
|
|
|
# Character budget shared by the webpage tool (truncation limit for fetched
# pages) and the CodeAgent (max length of captured print output).
python_interpreter_max_print_outputs_length = 1_000_000
|
|
|
|
class CustomDuckDuckGoSearchTool(Tool):
    """Web-search tool backed by DuckDuckGo that retries on search errors.

    The search call is retried when it raises ``DuckDuckGoSearchException``;
    the number of attempts and the pause between them are configurable and
    default to the previously hard-coded values (5 attempts, 1.5 s pause).
    """

    name = "web_search"
    description = """Performs a duckduckgo web search based on your query (think a Google search) then returns the top search results."""
    inputs = {"query": {"type": "string", "description": "The search query to perform."}}
    output_type = "string"

    def __init__(self, max_results=10, *, num_tries=5, retry_delay=1.5, **kwargs):
        """Create the tool and its underlying ``DDGS`` client.

        Args:
            max_results: Maximum number of results requested per search.
            num_tries: Total number of search attempts before giving up.
            retry_delay: Seconds to sleep between failed attempts.
            **kwargs: Forwarded unchanged to the ``DDGS`` constructor.

        Raises:
            ValueError: If ``num_tries`` is smaller than 1.
            ImportError: If the ``duckduckgo_search`` package is missing.
        """
        super().__init__()
        if num_tries < 1:
            # With zero attempts the retry loop would never assign a result.
            raise ValueError("num_tries must be at least 1")
        self.max_results = max_results
        self.num_tries = num_tries
        self.retry_delay = retry_delay
        try:
            from duckduckgo_search import DDGS
        except ImportError as e:
            raise ImportError(
                "You must install package `duckduckgo_search` to run this tool: for instance run `pip install duckduckgo-search`."
            ) from e
        self.ddgs = DDGS(**kwargs)

    def forward(self, query: str) -> str:
        """Run the search and return the hits formatted as markdown.

        Args:
            query: The search query to perform.

        Returns:
            A "## Search Results" header followed by one
            ``[title](href)`` + body entry per result.

        Raises:
            DuckDuckGoSearchException: If every attempt fails; the error of
                the last attempt is re-raised.
            Exception: If the search succeeds but yields no results.
        """
        results = None
        for attempt in range(1, self.num_tries + 1):
            try:
                results = self.ddgs.text(query, max_results=self.max_results)
                break
            except DuckDuckGoSearchException as e:
                print(e)
                if attempt == self.num_tries:
                    # Out of attempts: surface the last search error.
                    raise
                time.sleep(self.retry_delay)

        # Truthiness check covers both an empty list and a None result.
        if not results:
            raise Exception("No results found! Try a less restrictive/shorter query.")
        postprocessed_results = [
            f"[{result['title']}]({result['href']})\n{result['body']}" for result in results
        ]
        return "## Search Results\n\n" + "\n\n".join(postprocessed_results)
|
|
|
|
class CustomVisitWebpageTool(Tool):
    """Tool that downloads a web page and hands it back as markdown text."""

    name = "visit_webpage"
    description = "Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages."
    inputs = {"url": {"type": "string", "description": "The url of the webpage to visit."}}
    output_type = "string"

    def forward(self, url: str) -> str:
        """Fetch ``url`` and return its content converted to markdown.

        Failures while fetching or converting are not raised; they are
        reported back as a human-readable message string instead.

        Args:
            url: The url of the webpage to visit.

        Returns:
            The page as markdown (runs of three or more newlines collapsed
            to two, then truncated to the module-level length limit), or an
            error message string.

        Raises:
            ImportError: If `requests`, `markdownify` or the smolagents
                helper cannot be imported.
        """
        try:
            import requests
            from markdownify import markdownify
            from requests.exceptions import RequestException

            from smolagents.utils import truncate_content
        except ImportError as import_error:
            raise ImportError(
                "You must install packages `markdownify` and `requests` to run this tool: for instance run `pip install markdownify requests`."
            ) from import_error

        try:
            # Download the page; HTTP error statuses become exceptions.
            page = requests.get(url, timeout=20)
            page.raise_for_status()

            # HTML -> markdown, then squeeze excessive blank lines.
            as_markdown = markdownify(page.text).strip()
            squeezed = re.sub(r"\n{3,}", "\n\n", as_markdown)

            return truncate_content(squeezed, python_interpreter_max_print_outputs_length)
        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as request_error:
            return f"Error fetching the webpage: {request_error!s}"
        except Exception as unexpected_error:
            return f"An unexpected error occurred: {unexpected_error!s}"
|
|
|
|
class SmolAgent:
    """Wrapper that wires a smolagents ``CodeAgent`` with web tools.

    The agent gets a web-search tool, a webpage-visit tool and the final-answer
    tool, and answers text-only questions through :meth:`run`.
    """

    def __init__(self, openai_api_key=None):
        """Build the model, load the prompt files and create the agent.

        Reads ``prompt_templates.yaml`` and ``system_prompt.txt`` from the
        current working directory; the text file replaces the
        ``system_prompt`` entry of the YAML templates.

        Args:
            openai_api_key: API key passed to ``OpenAIServerModel``.
        """
        final_answer = FinalAnswerTool()
        search_tool = CustomDuckDuckGoSearchTool(max_results=3)
        visit_webpage_tool = CustomVisitWebpageTool()
        model = OpenAIServerModel(
            model_id="gpt-4.1-2025-04-14",
            max_completion_tokens=1024,
            temperature=0.01,
            api_key=openai_api_key,
        )
        # Explicit encoding so prompt text does not depend on the platform's
        # default locale encoding.
        with open('prompt_templates.yaml', 'r', encoding='utf-8') as f:
            prompt_templates = yaml.safe_load(f)
        with open('system_prompt.txt', 'r', encoding='utf-8') as f:
            prompt_templates['system_prompt'] = f.read()
        self.agent = CodeAgent(
            model=model,
            prompt_templates=prompt_templates,
            tools=[search_tool, visit_webpage_tool, final_answer],
            max_steps=10,
            verbosity_level=100,
            grammar=None,
            planning_interval=None,
            name='Advanced GAIA Agent',
            description=None,
            max_print_outputs_length=python_interpreter_max_print_outputs_length,
        )
        self.agent.visualize()

    def run(self, task: dict[str, str]) -> str:
        """Answer a task, or return ``''`` for tasks this agent skips.

        A task is skipped when it has a non-empty ``file_name`` (an attached
        file), when its question mentions ``www.youtube.com``, or when it has
        no question text at all. A missing or ``None`` ``file_name`` is
        treated as "no attachment" rather than crashing.

        Args:
            task: Mapping read for the ``'file_name'`` and ``'question'`` keys.

        Returns:
            The result of ``self.agent.run(question)``, or ``''`` if skipped.
        """
        # Truthiness handles '', None and a missing key uniformly.
        if task.get('file_name'):
            return ''

        question = task.get('question') or ''
        if not question or 'www.youtube.com' in question:
            return ''

        return self.agent.run(question)
|
|
|
|
if __name__ == '__main__':
    # The environment variable wins; the key file is only a fallback.
    api_key = os.environ.get('OPENAI_API_KEY')
    if not api_key:
        with open("data/openai.key") as key_file:
            api_key = key_file.read().strip()

    gaia_agent = SmolAgent(openai_api_key=api_key)

    with open('data/questions.json') as questions_file:
        all_tasks = json.load(questions_file)

    # Print each task, then the agent's answer, separated by marker lines.
    for current_task in all_tasks:
        print('\n===')
        print(current_task)
        print('\n---')
        answer = gaia_agent.run(current_task)
        print('\n---')
        print(answer)
|
|
|
|
|
|
|
|
|
|
|
|
|
|