File size: 4,219 Bytes
ec60378
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import tempfile
import requests
import os

from time import sleep
from urllib.parse import urlparse
from typing import Optional, List
import yt_dlp
import imageio

from PIL import Image
from dotenv import load_dotenv
from smolagents import tool
import whisper

load_dotenv()

@tool
def use_vision_model(question: str, images: List[Image.Image]) -> str:
    """Answer a question about a set of images with a vision model (stub).

    No vision backend is wired up, so both arguments are ignored and a fixed
    notice is returned. The stub exists for potential future Gemini or
    Mistral multimodal integration.

    Args:
        question: The question to answer about the images.
        images: The images the question refers to.

    Returns:
        A fixed message saying that no vision model is available.
    """
    # The "Args" entries above are required: the smolagents tool decorator
    # builds the tool schema from them and rejects undocumented parameters.
    return "Vision model is not available for Mistral. Please integrate a separate endpoint for image analysis."

@tool
def review_youtube_video(url: str, question: str) -> str:
    """Answer a question about a YouTube video (currently unsupported stub).

    Both arguments are ignored and a fixed notice is returned.

    Args:
        url: The URL of the YouTube video to review.
        question: The question to answer about the video.

    Returns:
        A fixed message saying that the tool is unsupported.
    """
    # The docstring is required: the smolagents tool decorator builds the
    # tool schema from it and rejects functions without one.
    return "This tool is currently unsupported with Mistral. Please remove or replace."

