"""Gradio front-end for Mmeslay: Kabyle ASR, Hunspell spellcheck and translation.

Pipeline for both tabs: audio -> ASR transcript -> Hunspell spellcheck ->
LibreTranslate (Kabyle -> English). The "random" tab additionally downloads a
sample from a Hugging Face dataset and streams progress updates to the UI.
"""

import difflib
import glob
import os
import random
import re as regex
import sys
import tempfile

import gradio as gr
import requests
import soundfile as sf
from spylls.hunspell import Dictionary

# --- Fix: push recursion limit high enough for complex kabyle affixation rules ---
# Catching RecursionError and continuing is DANGEROUS in Python (can segfault/hang).
# Better to give the pure-Python spylls engine enough headroom to finish.
sys.setrecursionlimit(10000)

# --- Configuration ---
MAX_SIZE_MB = "50"
MAX_SECONDS = 60
# The key can be supplied through the environment; the placeholder is kept as
# the fallback so existing deployments behave exactly as before.
LIBRE_API_KEY = os.environ.get("LIBRE_API_KEY", "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
TRANSLATE_URL = "https://imsidag-community-libretranslate-kabyle.hf.space/translate"

# --- Dataset Configuration ---
DATASET_REPO = "boffire/kabyle-synth-voice"
DATASET_AUDIO_BASE_URL = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/audio"
DATASET_API_TREE_URL = f"https://huggingface.co/api/datasets/{DATASET_REPO}/tree/main/audio"

# --- Hunspell Dictionary Configuration ---
DICT_DIR = os.path.join(os.path.dirname(__file__), "dicts")
DICT_BASE_PATH = os.path.join(DICT_DIR, "kab")
PUNCTUATION_CHARS = '.,!?;:\"\'()[]{}\u00ab\u00bb\u2014\u2013-'

# Transcripts longer than this skip the spellcheck step entirely.
MAX_SPELLCHECK_WORDS = 100
# Suggestions less similar than this to the original word are ignored.
MIN_SUGGESTION_SIMILARITY = 0.5
# Symbols that mark a string as a status/error message rather than real text.
_STATUS_MARKERS = ("⚠️", "❌")

_hunspell_dict = None
_inference_fn = None


def _has_status_marker(text):
    """Return True when *text* contains one of the warning/error symbols."""
    return any(symbol in text for symbol in _STATUS_MARKERS)


def get_hunspell():
    """Lazy-load (and cache) the Hunspell dictionary.

    Raises:
        FileNotFoundError: if ``kab.aff`` or ``kab.dic`` is missing from ``dicts/``.
    """
    global _hunspell_dict
    if _hunspell_dict is None:
        aff_path = DICT_BASE_PATH + ".aff"
        dic_path = DICT_BASE_PATH + ".dic"
        if not os.path.exists(aff_path) or not os.path.exists(dic_path):
            raise FileNotFoundError(
                f"Dictionnaire Hunspell kabyle non trouve.\n"
                f"Attendu: {aff_path} et {dic_path}\n"
                f"Veuillez uploader les fichiers kab.aff et kab.dic dans le dossier 'dicts/' de votre Space."
            )
        _hunspell_dict = Dictionary.from_files(DICT_BASE_PATH)
    return _hunspell_dict


# Pre-load dictionary at import time so the first request isn't delayed
try:
    get_hunspell()
except Exception as e:
    print(f"[WARN] Pre-load Hunspell dictionary failed: {e}")


def get_inference_fn():
    """Lazy-load the ASR inference function once."""
    global _inference_fn
    if _inference_fn is None:
        from inference_file import inference

        _inference_fn = inference
    return _inference_fn


def _best_suggestion(target, suggestions):
    """Return ``(similarity, suggestion)`` for the candidate closest to *target*.

    Ties are resolved in favour of the earliest suggestion.
    """
    scored = (
        (difflib.SequenceMatcher(None, target, candidate).ratio(), candidate)
        for candidate in suggestions
    )
    return max(scored, key=lambda pair: pair[0])


def spellcheck_transcript(text: str, auto_correct: bool = True) -> tuple[str, list[dict]]:
    """Check a transcript word by word against the Hunspell dictionary.

    Args:
        text: The transcript. Empty text and status/error messages are
            returned untouched.
        auto_correct: When True (default) unknown words are replaced by their
            best suggestion. When False the words are left as they are and the
            suggestions are only reported in the returned list.

    Returns:
        ``(checked_text, corrections)`` where each correction is a dict with
        the keys ``position``, ``original`` and ``suggestion``.

    Raises:
        FileNotFoundError: if the dictionary files are missing.
    """
    if not text or _has_status_marker(text):
        return text, []

    words = text.split()

    # Safety valve: skip spellcheck for very long transcriptions to avoid worker hangs
    if len(words) > MAX_SPELLCHECK_WORDS:
        return (
            text
            + f"\n\n⚠️ Transcription trop longue — correction orthographique sautee (>{MAX_SPELLCHECK_WORDS} mots).",
            [],
        )

    # Load dictionary once for this call
    dic = get_hunspell()

    output_words = []
    corrections = []

    for position, word in enumerate(words):
        core = word.strip(PUNCTUATION_CHARS)
        if not core:
            # Pure punctuation: nothing to check.
            output_words.append(word)
            continue
        lowered = core.lower()

        # Best effort: a dictionary failure on one word must not abort the
        # whole transcript, so lookup/suggest errors degrade to "unknown word"
        # and "no suggestion" respectively.
        try:
            is_valid = dic.lookup(lowered)
        except Exception:
            is_valid = False
        if is_valid:
            output_words.append(word)
            continue

        try:
            suggestions = list(dic.suggest(lowered))
        except Exception:
            suggestions = []
        if not suggestions:
            output_words.append(word)
            continue

        similarity, best = _best_suggestion(lowered, suggestions)
        if similarity < MIN_SUGGESTION_SIMILARITY:
            output_words.append(word)
            continue

        # Judge capitalisation on the word itself, not on leading punctuation.
        if core[0].isupper():
            best = best[:1].upper() + best[1:]

        # Re-attach whatever punctuation surrounded the original word.
        prefix_len = len(word) - len(word.lstrip(PUNCTUATION_CHARS))
        prefix = word[:prefix_len]
        suffix = word[prefix_len + len(core):]
        corrected = prefix + best + suffix

        output_words.append(corrected if auto_correct else word)
        if corrected != word:
            corrections.append({
                "position": position,
                "original": word,
                "suggestion": corrected,
            })

    return " ".join(output_words), corrections


def _spellcheck_for_display(transcript):
    """Run the spellcheck and return ``(display_text, translatable_text)``.

    ``display_text`` may carry a warning/error notice for the user. The
    translator rejects any text containing such a notice, so in that case
    ``translatable_text`` falls back to the plain transcript.
    """
    try:
        checked, _corrections = spellcheck_transcript(transcript, auto_correct=True)
    except FileNotFoundError:
        return (
            transcript + "\n\n⚠️ Dictionnaire Hunspell non trouve — correction orthographique desactivee.",
            transcript,
        )
    except Exception as e:
        return transcript + f"\n\n❌ Erreur Hunspell: {str(e)}", transcript

    if _has_status_marker(checked) and not _has_status_marker(transcript):
        # The spellcheck appended a notice of its own (e.g. transcript too long).
        return checked, transcript
    return checked, checked


# --- Translation Logic ---
def translate_to_english(text):
    """Translate Kabyle *text* to English through the LibreTranslate endpoint.

    Returns an empty string for empty input or status/error messages, and a
    "❌ Translation Error" message when the request fails.
    """
    if not text or _has_status_marker(text):
        return ""

    # Drop the "[?]" markers (and the whitespace before them) before translating.
    clean_text = regex.sub(r"\s*\[\?\]", "", text)
    payload = {
        'q': clean_text,
        'source': 'kab',
        'target': 'en',
        'format': 'text',
        'api_key': LIBRE_API_KEY,
    }
    try:
        response = requests.post(TRANSLATE_URL, data=payload, timeout=15)
        response.raise_for_status()
        data = response.json()
        return data.get("translatedText", "Translation error: No text returned.")
    except Exception as e:
        # UI boundary: report the failure in the output box instead of raising.
        return f"❌ Translation Error: {str(e)}"


# --- Dataset Logic ---
_audio_files_cache = None


def get_dataset_audio_files():
    """Return the ``.wav`` file names found in the dataset's ``audio/`` folder.

    A non-empty listing is cached for the lifetime of the process.

    Raises:
        RuntimeError: if the listing cannot be fetched or parsed.
    """
    global _audio_files_cache
    if _audio_files_cache is not None:
        return _audio_files_cache
    try:
        resp = requests.get(DATASET_API_TREE_URL, timeout=15)
        resp.raise_for_status()
        items = resp.json()
        # NOTE(review): only the entries of this single response are used; if
        # the endpoint paginates large folders the list is partial - confirm.
        files = [
            item["path"].removeprefix("audio/")
            for item in items
            if item.get("type") == "file" and item["path"].endswith(".wav")
        ]
    except Exception as e:
        raise RuntimeError(f"Failed to fetch dataset file list: {e}") from e
    if files:
        # An empty listing is not cached so that a later call can retry.
        _audio_files_cache = files
    return files


def _remove_quietly(path):
    """Delete *path*, ignoring the case where it cannot be removed."""
    try:
        os.remove(path)
    except OSError:
        pass


def download_random_dataset_sample_validated(max_duration=MAX_SECONDS, max_retries=5):
    """Pick a random audio file, download it, and validate its duration.

    Retries with another random file when the download fails or the file is
    longer than *max_duration* seconds.

    Returns:
        ``(local_path, duration_seconds)``.

    Raises:
        RuntimeError: if no valid sample was obtained within *max_retries* tries.
    """
    files = get_dataset_audio_files()
    if not files:
        raise RuntimeError("No audio files found in the dataset.")

    tmp_dir = tempfile.gettempdir()
    last_error = None

    for _attempt in range(max_retries):
        filename = random.choice(files)
        file_url = f"{DATASET_AUDIO_BASE_URL}/{filename}"
        # basename() keeps the local file directly inside the temp directory
        # even if the listing ever contains nested paths.
        local_path = os.path.join(tmp_dir, f"dataset_{os.path.basename(filename)}")
        try:
            with requests.get(file_url, timeout=30, stream=True) as resp:
                resp.raise_for_status()
                with open(local_path, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=8192):
                        f.write(chunk)
            duration = sf.info(local_path).duration
        except Exception as e:
            # Do not leave a partial or unreadable file behind.
            last_error = e
            _remove_quietly(local_path)
            continue

        if duration <= max_duration:
            return local_path, duration

        # Too long: discard and try another sample.
        last_error = None
        _remove_quietly(local_path)

    if last_error is not None:
        raise RuntimeError(f"Failed to download valid sample: {last_error}") from last_error
    if max_retries > 0:
        raise RuntimeError(
            f"Could not find a sample under {max_duration}s after {max_retries} tries."
        )
    raise RuntimeError("Failed to get a valid dataset sample.")


# --- Unified Processing Logic ---
def format_transcript(text: str) -> str:
    """Capitalise the first character and ensure terminal punctuation.

    Empty text and status/error messages are returned unchanged.
    """
    if not text or _has_status_marker(text):
        return text
    text = text.strip()
    if not text:
        return text
    text = text[0].upper() + text[1:]
    if text[-1] not in ".!?":
        text += "."
    return text


def process_audio(audio_file, apply_spellcheck=True):
    """Handles validation -> Transcription -> Spellcheck -> Translation.

    Returns:
        ``(raw_transcript, spellchecked_transcript, translation)``. On failure
        the first element carries the error message and the others are empty.
    """
    if audio_file is None or (isinstance(audio_file, str) and audio_file.strip() == ""):
        return "Please upload an audio file first.", "", ""

    if isinstance(audio_file, str):
        try:
            info = sf.info(audio_file)
            if info.duration > MAX_SECONDS:
                return f"Audio too long ({info.duration:.1f}s). Max is {MAX_SECONDS}s.", "", ""
        except Exception as e:
            return f"Error reading audio info: {str(e)}", "", ""

    try:
        inference = get_inference_fn()
        transcript = format_transcript(inference(audio_file))

        if apply_spellcheck:
            spellchecked, translatable = _spellcheck_for_display(transcript)
        else:
            spellchecked = translatable = transcript

        # Translate the clean text so that a spellcheck notice shown to the
        # user does not suppress the translation.
        translation = translate_to_english(translatable)
        return transcript, spellchecked, translation
    except Exception as e:
        return f"❌ Error during processing: {str(e)}", "", ""


# --- Random tab: generator with safe staged updates ---
def process_random_with_status():
    """Generator that yields progressive updates for the random-sample tab.

    Each yielded tuple is ``(status, audio_path, raw_transcript,
    spellchecked_transcript, translation)``. If spellcheck is slow, the raw
    transcript is already visible so the UI never looks empty.
    """
    try:
        # Stage 1: Fetch & validate
        yield "⏳ Fetching random sample...", None, "", "", ""
        try:
            audio_path, duration = download_random_dataset_sample_validated()
        except Exception as e:
            yield f"❌ Dataset Error: {str(e)}", None, "", "", ""
            return

        # Stage 2: Transcription
        yield f"⏳ Transcribing ({duration:.1f}s)...", audio_path, "Processing...", "", ""
        try:
            inference = get_inference_fn()
            transcript = format_transcript(inference(audio_path))
        except Exception as e:
            yield f"❌ Transcription Error: {str(e)}", audio_path, "", "", ""
            return

        # Stage 3: Spellcheck
        # Show the raw transcript immediately in the checked box so the user isn't staring at "Processing..."
        yield "⏳ Spellchecking (Hunspell)...", audio_path, transcript, transcript, ""
        spellchecked, translatable = _spellcheck_for_display(transcript)

        # Stage 4: Translation
        yield "⏳ Translating to English...", audio_path, transcript, spellchecked, "Processing..."
        translation = translate_to_english(translatable)

        # Stage 5: Done
        yield "✅ Done!", audio_path, transcript, spellchecked, translation
    except Exception as e:
        # Absolute last resort: if anything above crashes the generator, yield a clear error
        yield f"❌ Unexpected pipeline error: {str(e)}", None, "", "", ""


# --- Build Gradio UI ---
with gr.Blocks(title="🎙️ Mmeslay") as demo:
    gr.Markdown(
        """
        # 🎙️ Mmeslay by [G1ya777](https://github.com/G1ya777/Mmeslay)
        ### Kabyle ASR, Spellcheck & Translation
        *Powered by Squeezeformer (ASR), Hunspell (Spellcheck) and LibreTranslate (NMT)*

        Upload a Kabyle audio file, record directly, **or pick a random sample** from the Kabyle Synth Voice dataset to get a transcript, a spellchecked version, and an English translation.
        """
    )

    with gr.Tab("🎧 Audio Upload / Record"):
        with gr.Row():
            with gr.Column(scale=1):
                audio_input = gr.Audio(
                    label="Input Audio",
                    type="filepath",
                    sources=["upload", "microphone"],
                    format="mp3",
                )
                apply_sc = gr.Checkbox(
                    label="Activer la correction orthographique (Hunspell kabyle)",
                    value=True,
                    info="Corrige automatiquement les mots non reconnus par le dictionnaire kabyle",
                )
                transcribe_btn = gr.Button(
                    "🚀 Transcribe, Spellcheck & Translate", variant="primary", size="lg"
                )

            with gr.Column(scale=2):
                with gr.Row():
                    text_output_raw = gr.Textbox(
                        label="Transcription brute (ASR)",
                        lines=3,
                        info="Sortie directe du modele de reconnaissance vocale",
                    )
                    text_output_checked = gr.Textbox(
                        label="Transcription corrigee (Hunspell)",
                        lines=3,
                        info="Transcription apres correction orthographique automatique",
                    )
                translation_output_1 = gr.Textbox(
                    label="LibreTranslate (English)",
                    lines=3,
                    placeholder="English translation will appear here...",
                )

        transcribe_btn.click(
            fn=process_audio,
            inputs=[audio_input, apply_sc],
            outputs=[text_output_raw, text_output_checked, translation_output_1],
        )

        gr.Examples(
            examples=[
                "ressources/examples/e1.mp3",
                "ressources/examples/e2.mp3",
                "ressources/examples/Ddahemmu.mp3",
            ],
            inputs=audio_input,
        )

    with gr.Tab("Random Dataset Sample"):
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown(
                    """
                    Click the button below to fetch a **random audio sample** from the
                    [Kabyle Synth Voice](https://huggingface.co/datasets/boffire/kabyle-synth-voice) dataset.
                    """
                )
                random_btn = gr.Button("Pick Random & Transcribe", variant="primary", size="lg")
                dataset_status = gr.Textbox(label="Status", interactive=False, value="Ready")

            with gr.Column(scale=2):
                random_audio_player = gr.Audio(
                    label="🎵 Selected Sample", interactive=False, autoplay=False
                )
                with gr.Row():
                    text_output_3_raw = gr.Textbox(label="Transcription brute (Kabyle)", lines=3)
                    text_output_3_checked = gr.Textbox(
                        label="Transcription corrigee (Hunspell)", lines=3
                    )
                translation_output_3 = gr.Textbox(
                    label="LibreTranslate (English)",
                    lines=3,
                    placeholder="English LibreTranslate translation will appear here...",
                )

        random_btn.click(
            fn=process_random_with_status,
            inputs=[],
            outputs=[
                dataset_status,
                random_audio_player,
                text_output_3_raw,
                text_output_3_checked,
                translation_output_3,
            ],
        )

    gr.Markdown(
        """
        ---
        Developed by [G1ya777](https://github.com/G1ya777/Mmeslay). Examples from Tatoeba (CC BY licenses).
        Spellcheck powered by Hunspell kabyle dictionary.
        """
    )

# --- Launch ---
port = int(os.environ.get("PORT", 7860))

if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=port,
        max_file_size=f"{MAX_SIZE_MB}mb",
        theme=gr.themes.Soft(),
    )