| |
| import requests |
| import time |
| from api_secrets import API_KEY_ASSEMBLYAI |
| import re |
| from fastapi import FastAPI |
| from pydantic import BaseModel |
| import asyncio |
| from typing import List, Union |
| import uvicorn |
| import json |
| import nltk |
| from nltk.corpus import stopwords |
| from nltk.stem import WordNetLemmatizer |
| import string |
|
|
| |
| |
| |
|
|
|
|
|
|
# ASGI application object; the /nlp route defined further down registers on it.
app = FastAPI()
|
|
# Request payload accepted by the /nlp endpoint (one instance per audio file).
# Documented with comments rather than a docstring so the generated schema
# description stays exactly as it was.
class Item(BaseModel):
    # Forwarded unchanged to the transcription request as its `audio_url`.
    url: str
|
|
# AssemblyAI REST endpoints: raw media upload and transcript jobs.
upload_endpoint = 'https://api.assemblyai.com/v2/upload'
transcript_endpoint = 'https://api.assemblyai.com/v2/transcript'

# Headers for requests that only need to authenticate (used by the upload).
headers_auth_only = {'authorization': API_KEY_ASSEMBLYAI}

# Headers for JSON requests: the auth header plus an explicit content type.
headers = {**headers_auth_only, "content-type": "application/json"}

# Number of bytes read from disk per piece while streaming an upload (5 MiB).
CHUNK_SIZE = 5 * 1024 * 1024
|
|
def lemmatize_and_clean(text):
    """Normalize free text into a space-separated string of lemmas.

    The text is tokenized with ``nltk.word_tokenize``. Tokens that are not
    purely alphabetic are dropped, the rest are lower-cased, English stop
    words are removed, and every surviving word is lemmatized with
    ``WordNetLemmatizer``.

    Args:
        text: The raw text to clean.

    Returns:
        The cleaned words joined by single spaces (an empty string when no
        word survives the filters).
    """
    tokens = nltk.word_tokenize(text)
    english_stop_words = set(stopwords.words('english'))
    lemmatizer = WordNetLemmatizer()

    lemmas = []
    for token in tokens:
        # Punctuation, numbers and mixed tokens such as "n't" are discarded.
        if not token.isalpha():
            continue
        lowered = token.lower()
        # Stop words are compared after lower-casing.
        if lowered in english_stop_words:
            continue
        lemmas.append(lemmatizer.lemmatize(lowered))

    return ' '.join(lemmas)
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
# Phrase detectors keyed by the label that is reported back to the client.
# Each value is a regular expression that nlp_bat applies case-insensitively;
# the alternations presumably enumerate transcription variants of one spoken
# phrase (verify against real transcripts).
patterns = {
    # Any of the "uni..." spellings, optional whitespace, then the stem
    # "capsul" (no trailing boundary, so "capsule"/"capsules" also match).
    'Unique Capsule': r'\b(?:uni(?:que)?|unit|uniq\.+|uni\.+)\s*capsul',

    'Refreshing Taste and Smell': r'\b(?:refreshing|ripe|repressing)\s+(?:taste\s+(?:smell|milk)|test\s+smell)\b',

    # NOTE(review): the trailing \b forces the match to end on a word
    # boundary, so "benson hedge breeze" is NOT counted (the stem "breez" is
    # followed by "e"). Confirm whether that is intended.
    'Benson & Hadges Breeze': r'\b(?:benson\s+h(?:ess|aze|ezes|edge)\s+breez|banson\s+(?:haze\s+breez|hedge\s+(?:breez|bre))|benson\s+h(?:aze\s+brie|edge\s+bridge))\b',
}
| |
def nlp_bat(text):
    """Count how often each phrase in ``patterns`` occurs in ``text``.

    Every regular expression in the module-level ``patterns`` mapping is run
    against ``text`` with ``re.IGNORECASE``. The matched substrings are
    printed to stdout for inspection.

    Args:
        text: The string to search.

    Returns:
        A dict mapping each pattern label to its number of non-overlapping
        matches (0 when a phrase never occurs).
    """
    hits_by_label = {
        label: re.findall(regex, text, re.IGNORECASE)
        for label, regex in patterns.items()
    }

    # Debug aid: show the actual substrings that were matched.
    print(hits_by_label)

    return {label: len(hits) for label, hits in hits_by_label.items()}
|
|
|
|
|
|
|
|
|
|
|
|
def upload(filename):
    """Upload a local file to the AssemblyAI upload endpoint.

    The request body is a generator, so the file is read in ``CHUNK_SIZE``
    pieces instead of being loaded into memory at once.

    Args:
        filename: Path of the file to upload; opened in binary mode.

    Returns:
        The ``'upload_url'`` value from the JSON response.

    Raises:
        requests.HTTPError: If the service answers with a 4xx/5xx status.
        KeyError: If a successful response carries no ``'upload_url'``.
    """
    def read_file(path):
        with open(path, 'rb') as f:
            # iter() with a sentinel stops at the first empty read (EOF).
            for chunk in iter(lambda: f.read(CHUNK_SIZE), b''):
                yield chunk

    upload_response = requests.post(
        upload_endpoint,
        headers=headers_auth_only,
        data=read_file(filename),
    )
    # Report a rejected upload as an HTTP error instead of a confusing
    # KeyError on the missing 'upload_url' field.
    upload_response.raise_for_status()
    return upload_response.json()['upload_url']
