| import contextlib |
| from langchain_core.tools import tool |
| from langchain_community.tools import DuckDuckGoSearchRun, WikipediaQueryRun |
| from langchain_community.utilities import WikipediaAPIWrapper |
| from langchain_tavily import TavilySearch |
| from langchain_chroma import Chroma |
| from langchain_community.document_loaders import TextLoader, PyPDFLoader, CSVLoader, JSONLoader |
| from langchain_community.document_loaders.image import UnstructuredImageLoader |
| from langchain_community.document_loaders.youtube import YoutubeLoader, TranscriptFormat |
| |
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| from langchain_huggingface import HuggingFaceEmbeddings |
| from transformers import pipeline |
| import asyncio |
| import os |
| import io |
| import ast |
| from dotenv import load_dotenv |
| |
| |
|
|
# Load variables from a local .env file into the process environment.
load_dotenv()

# Re-export the API keys only when they are actually defined: os.getenv()
# returns None for a missing key, and assigning None into os.environ raises
# TypeError, which would make the whole module fail to import.
for _env_key in ("TAVILY_API_KEY", "UNSTRUCTURED_API_KEY"):
    _env_value = os.getenv(_env_key)
    if _env_value is not None:
        os.environ[_env_key] = _env_value
|
|
| |
@tool
def retriever(query: str, file_path: str) -> str:
    """
    Retrieve relevant information from a text, PDF, CSV, JSON or image file using semantic search.

    Args:
        query (str): The search query string.
        file_path (str): Path to the file to be searched.

    Returns:
        str: The most relevant text chunk from the file based on the query,
            or "No results found." if the file could not be loaded or searched.
    """
    # Function-scope imports keep this block self-contained.
    import logging
    import uuid

    # Compare extensions case-insensitively so "REPORT.PDF" is not read as text.
    lowered_path = file_path.lower()

    vectorstore = None
    try:
        if lowered_path.endswith(".pdf"):
            loader = PyPDFLoader(file_path)
        elif lowered_path.endswith(".csv"):
            loader = CSVLoader(file_path)
        elif lowered_path.endswith(".json"):
            # JSONLoader requires a jq schema; "." selects the whole document,
            # and text_content=False allows non-string JSON values.
            loader = JSONLoader(file_path, jq_schema=".", text_content=False)
        elif lowered_path.endswith((".png", ".jpeg", ".jpg")):
            loader = UnstructuredImageLoader(file_path)
        else:
            loader = TextLoader(file_path)

        documents = loader.load()

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=100,
            chunk_overlap=20,
            length_function=len,
        )
        chunks = text_splitter.split_documents(documents)
        if not chunks:
            # Nothing to index (empty file): skip building a vector store.
            return "No results found."

        embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
        # A unique collection per call keeps chunks indexed by earlier calls
        # (which would otherwise share the default collection) out of this
        # search.
        vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=embeddings,
            collection_name=f"retriever-{uuid.uuid4().hex}",
        )
        doc_retriever = vectorstore.as_retriever(search_kwargs={"k": 1})
        matches = doc_retriever.invoke(query)
        return "\n\n".join(doc.page_content for doc in matches)
    except Exception:
        # Best-effort tool: keep the fallback string for the caller, but leave
        # a trace so loader/embedding failures are not silently hidden.
        logging.getLogger(__name__).exception(
            "retriever failed for file %r", file_path
        )
        return "No results found."
    finally:
        # Drop the throwaway collection; cleanup failure must not mask the result.
        if vectorstore is not None:
            with contextlib.suppress(Exception):
                vectorstore.delete_collection()
|
|
| |
@tool
def web_search(query: str) -> str:
    """
    Perform a web search using DuckDuckGo.

    Args:
        query (str): The search query string.

    Returns:
        str: The result of the web search as a string.
            If an exception occurs, returns a fallback string indicating no results were found.
    """
    try:
        # Build the search tool inside the try block so a construction failure
        # also produces the documented fallback instead of propagating.
        search_engine = DuckDuckGoSearchRun()
        return search_engine.invoke(query)
    except Exception:
        # `except Exception` (not a bare `except`) lets KeyboardInterrupt and
        # SystemExit propagate instead of being turned into "no results".
        return f"No results found on the web for this query: {query}."
|
|
@tool
def wiki_search(query: str) -> str:
    """
    Search Wikipedia for the given query and return a summary.

    Args:
        query (str): The search query string.

    Returns:
        str: A summary or relevant information from Wikipedia about the query.
    """
    # Build the API wrapper first, then hand it to the query tool.
    api_wrapper = WikipediaAPIWrapper()
    wiki_tool = WikipediaQueryRun(api_wrapper=api_wrapper)
    # Errors raised by the lookup are not caught here; they reach the caller.
    return wiki_tool.run(query)
