FD900's picture
Update tools.py
ec60378 verified
raw
history blame
4.22 kB
import tempfile
import requests
import os
from time import sleep
from urllib.parse import urlparse
from typing import Optional, List
import yt_dlp
import imageio
from PIL import Image
from dotenv import load_dotenv
from smolagents import tool
import whisper
load_dotenv()
@tool
def use_vision_model(question: str, images: List[Image.Image]) -> str:
    """
    Answer a question about a set of images with a vision model (stub).

    No multimodal backend is wired up: both arguments are ignored and a fixed
    notice is returned. The stub exists for a potential future Gemini or
    Mistral multimodal integration.

    Args:
        question: The question to ask about the images (currently ignored).
        images: The images the question refers to (currently ignored).

    Returns:
        A fixed message stating that no vision model is available.
    """
    return "Vision model is not available for Mistral. Please integrate a separate endpoint for image analysis."
@tool
def review_youtube_video(url: str, question: str) -> str:
    """
    Placeholder for answering a question about a YouTube video.

    The tool is not implemented: both arguments are ignored and a fixed
    notice is returned.

    Args:
        url: URL of the video to review (currently ignored).
        question: The question to answer about the video (currently ignored).

    Returns:
        A fixed message stating that the tool is unsupported.
    """
    return "This tool is currently unsupported with Mistral. Please remove or replace."
@tool
def youtube_frames_to_images(url: str, sample_interval_seconds: int = 5) -> List[Image.Image]:
    """
    Download a video with yt-dlp and sample still frames at a fixed interval.

    The video is downloaded into a temporary directory that is removed before
    the function returns; the returned images are held in memory.

    Args:
        url: URL of the video to download.
        sample_interval_seconds: Seconds of video between two sampled frames. Intervals shorter than one frame are clamped so that every frame is returned.

    Returns:
        A list of PIL images, one per sampled frame, starting with the first frame.

    Raises:
        FileNotFoundError: If the download produced no .mp4 file.
        ValueError: If the video metadata reports no usable frame rate.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        ydl_opts = {
            'format': 'bestvideo[height<=1080]+bestaudio/best[height<=1080]/best',
            'outtmpl': os.path.join(tmpdir, 'video.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'merge_output_format': 'mp4',
            'force_ipv4': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)

        # The final extension depends on the merge step, so locate the file
        # by suffix instead of assuming its exact name.
        video_path = next(
            (os.path.join(tmpdir, name) for name in os.listdir(tmpdir) if name.endswith('.mp4')),
            None,
        )
        if video_path is None:
            raise FileNotFoundError(f"No .mp4 file was produced when downloading {url}")

        reader = imageio.get_reader(video_path)
        try:
            fps = reader.get_meta_data().get('fps')
            if not fps:
                raise ValueError(f"Could not determine the frame rate of {url}")
            # Clamp to 1 so a tiny interval (or low fps) never yields a zero
            # step, which would make the modulo below divide by zero.
            frame_interval = max(1, int(fps * sample_interval_seconds))
            # Frames must be read while the temporary file still exists.
            return [
                Image.fromarray(frame)
                for idx, frame in enumerate(reader)
                if idx % frame_interval == 0
            ]
        finally:
            # Release the decoder even when reading fails part-way through.
            reader.close()
@tool
def read_file(filepath: str) -> str:
    """
    Read a text file and return its full contents.

    The file is decoded as UTF-8. Failures are reported in the returned
    string instead of being raised, so the calling agent always receives text.

    Args:
        filepath: Path of the file to read.

    Returns:
        The file contents, or a message starting with "Error reading file" that describes the failure.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as file:
            return file.read()
    except Exception as e:
        # Deliberately broad: this is a tool boundary and the error text is
        # handed back to the agent rather than propagated.
        return f"Error reading file: {str(e)}"
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a URL and save it in the system temporary directory.

    Failures (network errors, HTTP error statuses, write errors) are reported
    in the returned string instead of being raised.

    Args:
        url: The URL to fetch.
        filename: Name for the saved file. Only its final path component is used. When omitted or empty, the name is taken from the URL path, falling back to a random name.

    Returns:
        The path of the saved file, or a message starting with "Error downloading file" that describes the failure.
    """
    try:
        # Keep only the last path component so a supplied name such as
        # "../x" or an absolute path cannot escape the temp directory.
        filename = os.path.basename(filename) if filename else ""
        if not filename:
            filename = os.path.basename(urlparse(url).path) or f"download_{os.urandom(4).hex()}"
        filepath = os.path.join(tempfile.gettempdir(), filename)
        # Without a timeout an unresponsive server would block forever.
        response = requests.get(url, timeout=30)
        # Do not save an HTTP error page as if it were the requested file.
        response.raise_for_status()
        with open(filepath, 'wb') as f:
            f.write(response.content)
        return filepath
    except Exception as e:
        # Deliberately broad: this is a tool boundary and the error text is
        # handed back to the agent rather than propagated.
        return f"Error downloading file: {str(e)}"
@tool
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image file using Tesseract OCR.

    pytesseract is imported lazily, so a missing installation is reported in
    the returned string rather than breaking module import.

    Args:
        image_path: Path of the image file to run OCR on.

    Returns:
        The recognised text, or a message starting with "Error extracting text" that describes the failure.
    """
    try:
        import pytesseract
        # The context manager closes the underlying file handle once OCR is done.
        with Image.open(image_path) as img:
            return pytesseract.image_to_string(img)
    except Exception as e:
        # Deliberately broad: this is a tool boundary and the error text is
        # handed back to the agent rather than propagated.
        return f"Error extracting text: {str(e)}"
