| |
| import ast |
| import json |
| import os |
| import base64 |
| from io import BytesIO |
| from time import sleep |
| from uuid import uuid4 |
|
|
| import boto3 |
| import fitz |
| import requests |
| from bs4 import BeautifulSoup |
| from googlesearch import search |
| from pandas import read_excel |
| from pytubefix import YouTube |
| from pytubefix.cli import on_progress |
| from smolagents import tool, Tool |
| from requests.exceptions import HTTPError |
| from urllib3.exceptions import ReadTimeoutError |
|
|
| from definitions import TranscriptionJob |
| from utils import get_file, s3_upload_file, s3_download_file, bedrock_runtime, BEDROCK_MODEL_ID |
|
|
|
|
# Binary operators the calculator accepts, keyed by their AST node type.
_BINARY_OPERATORS = {
    ast.Add: lambda left, right: left + right,
    ast.Sub: lambda left, right: left - right,
    ast.Mult: lambda left, right: left * right,
    ast.Div: lambda left, right: left / right,
    ast.FloorDiv: lambda left, right: left // right,
    ast.Mod: lambda left, right: left % right,
    ast.Pow: lambda left, right: left ** right,
}

# Unary operators the calculator accepts, keyed by their AST node type.
_UNARY_OPERATORS = {
    ast.UAdd: lambda operand: +operand,
    ast.USub: lambda operand: -operand,
}

# Largest exponent magnitude accepted by ``**``; keeps inputs such as
# ``9 ** 9 ** 9`` from tying up the process.
_MAX_EXPONENT = 10_000


def _evaluate_arithmetic(node):
    """Recursively evaluate a parsed arithmetic expression node.

    Only numeric constants, unary ``+``/``-`` and the operators listed in
    ``_BINARY_OPERATORS`` are accepted, so names, calls and attribute access
    are never executed.

    Raises:
        ValueError: If the node is not one of the supported constructs or an
            exponent exceeds ``_MAX_EXPONENT``.
    """
    if isinstance(node, ast.Constant):
        value = node.value
        # bool is a subclass of int; reject it so only real numbers get through.
        if isinstance(value, bool) or not isinstance(value, (int, float, complex)):
            raise ValueError(f"unsupported constant: {value!r}")
        return value

    if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPERATORS:
        return _UNARY_OPERATORS[type(node.op)](_evaluate_arithmetic(node.operand))

    if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPERATORS:
        left = _evaluate_arithmetic(node.left)
        right = _evaluate_arithmetic(node.right)
        if (
            isinstance(node.op, ast.Pow)
            and isinstance(right, (int, float))
            and abs(right) > _MAX_EXPONENT
        ):
            raise ValueError(f"exponent {right} is too large")
        return _BINARY_OPERATORS[type(node.op)](left, right)

    raise ValueError(f"unsupported expression element: {type(node).__name__}")


@tool
def math_calculator(expression: str) -> str:
    """A simple calculator tool that evaluates mathematical expressions.

    Supports numbers, parentheses and the operators +, -, *, /, //, % and **.

    Args:
        expression (str): A mathematical expression as a string, e.g., "2 + 2 * 3".
    """
    try:
        try:
            # Plain literals keep going through literal_eval, as before.
            result = ast.literal_eval(expression)
        except ValueError:
            # literal_eval refuses operators (e.g. "2 + 2 * 3"), so parse the
            # text and evaluate it with the whitelisted arithmetic walker.
            # strip() avoids an indentation error on leading whitespace.
            tree = ast.parse(expression.strip(), mode="eval")
            result = _evaluate_arithmetic(tree.body)
        return str(result)
    except Exception as e:
        # Errors are returned as text rather than raised.
        return f"Error evaluating expression: {e}"
|
|
|
|
@tool
def excel_reader(task_id: str, file_name: str) -> str:
    """Reads an Excel file and returns its content as a dataframe string.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the Excel file to read.
    """
    # NOTE(review): the docstring is presumably what smolagents exposes to the
    # model as the tool description, so its wording is left untouched.
    # The file is located by task_id alone; file_name only labels error text.
    try:
        # Fetch, parse with openpyxl and render without the index column.
        return read_excel(get_file(task_id), engine="openpyxl").to_string(index=False)
    except Exception as err:
        # Failures are reported as text instead of being raised.
        return f"Error reading Excel file {file_name}: {err}"
