| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
|
|
| import base64 |
| import logging |
| import os |
| import tempfile |
| import time |
| from datetime import datetime |
|
|
| import gradio as gr |
| import torch |
| import torchaudio |
| import urllib.request |
|
|
|
|
| from examples import examples |
| from model import decode, get_pretrained_model, language_to_models, sample_rate |
|
|
# Names of all supported languages, i.e. the keys of ``language_to_models``.
languages = list(language_to_models)
|
|
|
|
def convert_to_wav(in_filename: str) -> str:
    """Convert the input audio file to a 16 kHz wave file with ffmpeg.

    Two files are written next to the input: ``<in_filename>.wav``, which is
    returned, and ``<in_filename>.wav.flac``, whose base64-encoded content is
    written to the log.

    Args:
      in_filename: Path to the input audio file.

    Returns:
      The path of the generated wave file.

    Raises:
      subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
      FileNotFoundError: If the ffmpeg executable cannot be found.
    """
    # Imported locally so this function does not depend on the file-level
    # import list.
    import subprocess

    out_filename = in_filename + ".wav"
    flac_filename = out_filename + ".flac"
    logging.info(f"Converting '{in_filename}' to '{out_filename}'")

    # Argument lists (no shell) keep file names that contain quotes or other
    # shell metacharacters from being interpreted by a shell.
    # -y / -nostdin: overwrite stale outputs and never wait for an answer on
    # standard input.
    ffmpeg = ["ffmpeg", "-hide_banner", "-nostdin", "-y"]

    subprocess.run(
        ffmpeg + ["-i", in_filename, "-ar", "16000", out_filename],
        check=True,
        stdin=subprocess.DEVNULL,
    )
    subprocess.run(
        ffmpeg
        + ["-loglevel", "error", "-i", in_filename, "-ar", "16000", flac_filename],
        check=True,
        stdin=subprocess.DEVNULL,
    )

    # Dump the (compressed) audio into the log as base64 text.
    with open(flac_filename, "rb") as f:
        s = "\n" + out_filename + "\n"
        s += base64.b64encode(f.read()).decode()
        logging.info(s)

    return out_filename
|
|
|
|
def build_html_output(s: str, style: str = "result_item_success"):
    """Wrap ``s`` in the HTML snippet shown in the "Info" area.

    Args:
      s: Content placed inside the inner ``<div>``. It is inserted as-is,
        so it may itself contain HTML markup.
      style: Extra CSS class added next to ``result_item`` on the inner
        ``<div>``.

    Returns:
      The HTML snippet as a string.
    """
    item_classes = f"result_item {style}"
    html = f"""
    <div class='result'>
        <div class='{item_classes}'>
          {s}
        </div>
    </div>
    """
    return html
|
|
def process_url(
    language: str,
    repo_id: str,
    decoding_method: str,
    num_active_paths: int,
    url: str,
):
    """Download the audio file at ``url`` and run recognition on it.

    Args:
      language: Forwarded to ``process``.
      repo_id: Forwarded to ``process``.
      decoding_method: Forwarded to ``process``.
      num_active_paths: Forwarded to ``process``.
      url: Address of the audio file. Only ``http`` and ``https`` URLs are
        accepted.

    Returns:
      Whatever ``process`` returns on success. For a missing or unsupported
      URL, or on any exception, a tuple ``("", html)`` where ``html`` is an
      error box built by ``build_html_output``.
    """
    # Imported locally so this function does not depend on the file-level
    # import list.
    from urllib.parse import urlparse

    logging.info(f"Processing URL: {url}")

    url = url.strip() if url else ""
    if not url:
        return "", build_html_output(
            "Please first enter the URL of an audio file and then click "
            'the button "submit for recognition"',
            "result_item_error",
        )

    # The URL comes straight from a text box. urllib would also accept
    # schemes such as file:, which would let a visitor point the server at
    # its own local files, so only plain web URLs are allowed.
    try:
        scheme = urlparse(url).scheme.lower()
    except ValueError:
        scheme = ""
    if scheme not in ("http", "https"):
        return "", build_html_output(
            "Only http:// and https:// URLs are supported",
            "result_item_error",
        )

    with tempfile.NamedTemporaryFile() as f:
        try:
            urllib.request.urlretrieve(url, f.name)

            return process(
                in_filename=f.name,
                language=language,
                repo_id=repo_id,
                decoding_method=decoding_method,
                num_active_paths=num_active_paths,
            )
        except Exception as e:
            logging.info(str(e))
            return "", build_html_output(str(e), "result_item_error")