@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Load a CSV file with pandas and summarise its shape and column names.

    Args:
        file_path: Path of the CSV file to load.
        query: The question about the data. It is accepted but not used; the summary is the same for every query.

    Returns:
        A summary of the table shape and columns, or a message starting with "CSV error" that describes the failure.
    """
    try:
        # Imported lazily so a missing pandas is reported as a tool error.
        import pandas as pd
        df = pd.read_csv(file_path)
        return f"Loaded CSV with shape {df.shape} and columns: {df.columns.tolist()}"
    except Exception as e:
        # Deliberately broad: this is a tool boundary and the error text is
        # handed back to the agent rather than propagated.
        return f"CSV error: {str(e)}"
@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Load an Excel file with pandas and summarise its shape and column names.

    Args:
        file_path: Path of the Excel file to load.
        query: The question about the data. It is accepted but not used; the summary is the same for every query.

    Returns:
        A summary of the table shape and columns, or a message starting with "Excel error" that describes the failure.
    """
    try:
        # Imported lazily so a missing pandas is reported as a tool error.
        import pandas as pd
        df = pd.read_excel(file_path)
        return f"Loaded Excel with shape {df.shape} and columns: {df.columns.tolist()}"
    except Exception as e:
        # Deliberately broad: this is a tool boundary and the error text is
        # handed back to the agent rather than propagated.
        return f"Excel error: {str(e)}"
@tool
def youtube_transcribe(url: str) -> str:
    """
    Download the audio track of a video and transcribe it with Whisper.

    The audio is extracted to WAV in a temporary directory that is removed
    before the function returns. The Whisper "small" model is used.

    Args:
        url: URL of the video whose audio should be transcribed.

    Returns:
        The transcribed text.

    Raises:
        FileNotFoundError: If the download produced no .wav file.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        ydl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(tmpdir, 'audio.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
            }],
            'force_ipv4': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)

        audio_path = next(
            (os.path.join(tmpdir, name) for name in os.listdir(tmpdir) if name.endswith('.wav')),
            None,
        )
        if audio_path is None:
            raise FileNotFoundError(f"No .wav file was produced when downloading {url}")

        # Load the model once, and only after the download has succeeded.
        model = whisper.load_model("small")
        # Transcribe while the temporary audio file still exists.
        return model.transcribe(audio_path)['text']
@tool
def transcribe_audio(audio_file_path: str) -> str:
    """
    Transcribe a local audio file with the Whisper "small" model.

    The model is loaded on every call.

    Args:
        audio_file_path: Path of the audio file to transcribe.

    Returns:
        The transcribed text.
    """
    model = whisper.load_model("small")
    return model.transcribe(audio_file_path)['text']