|
|
|
|
@tool
def txt_reader(task_id: str, file_name: str) -> str:
    """Reads a text file and returns its content as a string.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the file to read.
    """
    # NOTE(review): the docstring is presumably what smolagents exposes to the
    # model as the tool description, so its wording is left untouched.
    # The file is located by task_id alone; file_name only labels error text.
    try:
        raw_bytes = get_file(task_id).read()
        # The payload is decoded as UTF-8; any decode error is reported below.
        return raw_bytes.decode("utf-8")
    except Exception as err:
        # Failures are reported as text instead of being raised.
        return f"Error reading file {file_name}: {err}"
|
|
|
|
@tool
def pdf_reader(task_id: str, file_name: str) -> str:
    """Reads a PDF file and returns its content as a string.

    Args:
        task_id (str): The ID of the task associated with the file.
        file_name (str): The name of the PDF file to read.
    """
    # The file is located by task_id alone; file_name only labels messages.
    try:
        pdf_bytes = get_file(task_id).read()
        with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
            # Extract each page exactly once; text extraction is the costly step.
            page_texts = [page.get_text() for page in doc]

        # Skip pages that produced no text, then trim the joined result before
        # checking emptiness so a whitespace-only document is reported as
        # having no text instead of yielding an empty string.
        text = "\n".join(chunk for chunk in page_texts if chunk).strip()
        if not text:
            return f"No text found in PDF file {file_name}."
        return text
    except Exception as e:
        # Failures are reported as text instead of being raised.
        return f"Error reading PDF file {file_name}: {e}"
|
|
|
|
class AudioTranscriber:
    """A class to handle audio transcription using AWS Transcribe."""

    def __init__(self):
        # Region comes from AWS_REGION, defaulting to us-east-1.
        region = os.getenv("AWS_REGION", "us-east-1")
        self.client = boto3.client("transcribe", region_name=region)

    def _transcribe_audio(self, job_name: str, media_uri: str) -> dict:
        """Start a transcription job for the media at ``media_uri``.

        The transcript is written to the bucket named by the TARGET_BUCKET
        environment variable under the key ``<job_name>.json``.

        Args:
            job_name: Name to give the transcription job.
            media_uri: Location of the media file to transcribe.

        Returns:
            The response returned by ``start_transcription_job``.
        """
        # Return the service response so the declared ``-> dict`` holds;
        # callers that ignore the return value are unaffected.
        return self.client.start_transcription_job(
            TranscriptionJobName=job_name,
            Media={"MediaFileUri": media_uri},
            IdentifyLanguage=True,
            OutputBucketName=os.getenv("TARGET_BUCKET"),
            OutputKey=f"{job_name}.json",
        )

    def _get_transcription(self, job_name: str) -> str:
        """Wait for ``job_name`` to finish and return its transcript text.

        Polls the job status every five seconds until it is COMPLETED or
        FAILED, then downloads the transcript JSON from TARGET_BUCKET.

        Args:
            job_name: Name of a previously started transcription job.

        Returns:
            The text of the first transcript in the result file.

        Raises:
            RuntimeError: If the job finished with status FAILED.
            json.JSONDecodeError: If the transcript file is not valid JSON.
            Exception: Any error raised while downloading or reading the file.
        """
        while True:
            response: TranscriptionJob = self.client.get_transcription_job(TranscriptionJobName=job_name)
            job = response["TranscriptionJob"]
            status = job["TranscriptionJobStatus"]
            if status in ("COMPLETED", "FAILED"):
                break
            sleep(5)

        if status == "FAILED":
            # Report the failure explicitly rather than looking up a
            # transcript location for a job that did not complete.
            reason = job.get("FailureReason", "no failure reason reported")
            raise RuntimeError(f"Transcription job {job_name} failed: {reason}")

        transcript_url = job["Transcript"]["TranscriptFileUri"]
        try:
            # The object key is the last path segment of the transcript URI.
            bytes_result = s3_download_file(os.getenv("TARGET_BUCKET"), transcript_url.split("/")[-1])
            transcription_data = json.loads(bytes_result.read().decode("utf-8"))
            return transcription_data["results"]["transcripts"][0]["transcript"]
        except json.JSONDecodeError as e:
            print(f"Error decoding transcription JSON: {e}")
            raise
        except Exception as e:
            print(f"Error downloading or processing transcription file: {e}")
            raise