|
|
def process_uploaded_file(
    language: str,
    repo_id: str,
    decoding_method: str,
    num_active_paths: int,
    in_filename: str,
):
    """Run recognition on a file uploaded from disk.

    Args:
      language: Forwarded to ``process``.
      repo_id: Forwarded to ``process``.
      decoding_method: Forwarded to ``process``.
      num_active_paths: Forwarded to ``process``.
      in_filename: Path of the uploaded file; ``None`` or ``""`` when
        nothing has been uploaded.

    Returns:
      Whatever ``process`` returns on success. When no file was given or an
      exception is raised, a tuple ``("", html)`` where ``html`` is an error
      box built by ``build_html_output``.
    """
    nothing_uploaded = in_filename is None or in_filename == ""
    if nothing_uploaded:
        hint = (
            "Please first upload a file and then click "
            'the button "submit for recognition"'
        )
        return "", build_html_output(hint, "result_item_error")

    logging.info(f"Processing uploaded file: {in_filename}")
    try:
        outcome = process(
            in_filename=in_filename,
            language=language,
            repo_id=repo_id,
            decoding_method=decoding_method,
            num_active_paths=num_active_paths,
        )
    except Exception as e:
        # Report the failure in the UI instead of propagating it.
        error_text = str(e)
        logging.info(error_text)
        return "", build_html_output(error_text, "result_item_error")

    return outcome
|
|
|
|
def process_microphone(
    language: str,
    repo_id: str,
    decoding_method: str,
    num_active_paths: int,
    in_filename: str,
):
    """Run recognition on a recording made with the microphone.

    Args:
      language: Forwarded to ``process``.
      repo_id: Forwarded to ``process``.
      decoding_method: Forwarded to ``process``.
      num_active_paths: Forwarded to ``process``.
      in_filename: Path of the recorded file; ``None`` or ``""`` when
        nothing has been recorded.

    Returns:
      Whatever ``process`` returns on success. When no recording was given
      or an exception is raised, a tuple ``("", html)`` where ``html`` is an
      error box built by ``build_html_output``.
    """
    nothing_recorded = in_filename is None or in_filename == ""
    if nothing_recorded:
        hint = (
            "Please first click 'Record from microphone', speak, "
            "click 'Stop recording', and then "
            "click the button 'submit for recognition'"
        )
        return "", build_html_output(hint, "result_item_error")

    logging.info(f"Processing microphone: {in_filename}")
    try:
        outcome = process(
            in_filename=in_filename,
            language=language,
            repo_id=repo_id,
            decoding_method=decoding_method,
            num_active_paths=num_active_paths,
        )
    except Exception as e:
        # Report the failure in the UI instead of propagating it.
        error_text = str(e)
        logging.info(error_text)
        return "", build_html_output(error_text, "result_item_error")

    return outcome