|
|
|
|
def transcribe(audio_url, timeout=30):
    """Submit an audio URL to AssemblyAI for transcription.

    Args:
        audio_url: URL of the audio, sent as the ``audio_url`` field of the
            JSON request body.
        timeout: Timeout in seconds handed to ``requests`` so a stalled
            connection cannot hang the caller indefinitely.

    Returns:
        The ``'id'`` value from the JSON response, used to poll the job.

    Raises:
        requests.HTTPError: If the service answers with a 4xx/5xx status.
        requests.Timeout: If the request exceeds ``timeout``.
        KeyError: If a successful response carries no ``'id'``.
    """
    transcript_response = requests.post(
        transcript_endpoint,
        json={'audio_url': audio_url},
        headers=headers,
        timeout=timeout,
    )
    # Fail with the real HTTP error rather than a KeyError on 'id'.
    transcript_response.raise_for_status()
    return transcript_response.json()['id']
|
|
| |
def poll(transcript_id, timeout=30):
    """Fetch the current state of a transcription job.

    Args:
        transcript_id: Job id, appended to the transcript endpoint URL.
        timeout: Timeout in seconds handed to ``requests`` so a stalled
            connection cannot hang the caller indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the service answers with a 4xx/5xx status.
        requests.Timeout: If the request exceeds ``timeout``.
    """
    polling_endpoint = transcript_endpoint + '/' + transcript_id
    polling_response = requests.get(
        polling_endpoint,
        headers=headers,
        timeout=timeout,
    )
    # Callers index the body by 'status'; surface HTTP failures here so they
    # do not turn into a KeyError further up.
    polling_response.raise_for_status()
    return polling_response.json()
|
|
|
|
def get_transcription_result_url(url):
    """Start a transcription for ``url`` and block until it finishes.

    The job is polled repeatedly. While its status is neither ``'completed'``
    nor ``'error'``, "Processing Audio" is printed and the function sleeps
    two seconds before polling again. There is no upper bound on the number
    of polls.

    Args:
        url: Audio URL handed to ``transcribe``.

    Returns:
        A ``(data, error)`` tuple where ``data`` is the last polled response.
        ``error`` is ``None`` when the job completed and ``data['error']``
        when the job ended in the ``'error'`` status.
    """
    job_id = transcribe(url)

    payload = poll(job_id)
    while payload['status'] not in ('completed', 'error'):
        print("Processing Audio")
        time.sleep(2)
        payload = poll(job_id)

    if payload['status'] == 'completed':
        return payload, None
    return payload, payload['error']
| |
| |
def detect_audio(url, title):
    """Transcribe the audio at ``url`` and count the configured phrases.

    The transcript text is cleaned with ``lemmatize_and_clean``, printed,
    lower-cased and passed to ``nlp_bat``.

    Args:
        url: Audio URL to transcribe.
        title: Not used by this function; kept so existing callers that pass
            it keep working.

    Returns:
        The dict produced by ``nlp_bat`` (phrase label -> match count).

    Raises:
        RuntimeError: If the transcription job ended with an error.
    """
    data, error = get_transcription_result_url(url)
    if error is not None:
        # Previously the error was ignored and the missing transcript text
        # was handed straight to the tokenizer.
        raise RuntimeError(f"Transcription failed: {error}")

    # A missing or null 'text' field is treated as an empty transcript.
    transcript = data.get('text') or ''

    cleaned = lemmatize_and_clean(transcript)
    print(cleaned)

    return nlp_bat(cleaned.lower())
|
|
|
|
async def process_item(item: Item):
    """Run phrase detection for one item without blocking the event loop.

    ``detect_audio`` is synchronous: it performs HTTP requests and sleeps
    between polls. It is therefore executed in the event loop's default
    executor, which keeps the loop responsive and lets several items
    awaited together actually run concurrently.

    Args:
        item: Request item whose ``url`` points at the audio to analyse.

    Returns:
        The dict returned by ``detect_audio`` (phrase label -> match count).
    """
    print(item.url)
    loop = asyncio.get_running_loop()
    # NOTE(review): detect_audio now runs on worker threads. Confirm that the
    # NLTK resources it uses tolerate concurrent first use, or preload them
    # at startup.
    return await loop.run_in_executor(None, detect_audio, item.url, "file")
|
|
async def process_items(items: Union[Item, List[Item]]):
    """Process a single item or a list of items.

    A single ``Item`` is processed directly. For a list, all items are
    awaited together and their per-phrase counts are added up, so the result
    keeps the same shape as for a single item (phrase label -> count).

    Args:
        items: One ``Item`` or a list of them.

    Returns:
        A dict mapping phrase label to match count; for a list, the totals
        across all items (an empty dict for an empty list).
    """
    if not isinstance(items, list):
        return await process_item(items)

    per_item_counts = await asyncio.gather(
        *(process_item(item) for item in items)
    )

    # Every per-item dict uses the same labels as keys, so merging with
    # dict.update() would keep only the last item's numbers. Sum instead.
    totals = {}
    for counts in per_item_counts:
        for label, count in counts.items():
            totals[label] = totals.get(label, 0) + count
    return totals
| |
@app.post("/nlp")
async def create_items(items: Union[Item, List[Item]]):
    # POST /nlp handler: accepts one item or a list of items, delegates to
    # process_items, logs the outcome to stdout and returns it as the
    # response body. (Plain comments are used instead of a docstring so the
    # generated API description is unchanged.)
    outcome = await process_items(items)
    print("Result Sent to User:", outcome)
    return outcome
|
|
if __name__ == "__main__":
    # Script entry point: serve the app on the loopback interface, port 8020.
    uvicorn.run(app, host="127.0.0.1", port=8020)
|
|