@tool
def youtube_frames_to_images(url: str, sample_interval_seconds: int = 5) -> List[Image.Image]:
    """Download a YouTube video and sample one frame every few seconds.

    The video is downloaded with yt-dlp into a temporary directory that is
    removed before returning, so the frames are returned as in-memory images.

    Args:
        url: The URL of the YouTube video to sample.
        sample_interval_seconds: Seconds of video between two sampled frames.
            Must be positive.

    Returns:
        The sampled frames as PIL images in playback order, starting with the
        first frame of the video.

    Raises:
        ValueError: If the interval is not positive or the frame rate is
            missing from the video metadata.
        FileNotFoundError: If the download produced no MP4 file.
    """
    if sample_interval_seconds <= 0:
        raise ValueError("sample_interval_seconds must be positive")

    with tempfile.TemporaryDirectory() as tmpdir:
        ydl_opts = {
            'format': 'bestvideo[height<=1080]+bestaudio/best[height<=1080]/best',
            'outtmpl': os.path.join(tmpdir, 'video.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'merge_output_format': 'mp4',
            'force_ipv4': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)

        # The extension is chosen by yt-dlp, so look the file up instead of
        # assuming its name.
        video_path = next(
            (os.path.join(tmpdir, name) for name in os.listdir(tmpdir) if name.endswith('.mp4')),
            None,
        )
        if video_path is None:
            raise FileNotFoundError(f"No MP4 file was produced when downloading {url}")

        reader = imageio.get_reader(video_path)
        try:
            fps = reader.get_meta_data().get('fps')
            if not fps:
                raise ValueError("Video metadata does not report a frame rate")
            # Clamp to 1 so a very low frame rate cannot produce a zero
            # interval, which would make the modulo below divide by zero.
            frame_interval = max(1, int(fps * sample_interval_seconds))
            return [
                Image.fromarray(frame)
                for idx, frame in enumerate(reader)
                if idx % frame_interval == 0
            ]
        finally:
            # Close before the temporary directory is removed, also on error.
            reader.close()

@tool
def read_file(filepath: str) -> str:
    """Read a text file and return its contents.

    The file is decoded as UTF-8. Failures are reported in the returned
    string instead of being raised.

    Args:
        filepath: Path of the text file to read.

    Returns:
        The file contents, or a message starting with "Error reading file"
        when the file cannot be opened or decoded.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as file:
            return file.read()
    except Exception as e:
        # Broad on purpose: every failure is turned into a returned message.
        return f"Error reading file: {e}"

@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """Download a file into the system temporary directory.

    Failures are reported in the returned string instead of being raised.

    Args:
        url: The URL to download.
        filename: Name to save the file under. Directory components are
            dropped so the file always lands in the temporary directory.
            When omitted, the last segment of the URL path is used, or a
            random name if the URL path has none.

    Returns:
        The path of the saved file, or a message starting with
        "Error downloading file" when the request or the write fails.
    """
    # Without a timeout an unresponsive server would block the call forever.
    timeout_seconds = 30
    try:
        # basename() stops a supplied name such as "../x" or "/etc/x" from
        # escaping the temporary directory.
        safe_name = os.path.basename(filename) if filename else ""
        if safe_name in ("", ".", ".."):
            safe_name = os.path.basename(urlparse(url).path) or f"download_{os.urandom(4).hex()}"
        filepath = os.path.join(tempfile.gettempdir(), safe_name)

        with requests.get(url, stream=True, timeout=timeout_seconds) as response:
            # Treat HTTP error statuses as failures rather than saving the
            # error page as if it were the requested file.
            response.raise_for_status()
            with open(filepath, 'wb') as f:
                # Stream in chunks so large files are not held in memory.
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)
        return filepath
    except Exception as e:
        # Broad on purpose: every failure is turned into a returned message.
        return f"Error downloading file: {e}"

@tool
def extract_text_from_image(image_path: str) -> str:
    """Extract text from an image file using pytesseract OCR.

    Failures are reported in the returned string instead of being raised.

    Args:
        image_path: Path of the image file to run OCR on.

    Returns:
        The recognised text, or a message starting with
        "Error extracting text" when pytesseract cannot be imported, the
        image cannot be opened, or OCR fails.
    """
    try:
        # Imported inside the try so a missing pytesseract install is
        # reported as an error message like any other failure.
        import pytesseract

        # The context manager closes the image file once OCR is done.
        with Image.open(image_path) as image:
            return pytesseract.image_to_string(image)
    except Exception as e:
        return f"Error extracting text: {e}"

@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """Load a CSV file with pandas and summarise its shape and columns.

    Failures are reported in the returned string instead of being raised.

    Args:
        file_path: Path of the CSV file to load.
        query: The question about the data. It is currently not used and the
            summary is the same for every query.

    Returns:
        A summary with the table shape and column names, or a message
        starting with "CSV error" when the file cannot be loaded.
    """
    try:
        import pandas as pd

        df = pd.read_csv(file_path)
        return f"Loaded CSV with shape {df.shape} and columns: {df.columns.tolist()}"
    except Exception as e:
        # Broad on purpose: every failure is turned into a returned message.
        return f"CSV error: {e}"

@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """Load an Excel file with pandas and summarise its shape and columns.

    Failures are reported in the returned string instead of being raised.

    Args:
        file_path: Path of the Excel file to load.
        query: The question about the data. It is currently not used and the
            summary is the same for every query.

    Returns:
        A summary with the table shape and column names, or a message
        starting with "Excel error" when the file cannot be loaded.
    """
    try:
        import pandas as pd

        df = pd.read_excel(file_path)
        return f"Loaded Excel with shape {df.shape} and columns: {df.columns.tolist()}"
    except Exception as e:
        # Broad on purpose: every failure is turned into a returned message.
        return f"Excel error: {e}"

@tool
def youtube_transcribe(url: str) -> str:
    """Transcribe the audio track of a YouTube video with Whisper.

    The audio is downloaded with yt-dlp, converted to WAV in a temporary
    directory, and transcribed with the Whisper "small" model.

    Args:
        url: The URL of the YouTube video to transcribe.

    Returns:
        The transcribed text.

    Raises:
        FileNotFoundError: If the download produced no WAV file.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        ydl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(tmpdir, 'audio.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
            }],
            'force_ipv4': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)

        audio_path = next(
            (os.path.join(tmpdir, name) for name in os.listdir(tmpdir) if name.endswith('.wav')),
            None,
        )
        if audio_path is None:
            raise FileNotFoundError(f"No WAV file was produced when downloading {url}")

        # Load the model a single time, and only once there is audio to
        # transcribe. Transcription must finish before tmpdir is removed.
        model = whisper.load_model("small")
        return model.transcribe(audio_path)['text']

@tool
def transcribe_audio(audio_file_path: str) -> str:
    """Transcribe an audio file with the Whisper "small" model.

    Args:
        audio_file_path: Path of the audio file to transcribe.

    Returns:
        The transcribed text.
    """
    # The docstring is required: the smolagents tool decorator builds the
    # tool schema from it and rejects functions without one.
    model = whisper.load_model("small")
    return model.transcribe(audio_file_path)['text']