|
|
|
|
@torch.no_grad()
def process(
    language: str,
    repo_id: str,
    decoding_method: str,
    num_active_paths: int,
    in_filename: str,
):
    """Recognize the speech in ``in_filename`` and report timing statistics.

    The input is first converted with ``convert_to_wav``. The converted file
    is then decoded with the recognizer returned by ``get_pretrained_model``.
    The conversion is not part of the measured processing time; obtaining
    the recognizer is.

    Args:
      language: The selected language. It is only logged here.
      repo_id: Passed to ``get_pretrained_model``; also logged next to the
        recognition result.
      decoding_method: Forwarded to ``get_pretrained_model``.
      num_active_paths: Forwarded to ``get_pretrained_model``.
      in_filename: Path of the audio file to recognize.

    Returns:
      A tuple ``(text, html)``: the value returned by ``decode`` and an HTML
      box (built by ``build_html_output``) with the wave duration, the
      processing time and the real-time factor (RTF).

    Raises:
      ValueError: If the converted audio contains no samples.
    """
    time_format = "%Y-%m-%d %H:%M:%S.%f"

    logging.info(f"language: {language}")
    logging.info(f"repo_id: {repo_id}")
    logging.info(f"decoding_method: {decoding_method}")
    logging.info(f"num_active_paths: {num_active_paths}")
    logging.info(f"in_filename: {in_filename}")

    filename = convert_to_wav(in_filename)

    logging.info(f"Started at {datetime.now().strftime(time_format)}")
    start = time.time()

    recognizer = get_pretrained_model(
        repo_id,
        decoding_method=decoding_method,
        num_active_paths=num_active_paths,
    )

    text = decode(recognizer, filename)

    end = time.time()
    # Take a fresh timestamp: re-formatting the start time would log the
    # moment processing began, not the moment it finished.
    finished_at = datetime.now().strftime(time_format)
    elapsed = end - start

    metadata = torchaudio.info(filename)
    duration = metadata.num_frames / sample_rate
    if duration <= 0:
        # Without this the RTF computation fails with a bare
        # "division by zero", which tells the user nothing.
        raise ValueError(f"The audio file '{in_filename}' contains no samples")
    rtf = elapsed / duration

    logging.info(f"Finished at {finished_at}. Elapsed: {elapsed: .3f} s")

    info = f"""
    Wave duration : {duration: .3f} s <br/>
    Processing time: {elapsed: .3f} s <br/>
    RTF: {elapsed: .3f}/{duration: .3f} = {rtf:.3f} <br/>
    """
    if rtf > 1:
        info += (
            "<br/>We are loading the model for the first run. "
            "Please run again to measure the real RTF.<br/>"
        )

    logging.info(info)
    logging.info(f"\nrepo_id: {repo_id}\nhyp: {text}")

    return text, build_html_output(info)
|
|
|
|
# Markdown heading; rendered at the top of the page with gr.Markdown(title).
title = "# Automatic Speech Recognition with Next-gen Kaldi"

# Markdown text with background information and links; rendered with
# gr.Markdown(description) after the tabs have been created.
description = """
This space shows how to do automatic speech recognition with Next-gen Kaldi.

Please visit
<https://huggingface.co/spaces/k2-fsa/streaming-automatic-speech-recognition>
for streaming speech recognition with **Next-gen Kaldi**.

It is running on CPU within a docker container provided by Hugging Face.

See more information by visiting the following links:

- <https://github.com/k2-fsa/icefall>
- <https://github.com/k2-fsa/sherpa>
- <https://github.com/k2-fsa/k2>
- <https://github.com/lhotse-speech/lhotse>

If you want to deploy it locally, please see
<https://k2-fsa.github.io/sherpa/>
"""

# Custom CSS handed to gr.Blocks(css=css). The classes defined here
# (result, result_item, result_item_success, result_item_error) are the ones
# used in the HTML produced by build_html_output().
css = """
.result {display:flex;flex-direction:column}
.result_item {padding:15px;margin-bottom:8px;border-radius:15px;width:100%}
.result_item_success {background-color:mediumaquamarine;color:white;align-self:start}
.result_item_error {background-color:#ff7070;color:white;align-self:start}
"""
|
|
|
|
def update_model_dropdown(language: str):
    """Build the dropdown update that lists the models of ``language``.

    Args:
      language: A key of ``language_to_models``.

    Returns:
      A ``gr.Dropdown.update`` whose choices are the models registered for
      ``language`` and whose selected value is the first of them.

    Raises:
      ValueError: If ``language`` is not a key of ``language_to_models``.
    """
    if language not in language_to_models:
        raise ValueError(f"Unsupported language: {language}")

    models = language_to_models[language]
    return gr.Dropdown.update(choices=models, value=models[0])
|
|
|
|
# The whole user interface lives in a single Blocks app styled with ``css``.
demo = gr.Blocks(css=css)

