"""Web research tools for a LlamaIndex agent: web search, per-page summarization, Wikipedia lookup, and YouTube transcripts."""

import asyncio
import logging
import os
from typing import Dict, List

import aiohttp
import html2text
from llama_index.core import Settings, VectorStoreIndex
from llama_index.core.schema import Document
from llama_index.core.tools import FunctionTool
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.readers.youtube_transcript import YoutubeTranscriptReader
from llama_index.tools.tavily_research import TavilyToolSpec
from llama_index.tools.wikipedia import WikipediaToolSpec

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Default models used by the query engines built in this module.
Settings.llm = OpenAI(model="gpt-4o", temperature=0.1)
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")


async def fetch_and_process(
    urls: List[str], session: aiohttp.ClientSession, timeout: int = 10
) -> List[Document]:
    """Fetch webpages concurrently and convert them to Document objects."""

    async def fetch(url: str) -> Dict[str, str]:
        try:
            async with session.get(
                url, timeout=aiohttp.ClientTimeout(total=timeout)
            ) as response:
                return {"text": await response.text(), "url": str(response.url)}
        except (asyncio.TimeoutError, aiohttp.ClientError) as e:
            logger.warning(f"Could not fetch {url}: {repr(e)}")
            return {"text": "", "url": url}

    tasks = [fetch(url) for url in urls]
    responses = await asyncio.gather(*tasks)

    # Keep only successful fetches and convert the HTML to plain text.
    return [
        Document(text=html2text.html2text(resp["text"]), id_=resp["url"])
        for resp in responses
        if resp["text"]
    ]


async def summarize_websites(urls: List[str], query: str) -> List[str]:
    """Summarize content relevant to a query across one or more websites.

    Works with a single URL as well as a list of URLs.

    Args:
        urls: A list of URLs to fetch and summarize.
        query: The query the summaries should answer.
    Returns:
        A list of per-source summaries.
    """
    logger.info(f"Summarizing {len(urls)} websites for query: {query}")

    # Use a cheaper model for per-page summarization.
    Settings.llm = OpenAI(model="gpt-4o-mini")
    Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")

    summaries = []
    async with aiohttp.ClientSession() as session:
        documents = await fetch_and_process(urls, session)
        for doc in documents:
            # Build a throwaway index per page and query it for a detailed summary.
            index = VectorStoreIndex.from_documents([doc])
            result = index.as_query_engine().query(
                f"Summarize in very meticulous detail. {query}"
            )
            summaries.append(f"Source: {doc.id_}\nContent: {result.response}")

    return summaries


def tavily_search(query: str, max_results: int = 10) -> List[Dict]:
    """Search the web with Tavily and return formatted results.

    Args:
        query: The query to search for.
        max_results: The maximum number of results to return.
    Returns:
        results: A list of dictionaries containing result URLs and content snippets.
            The URLs should then be passed to the summarize_websites tool.
    """
    logger.info(f"Called tavily_search for: {query}")

    try:
        search_engine = TavilyToolSpec(api_key=os.getenv("TAVILY_API_KEY"))
        search_results = search_engine.search(query, max_results=max_results)

        results = []
        for document in search_results:
            results.append({
                "url": document.metadata.get("url", ""),
                "content": f"Title: {document.metadata.get('title', '')}\nContent: {document.text}",
            })
        return results
    except Exception as e:
        logger.error(f"Tavily search failed: {str(e)}")
        return [{"error": f"Search failed: {str(e)}"}]


def search_wikipedia(query: str, language: str = "en") -> str:
    """Search Wikipedia for specific information.

    This is a more efficient way to search Wikipedia than the tavily_search tool,
    and it returns the article content directly.

    Args:
        query: The search query
        language: Wikipedia language code (default: "en")
    Returns:
        str: Wikipedia content summary
    """
    logger.info(f"Searching Wikipedia for: {query}")

    try:
        wikipedia_tool = WikipediaToolSpec()
        return wikipedia_tool.search_data(query, lang=language)
    except Exception as e:
        logger.error(f"Wikipedia search failed: {str(e)}")
        return f"Wikipedia search failed: {str(e)}"


def transcribe_youtube_video(video_url: str) -> str:
    """Transcribe a YouTube video and return the transcript text."""
    logger.info(f"Transcribing YouTube video: {video_url}")

    reader = YoutubeTranscriptReader()
    # load_data expects a list of links and returns a list of Documents.
    documents = reader.load_data(ytlinks=[video_url])
    return "\n".join(doc.text for doc in documents)


def get_web_tools():
    """Return all available tools for the agent."""
    return [
        FunctionTool.from_defaults(summarize_websites),
        FunctionTool.from_defaults(tavily_search),
        FunctionTool.from_defaults(search_wikipedia),
        FunctionTool.from_defaults(transcribe_youtube_video),
    ]
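

# Minimal usage sketch (not part of the tool set): it assumes TAVILY_API_KEY and
# OPENAI_API_KEY are set in the environment, and simply chains tavily_search into
# summarize_websites. The query strings below are only illustrations.
if __name__ == "__main__":

    async def _demo() -> None:
        hits = tavily_search("history of the Python programming language", max_results=3)
        urls = [hit["url"] for hit in hits if hit.get("url")]
        for summary in await summarize_websites(urls, "Who created Python and when?"):
            print(summary)

    asyncio.run(_demo())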