|
|
|
|
class AudioTranscriberTool(Tool, AudioTranscriber):
    """Tool that transcribes an audio file attached to a task via AWS Transcribe."""

    name = "AudioTranscriber"
    description = "Extract text from audio files, such as MP3, MP4, WAV, etc."
    inputs = {
        "task_id": {
            "type": "string",
            "description": "The ID of the task associated with the audio file.",
        },
        "file_name": {
            "type": "string",
            "description": "The name of the audio file to transcribe.",
        },
    }
    output_type = "string"

    def __init__(self, *args, **kwargs):
        Tool.__init__(self, *args, **kwargs)
        # AudioTranscriber.__init__ accepts only ``self``; forwarding the
        # Tool arguments to it would raise TypeError whenever any are given.
        AudioTranscriber.__init__(self)

    def forward(self, task_id: str, file_name: str) -> str:
        """Upload the task's audio file, transcribe it and return the text.

        Args:
            task_id: ID of the task whose file is fetched with ``get_file``.
            file_name: Name used for the uploaded object in SOURCE_BUCKET and
                as the basis of the transcription job name.

        Returns:
            The transcript text, or an error message if any step fails.
        """
        try:
            file_content = get_file(task_id)
            s3_upload_file(file_content, os.getenv("SOURCE_BUCKET"), file_name)

            media_uri = f"s3://{os.getenv('SOURCE_BUCKET')}/{file_name}"

            # Transcribe job names only allow [0-9a-zA-Z._-] and at most 200
            # characters, so replace anything else in the file stem (spaces,
            # accents, ...) and cap the length.
            stem = file_name.split(".")[0]
            safe_stem = "".join(
                ch if (ch.isascii() and ch.isalnum()) or ch in "_-" else "-"
                for ch in stem
            )
            job_name = f"{uuid4()}-{safe_stem}"[:200]

            self._transcribe_audio(job_name, media_uri)
            return self._get_transcription(job_name)
        except Exception as e:
            # Failures are reported as text instead of being raised.
            return f"Error starting transcription job for {file_name}: {e}"
|
|
|
|
@tool
def image_transcriber(text_prompt: str, task_id: str, file_name: str) -> str:
    """Transcribes text from an image file

    Args:
        text_prompt (str): The text prompt to guide the transcription.
        task_id (str): The ID of the task associated with the image file.
        file_name (str): The name of the image file to transcribe.
    """
    try:
        file_content = get_file(task_id)
        base64_image = base64.b64encode(file_content.getvalue()).decode("utf-8")

        # Derive the media type from the file extension. Extensions are
        # normalised to lower case and ".jpg"/".jpe" are mapped to "jpeg",
        # because "image/jpg" and "image/PNG" are not valid media types.
        extension = file_name.split(".")[-1].lower()
        if extension in ("jpg", "jpe"):
            extension = "jpeg"
        media_type = f"image/{extension}"

        request_body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": base64_image,
                            },
                        },
                        {"type": "text", "text": text_prompt},
                    ],
                }
            ],
        }
        response = bedrock_runtime.invoke_model(
            modelId=BEDROCK_MODEL_ID,
            body=json.dumps(request_body),
        )["body"].read()
        # The answer is the text of the first content item in the response.
        return json.loads(response)["content"][0]["text"]
    except Exception as e:
        # Failures are reported as text instead of being raised.
        return f"Error processing image file {file_name}: {e}"
