"""Agent tools for file handling, OCR, tabular data, and YouTube/audio processing."""

import functools
import os
import tempfile
from time import sleep
from typing import List, Optional
from urllib.parse import urlparse

import imageio
import requests
import whisper
import yt_dlp
from dotenv import load_dotenv
from PIL import Image

from gaia_benchmark.tools import tool

load_dotenv()

# Upper bound (seconds) for HTTP downloads so a stalled server cannot hang the tool.
_DOWNLOAD_TIMEOUT_SECONDS = 30


@functools.lru_cache(maxsize=None)
def _load_whisper_model(name: str = "small"):
    """Load a Whisper model once per process and reuse it on later calls."""
    return whisper.load_model(name)


def _first_file_with_suffix(directory: str, suffix: str) -> Optional[str]:
    """Return the path of the first entry in *directory* ending with *suffix*, or None."""
    for entry in os.listdir(directory):
        if entry.endswith(suffix):
            return os.path.join(directory, entry)
    return None


@tool
def use_vision_model(question: str, images: List[Image.Image]) -> str:
    """
    Use a Vision Model to answer a question about a set of images.
    This stub exists for potential future Gemini or Mistral multimodal integration.

    Args:
        question: The question to ask about the images (currently ignored).
        images: The images to analyse (currently ignored).

    Returns:
        A fixed message stating that no vision model is available.
    """
    return "Vision model is not available for Mistral. Please integrate a separate endpoint for image analysis."


@tool
def review_youtube_video(url: str, question: str) -> str:
    """
    Placeholder for answering a question about a YouTube video.

    Args:
        url: The video URL (currently ignored).
        question: The question about the video (currently ignored).

    Returns:
        A fixed message stating that the tool is unsupported.
    """
    return "This tool is currently unsupported with Mistral. Please remove or replace."


@tool
def youtube_frames_to_images(url: str, sample_interval_seconds: int = 5) -> List[Image.Image]:
    """
    Download a YouTube video and sample one frame every N seconds.

    Args:
        url: URL of the video to download with yt-dlp.
        sample_interval_seconds: Seconds between sampled frames; must be positive.

    Returns:
        The sampled frames as PIL images, starting with the first frame.

    Raises:
        ValueError: If the interval is not positive or the video reports no frame rate.
        FileNotFoundError: If the download did not produce an .mp4 file.
    """
    if sample_interval_seconds <= 0:
        # A zero interval would otherwise become a modulo-by-zero below.
        raise ValueError("sample_interval_seconds must be a positive number")

    with tempfile.TemporaryDirectory() as tmpdir:
        ydl_opts = {
            'format': 'bestvideo[height<=1080]+bestaudio/best[height<=1080]/best',
            'outtmpl': os.path.join(tmpdir, 'video.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'merge_output_format': 'mp4',
            'force_ipv4': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)

        video_path = _first_file_with_suffix(tmpdir, '.mp4')
        if video_path is None:
            raise FileNotFoundError(f"No .mp4 file was produced when downloading {url}")

        reader = imageio.get_reader(video_path)
        try:
            fps = reader.get_meta_data().get('fps')
            if not fps:
                raise ValueError("Video metadata does not report a frame rate")
            # Clamp to 1 so very low frame rates cannot yield a zero step.
            frame_interval = max(1, int(fps * sample_interval_seconds))
            # Frames must be materialised before the temp directory is removed.
            return [
                Image.fromarray(frame)
                for idx, frame in enumerate(reader)
                if idx % frame_interval == 0
            ]
        finally:
            # Release the decoder even if reading a frame fails.
            reader.close()


@tool
def read_file(filepath: str) -> str:
    """
    Read a UTF-8 text file and return its contents.

    Args:
        filepath: Path of the file to read.

    Returns:
        The file contents, or an "Error reading file: ..." message on failure.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as file:
            return file.read()
    except Exception as e:
        return f"Error reading file: {str(e)}"


@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a URL into the system temporary directory.

    Args:
        url: The URL to fetch.
        filename: Name for the saved file. When omitted, the last path segment
            of the URL is used, or a random "download_<hex>" name if that is empty.

    Returns:
        The path of the saved file, or an "Error downloading file: ..." message
        on failure (including HTTP error statuses and timeouts).
    """
    try:
        if not filename:
            filename = os.path.basename(urlparse(url).path) or f"download_{os.urandom(4).hex()}"
        # NOTE(review): a caller-supplied filename is joined as-is, so an absolute
        # path or "../" segments can escape the temp directory - confirm callers
        # are trusted before exposing this to untrusted input.
        filepath = os.path.join(tempfile.gettempdir(), filename)

        response = requests.get(url, timeout=_DOWNLOAD_TIMEOUT_SECONDS)
        # Without this, a 404/500 error page would be saved and reported as success.
        response.raise_for_status()

        with open(filepath, 'wb') as f:
            f.write(response.content)
        return filepath
    except Exception as e:
        return f"Error downloading file: {str(e)}"


@tool
def extract_text_from_image(image_path: str) -> str:
    """
    Run Tesseract OCR on an image file.

    Args:
        image_path: Path of the image to read.

    Returns:
        The recognised text, or an "Error extracting text: ..." message on failure
        (including when pytesseract is not installed).
    """
    try:
        # Imported lazily so the module loads without pytesseract installed.
        import pytesseract

        with Image.open(image_path) as image:
            return pytesseract.image_to_string(image)
    except Exception as e:
        return f"Error extracting text: {str(e)}"


@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Load a CSV file with pandas and summarise its shape and columns.

    Args:
        file_path: Path of the CSV file.
        query: Question about the data (currently unused).

    Returns:
        A summary of the shape and column names, or a "CSV error: ..." message.
    """
    try:
        import pandas as pd

        df = pd.read_csv(file_path)
        return f"Loaded CSV with shape {df.shape} and columns: {df.columns.tolist()}"
    except Exception as e:
        return f"CSV error: {str(e)}"


@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Load an Excel file with pandas and summarise its shape and columns.

    Args:
        file_path: Path of the Excel file.
        query: Question about the data (currently unused).

    Returns:
        A summary of the shape and column names, or an "Excel error: ..." message.
    """
    try:
        import pandas as pd

        df = pd.read_excel(file_path)
        return f"Loaded Excel with shape {df.shape} and columns: {df.columns.tolist()}"
    except Exception as e:
        return f"Excel error: {str(e)}"


@tool
def youtube_transcribe(url: str) -> str:
    """
    Download the audio of a YouTube video and transcribe it with Whisper.

    Args:
        url: URL of the video to download with yt-dlp.

    Returns:
        The transcript text produced by the Whisper "small" model.

    Raises:
        FileNotFoundError: If the download did not produce a .wav file.
    """
    # Loaded once and cached; the model was previously loaded twice per call.
    model = _load_whisper_model("small")
    with tempfile.TemporaryDirectory() as tmpdir:
        ydl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(tmpdir, 'audio.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
            }],
            'force_ipv4': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)

        audio_path = _first_file_with_suffix(tmpdir, '.wav')
        if audio_path is None:
            raise FileNotFoundError(f"No .wav file was produced when downloading {url}")
        # Transcribe while the temp directory (and the audio file) still exists.
        return model.transcribe(audio_path)['text']


@tool
def transcribe_audio(audio_file_path: str) -> str:
    """
    Transcribe a local audio file with Whisper.

    Args:
        audio_file_path: Path of the audio file to transcribe.

    Returns:
        The transcript text produced by the Whisper "small" model.
    """
    return _load_whisper_model("small").transcribe(audio_file_path)['text']