|
|
@tool
def youtube_analysis(yt_url: str) -> str:
    """
    Analyze a YouTube video and return its transcript and metadata.

    Args:
        yt_url (str): The URL of the YouTube video.

    Returns:
        str: A string containing video information and transcript chunks.
    """
    # Request video info plus the transcript, chunked with chunk_size_seconds=30.
    loader_options = {
        "add_video_info": True,
        "transcript_format": TranscriptFormat.CHUNKS,
        "chunk_size_seconds": 30,
    }
    transcript_loader = YoutubeLoader.from_youtube_url(yt_url, **loader_options)
    documents = transcript_loader.load()
    # Each loaded document is rendered with repr(); a blank line separates them.
    return "\n\n".join(repr(document) for document in documents)
|
|
| |
@tool
def add_numbers(a: int | float, b: int | float) -> int | float:
    """
    Add two numbers.

    Args:
        a (int | float): The first number.
        b (int | float): The second number.

    Returns:
        int | float: The sum of a and b.
    """
    # Plain `+`; the result type follows Python's usual numeric promotion.
    total = a + b
    return total
@tool
def subtract_numbers(a: int | float, b: int | float) -> int | float:
    """
    Subtract one number from another.

    Args:
        a (int | float): The number to subtract from.
        b (int | float): The number to subtract.

    Returns:
        int | float: The result of a minus b.
    """
    # Operand order matters: b is taken away from a.
    difference = a - b
    return difference
|
|
@tool
def multiply_numbers(a: int | float, b: int | float) -> int | float:
    """
    Multiply two numbers.

    Args:
        a (int | float): The first number.
        b (int | float): The second number.

    Returns:
        int | float: The product of a and b.
    """
    # Plain `*`; the result type follows Python's usual numeric promotion.
    product = a * b
    return product
|
|
@tool
def divide_numbers(a: int | float, b: int | float) -> float | None:
    """
    Divide one number by another.

    Args:
        a (int | float): The numerator.
        b (int | float): The denominator.

    Returns:
        int | float: The result of a divided by b.
            Returns None if b is zero.
    """
    try:
        quotient = a / b
    except ZeroDivisionError:
        # Division by zero is reported as None rather than raised.
        return None
    return quotient
|
|
@tool
def modulus_numbers(a: int | float, b: int | float) -> int | float:
    """
    Compute the modulus of two numbers.

    Args:
        a (int | float): The dividend.
        b (int | float): The divisor.

    Returns:
        int | float: The remainder after dividing a by b.
    """
    # No guard for b == 0: the ZeroDivisionError from `%` reaches the caller.
    remainder = a % b
    return remainder
|
|
| |
|
|
@tool
def detect_objects(image_path: str) -> str:
    """
    Detects objects in an image and returns a list with labels and confidence scores.

    Args:
        image_path (str): Path to the input image file.

    Returns:
        str: Detected objects with confidence scores.
    """
    # A new object-detection pipeline is created on every call.
    detector = pipeline("object-detection", model="facebook/detr-resnet-50")
    # One line per detection: label, score rounded to 3 decimals, bounding box.
    described = [
        f"{detection['label']} (score={round(detection['score'], 3)}, box={detection['box']})"
        for detection in detector(image_path)
    ]
    return "\n".join(described)
|
|
| |
@tool
def run_python(code: str) -> str:
    """
    Executes Python code and returns captured stdout and/or the last expression result.

    Args:
        code (str): The Python code to execute.

    Returns:
        str: Captured stdout, followed by the value of the final expression
            when it is not None. Returns "Code executed successfully." when
            there is neither, or "Execution error: ..." if the code raises.
    """
    # SECURITY: exec/eval run the supplied code with the full privileges of
    # this process -- nothing here sandboxes it. Only pass trusted code, or run
    # this tool inside an isolated environment (container, restricted user).
    captured = io.StringIO()
    # One dict serves as both globals and locals, like a module namespace.
    # With separate dicts, functions defined by the code cannot see its own
    # top-level names (imports, variables, other functions).
    namespace = {}

    try:
        tree = ast.parse(code, mode="exec")

        # Split off a trailing expression so its value can be reported, the
        # way an interactive interpreter echoes the last expression.
        final_expr = None
        if tree.body and isinstance(tree.body[-1], ast.Expr):
            final_expr = tree.body.pop()

        # The value lives in a local variable, not in the user namespace, so a
        # user variable that happens to be called `_result` is never mistaken
        # for the expression result.
        value = None
        with contextlib.redirect_stdout(captured):
            exec(compile(tree, filename="<ast>", mode="exec"), namespace)
            if final_expr is not None:
                # Evaluated under the same redirect so output printed by the
                # final expression (e.g. a bare `print(...)`) is captured too.
                value = eval(
                    compile(
                        ast.Expression(final_expr.value),
                        filename="<ast>",
                        mode="eval",
                    ),
                    namespace,
                )

        parts = []
        printed = captured.getvalue().strip()
        if printed:
            parts.append(printed)
        # A None value (e.g. from `print(...)`) is not echoed, REPL-style.
        if value is not None:
            parts.append(str(value))

        if parts:
            return "\n".join(parts)
        return "Code executed successfully."

    except Exception as e:
        return f"Execution error: {e}"