|
|
|
|
def _get_content(url: str, timeout: int = 5) -> bytes:
    """Download ``url`` with an HTTP GET and return the raw response body.

    Args:
        url: Address to request.
        timeout: Timeout passed to ``requests.get``.

    Returns:
        The response body as bytes.

    Raises:
        requests.exceptions.HTTPError: If ``raise_for_status`` rejects the
            response.
    """
    response = requests.get(url=url, timeout=timeout)
    # Turn error statuses into exceptions before touching the body.
    response.raise_for_status()
    body = response.content
    return body
|
|
|
|
| def _js_disable_message(text: str) -> bool: |
| return "JavaScript is disabled in this browser" in text |
|
|
|
|
@tool
def search_engine(search_term: str) -> str:
    """Search for the provided search term in Google Search

    Args:
        search_term (str): The term to search for on the web.
    """
    # Materialise the results so they can be checked for emptiness below;
    # search() may hand back a lazy iterator.
    urls = list(search(search_term, num_results=5))

    for url in urls:
        try:
            html_content = BeautifulSoup(_get_content(url), "html.parser")
        except (requests.exceptions.RequestException, ReadTimeoutError) as ex:
            # RequestException covers HTTP error statuses as well as the
            # timeouts and connection errors raised by requests itself.
            print(f"Got HTTP error when requesting {url}. Error {ex}")
            # Move on immediately: there is no page to read for this URL.
            continue

        # Drop page chrome and non-content elements before extracting text.
        for tag in html_content.find_all(
            ["header", "footer", "nav", "aside", "script", "style", "noscript", "form"]
        ):
            tag.decompose()

        html_text = html_content.text
        if _js_disable_message(html_text):
            # The page only rendered a "JavaScript is disabled" notice.
            continue

        # The first usable page wins.
        return html_text.replace("\n", "")

    if urls:
        # Every candidate URL failed or was unusable.
        return "Sorry, got an HTTP error when requesting the internet"

    return "Could not retrieve any content from the search results."
|
|
|
|
class YoutubeTranscriberTool(Tool, AudioTranscriber):
    """Tool that downloads a YouTube video and transcribes it via AWS Transcribe."""

    name = "YoutubeTranscriber"
    description = "Extract text from YouTube videos, do not work for video understanding."
    inputs = {
        "youtube_url": {
            "type": "string",
            "description": "The URL of the YouTube video to transcribe.",
        },
    }
    output_type = "string"

    def __init__(self, *args, **kwargs):
        Tool.__init__(self, *args, **kwargs)
        # AudioTranscriber.__init__ accepts only ``self``; forwarding the
        # Tool arguments to it would raise TypeError whenever any are given.
        AudioTranscriber.__init__(self)

    def forward(self, youtube_url: str) -> str:
        """Download the video at ``youtube_url`` and return its transcript.

        Args:
            youtube_url: URL of the YouTube video to transcribe.

        Returns:
            The transcript text, or an error message if the download or the
            transcription fails.
        """
        # A random name is used for the object uploaded to SOURCE_BUCKET.
        file_name = f"{uuid4()}.mp4"
        buffer = BytesIO()
        try:
            youtube_obj = YouTube(youtube_url, on_progress_callback=on_progress)
            stream = youtube_obj.streams.filter(progressive=True).first()
            if stream is None:
                # Give a readable reason instead of an AttributeError on None.
                raise ValueError("no progressive stream is available for this video")
            stream.stream_to_buffer(buffer)
        except Exception as e:
            return f"Error fetching YouTube video {youtube_url}: {e}"
        try:
            # Rewind after the download wrote into the buffer; presumably the
            # upload helper reads from the current position, in which case an
            # un-rewound buffer would upload nothing.
            buffer.seek(0)
            s3_upload_file(buffer, os.getenv("SOURCE_BUCKET"), file_name)
            media_uri = f"s3://{os.getenv('SOURCE_BUCKET')}/{file_name}"
            job_name = f"{uuid4()}-{file_name.split('.', maxsplit=1)[0]}"
            self._transcribe_audio(job_name, media_uri)
            return self._get_transcription(job_name)
        except Exception as e:
            # Failures are reported as text instead of being raised.
            return f"Error starting transcription job for {file_name}: {e}"
|
|