with demo:
    gr.Markdown(title)
    language_choices = list(language_to_models.keys())

    # The first language and its first model are pre-selected.
    language_radio = gr.Radio(
        label="Language",
        choices=language_choices,
        value=language_choices[0],
    )
    initial_models = language_to_models[language_choices[0]]
    model_dropdown = gr.Dropdown(
        choices=initial_models,
        label="Select a model",
        value=initial_models[0],
    )

    # Refresh the model list whenever another language is picked.
    language_radio.change(
        update_model_dropdown,
        inputs=language_radio,
        outputs=model_dropdown,
    )

    decoding_method_radio = gr.Radio(
        label="Decoding method",
        choices=["greedy_search", "modified_beam_search"],
        value="greedy_search",
    )

    num_active_paths_slider = gr.Slider(
        minimum=1,
        value=4,
        step=1,
        label="Number of active paths for modified_beam_search",
    )

    # Settings shared by every recognition entry point, listed in the order
    # of the first four parameters of the process_* callbacks. Each callback
    # receives these followed by its own audio/URL component.
    shared_inputs = [
        language_radio,
        model_dropdown,
        decoding_method_radio,
        num_active_paths_slider,
    ]

    with gr.Tabs():
        with gr.TabItem("Upload from disk"):
            uploaded_file = gr.Audio(
                source="upload",
                type="filepath",
                optional=False,
                label="Upload from disk",
            )
            upload_button = gr.Button("Submit for recognition")
            uploaded_output = gr.Textbox(label="Recognized speech from uploaded file")
            uploaded_html_info = gr.HTML(label="Info")

            gr.Examples(
                examples=examples,
                inputs=shared_inputs + [uploaded_file],
                outputs=[uploaded_output, uploaded_html_info],
                fn=process_uploaded_file,
            )

        with gr.TabItem("Record from microphone"):
            microphone = gr.Audio(
                source="microphone",
                type="filepath",
                optional=False,
                label="Record from microphone",
            )

            record_button = gr.Button("Submit for recognition")
            recorded_output = gr.Textbox(label="Recognized speech from recordings")
            recorded_html_info = gr.HTML(label="Info")

            gr.Examples(
                examples=examples,
                inputs=shared_inputs + [microphone],
                outputs=[recorded_output, recorded_html_info],
                fn=process_microphone,
            )

        with gr.TabItem("From URL"):
            url_textbox = gr.Textbox(
                max_lines=1,
                placeholder="URL to an audio file",
                label="URL",
                interactive=True,
            )

            url_button = gr.Button("Submit for recognition")
            url_output = gr.Textbox(label="Recognized speech from URL")
            url_html_info = gr.HTML(label="Info")

        # Wire each "Submit for recognition" button to its handler.
        upload_button.click(
            process_uploaded_file,
            inputs=shared_inputs + [uploaded_file],
            outputs=[uploaded_output, uploaded_html_info],
        )

        record_button.click(
            process_microphone,
            inputs=shared_inputs + [microphone],
            outputs=[recorded_output, recorded_html_info],
        )

        url_button.click(
            process_url,
            inputs=shared_inputs + [url_textbox],
            outputs=[url_output, url_html_info],
        )

    gr.Markdown(description)
|
|
# Restrict PyTorch to a single intra-op thread and a single inter-op thread.
torch.set_num_threads(1)
torch.set_num_interop_threads(1)

# Switch off the TorchScript profiling executor, the profiling mode and the
# graph-executor optimization through PyTorch's private ``torch._C`` hooks.
# NOTE(review): presumably this keeps the run time of TorchScript models
# stable across requests -- confirm before changing or removing these calls.
torch._C._jit_set_profiling_executor(False)
torch._C._jit_set_profiling_mode(False)
torch._C._set_graph_executor_optimize(False)
|
|
if __name__ == "__main__":
    # Set up INFO-level logging (timestamp, level, source location, message)
    # before the app starts serving.
    logging.basicConfig(
        format="%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] %(message)s",
        level=logging.INFO,
    )

    demo.launch()
|
|