Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import sys | |
| import requests | |
| import soundfile as sf | |
| import tempfile | |
| import re as regex | |
| import glob | |
| import random | |
| import difflib | |
| from spylls.hunspell import Dictionary | |
# --- Recursion headroom for the pure-Python spylls engine ---
# Complex Kabyle affixation rules recurse deeply inside spylls, so the
# interpreter's recursion limit is raised rather than wrapping each dictionary
# call in a RecursionError handler.
# NOTE(review): a limit this high can turn a RecursionError into a hard C-stack
# overflow on pathological input -- confirm it is safe on the target host.
sys.setrecursionlimit(10000)

# --- Configuration ---
# Upload size limit in megabytes; kept as a string (it is only interpolated
# into text elsewhere in this file).
MAX_SIZE_MB = "50"
# Longest audio clip, in seconds, accepted for transcription.
MAX_SECONDS = 60
# Read the LibreTranslate key from the environment when it is provided so a real
# secret never has to live in the source; the placeholder remains the fallback.
LIBRE_API_KEY = os.environ.get("LIBRE_API_KEY") or "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
TRANSLATE_URL = "https://imsidag-community-libretranslate-kabyle.hf.space/translate"

# --- Dataset Configuration ---
DATASET_REPO = "boffire/kabyle-synth-voice"
DATASET_AUDIO_BASE_URL = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/audio"
DATASET_API_TREE_URL = f"https://huggingface.co/api/datasets/{DATASET_REPO}/tree/main/audio"

# --- Hunspell Dictionary Configuration ---
# The dictionary files are expected at <this file's directory>/dicts/kab.aff
# and kab.dic; DICT_BASE_PATH is the shared path without extension.
DICT_DIR = os.path.join(os.path.dirname(__file__), "dicts")
DICT_BASE_PATH = os.path.join(DICT_DIR, "kab")
# Characters peeled off both ends of a token before it is spell-checked.
PUNCTUATION_CHARS = '.,!?;:\"\'()[]{}\u00ab\u00bb\u2014\u2013-'

# Lazily populated module-level caches (see get_hunspell / get_inference_fn).
_hunspell_dict = None
_inference_fn = None
def get_hunspell():
    """Return the shared Hunspell dictionary, loading it on first use.

    The loaded dictionary is stored in the module-level ``_hunspell_dict`` so
    later calls return the same object without touching the disk again.

    Raises:
        FileNotFoundError: if either ``<DICT_BASE_PATH>.aff`` or
            ``<DICT_BASE_PATH>.dic`` does not exist.
    """
    global _hunspell_dict

    # Fast path: already loaded by an earlier call.
    if _hunspell_dict is not None:
        return _hunspell_dict

    aff_path = f"{DICT_BASE_PATH}.aff"
    dic_path = f"{DICT_BASE_PATH}.dic"
    both_present = os.path.exists(aff_path) and os.path.exists(dic_path)
    if not both_present:
        raise FileNotFoundError(
            f"Dictionnaire Hunspell kabyle non trouve.\n"
            f"Attendu: {aff_path} et {dic_path}\n"
            f"Veuillez uploader les fichiers kab.aff et kab.dic dans le dossier 'dicts/' de votre Space."
        )

    # from_files takes the common base path (no extension).
    _hunspell_dict = Dictionary.from_files(DICT_BASE_PATH)
    return _hunspell_dict
# Warm the dictionary cache while the module is imported so the first request
# does not pay the loading cost. A failure here is only reported: the loader is
# invoked again later when a spellcheck actually needs the dictionary.
try:
    get_hunspell()
except Exception as preload_error:
    print(f"[WARN] Pre-load Hunspell dictionary failed: {preload_error}")
def get_inference_fn():
    """Return the ASR ``inference`` callable, importing it on first use.

    The import of ``inference_file`` is deferred until the first call and the
    resulting function is cached in the module-level ``_inference_fn``.
    """
    global _inference_fn

    if _inference_fn is not None:
        return _inference_fn

    # Deferred import: only executed the first time a transcription is needed.
    from inference_file import inference as asr_inference

    _inference_fn = asr_inference
    return _inference_fn
def spellcheck_transcript(text: str, auto_correct: bool = True) -> tuple[str, list[dict]]:
    """Check a transcript word by word against the Hunspell dictionary.

    Each whitespace-separated token is stripped of surrounding punctuation,
    lower-cased and looked up. For an unknown word, the suggestion most similar
    to it (difflib ratio) is kept when its similarity is at least 0.5; the
    token's original punctuation and leading capital are restored around it.

    Args:
        text: Transcript to check. Falsy text, or text containing one of the
            status markers, is returned untouched.
        auto_correct: When True (default) the accepted suggestions are written
            into the returned text. When False the words are left as they are
            and the suggestions are only reported in the corrections list.

    Returns:
        ``(checked_text, corrections)`` where ``corrections`` is a list of
        dicts with the keys ``position`` (token index), ``original`` and
        ``suggestion``. Tokens are re-joined with single spaces.

    Raises:
        FileNotFoundError: propagated from the dictionary loader when the
            dictionary files are missing.
    """
    if not text or any(symbol in text for symbol in ["⚠️", "❌"]):
        return text, []

    words = text.split()

    # Safety valve: skip spellcheck for very long transcriptions to avoid worker hangs.
    if len(words) > 100:
        return text + "\n\n⚠️ Transcription trop longue — correction orthographique sautee (>100 mots).", []

    # Load dictionary once for this call.
    dic = get_hunspell()

    output_words = []
    corrections = []
    for position, word in enumerate(words):
        core = word.strip(PUNCTUATION_CHARS)
        if not core:
            # Token made only of punctuation: nothing to check.
            output_words.append(word)
            continue
        lowered = core.lower()

        # Dictionary failures are treated as "unknown word" / "no suggestion"
        # so a single problematic token never aborts the whole transcript.
        try:
            is_valid = dic.lookup(lowered)
        except Exception:
            is_valid = False
        if is_valid:
            output_words.append(word)
            continue

        try:
            suggestions = list(dic.suggest(lowered))
        except Exception:
            suggestions = []
        if not suggestions:
            output_words.append(word)
            continue

        # Single pass: keep the first suggestion with the highest similarity
        # together with its score, so the ratio is not computed twice.
        best = None
        similarity = -1.0
        for candidate in suggestions:
            ratio = difflib.SequenceMatcher(None, lowered, candidate).ratio()
            if ratio > similarity:
                best, similarity = candidate, ratio

        if best is None or similarity < 0.5:
            output_words.append(word)
            continue

        # Decide capitalisation from the word itself, not from the raw token:
        # the token may start with an opening quote or bracket.
        if best and core[0].isupper():
            best = best[0].upper() + best[1:]

        prefix_len = len(word) - len(word.lstrip(PUNCTUATION_CHARS))
        suffix_len = len(word) - len(word.rstrip(PUNCTUATION_CHARS))
        prefix = word[:prefix_len]
        suffix = word[-suffix_len:] if suffix_len > 0 else ""
        corrected = prefix + best + suffix

        # Honour auto_correct: only rewrite the text when asked to.
        output_words.append(corrected if auto_correct else word)
        if corrected != word:
            corrections.append({
                "position": position,
                "original": word,
                "suggestion": corrected,
            })

    return " ".join(output_words), corrections
| # --- Translation Logic --- | |
def translate_to_english(text):
    """Translate Kabyle text to English through the LibreTranslate endpoint.

    Falsy input, or input carrying one of the status markers, yields an empty
    string without any network call. ``[?]`` placeholders (with the whitespace
    before them) are removed before the text is sent.

    Returns:
        The ``translatedText`` field of the JSON reply, a fallback message when
        that field is absent, or a string starting with the error marker when
        the request or the decoding fails.
    """
    if not text:
        return ""
    for marker in ("⚠️", "❌"):
        if marker in text:
            return ""

    cleaned = regex.sub(r"\s*\[\?\]", "", text)
    form_fields = {
        'q': cleaned,
        'source': 'kab',
        'target': 'en',
        'format': 'text',
        'api_key': LIBRE_API_KEY
    }

    try:
        reply = requests.post(TRANSLATE_URL, data=form_fields, timeout=15)
        reply.raise_for_status()
        body = reply.json()
        return body.get("translatedText", "Translation error: No text returned.")
    except Exception as err:
        # Errors are reported as text so the UI can show them in the output box.
        return f"❌ Translation Error: {str(err)}"
| # --- Dataset Logic --- | |
# Listing of dataset .wav files, filled by the first successful fetch.
_audio_files_cache = None


def get_dataset_audio_files():
    """Return the .wav file names found under ``audio/`` in the dataset repo.

    The listing is fetched from ``DATASET_API_TREE_URL`` and cached in the
    module-level ``_audio_files_cache``. An empty listing is not cached, so a
    later call fetches again instead of returning "no files" forever.

    Returns:
        A list of paths relative to the ``audio/`` folder.

    Raises:
        RuntimeError: if the request or the parsing of its reply fails; the
            original exception is attached as ``__cause__``.
    """
    global _audio_files_cache
    if _audio_files_cache is not None:
        return _audio_files_cache

    try:
        resp = requests.get(DATASET_API_TREE_URL, timeout=15)
        resp.raise_for_status()
        items = resp.json()
        # NOTE(review): only the first reply is read; if the tree endpoint
        # paginates large folders, later pages are ignored -- confirm.
        files = [
            # Strip only the leading folder; a plain replace() would also
            # remove "audio/" occurring deeper inside the path.
            item["path"].removeprefix("audio/")
            for item in items
            if item.get("type") == "file" and item["path"].endswith(".wav")
        ]
    except Exception as e:
        raise RuntimeError(f"Failed to fetch dataset file list: {e}") from e

    if files:
        _audio_files_cache = files
    return files
def download_random_dataset_sample_validated(max_duration=MAX_SECONDS, max_retries=5):
    """Download a random dataset clip whose duration does not exceed a limit.

    A file name is drawn at random, downloaded to a uniquely named temporary
    file and measured. Clips that are too long, and attempts that fail, are
    deleted and retried with a new random pick.

    Args:
        max_duration: Longest accepted clip, in seconds.
        max_retries: Number of attempts before giving up.

    Returns:
        ``(local_path, duration_seconds)`` for the accepted clip. The file is
        left on disk for the caller to use.

    Raises:
        RuntimeError: if the dataset listing is empty, if the last attempt
            failed to download/read, or if no short-enough clip was found.
    """
    files = get_dataset_audio_files()
    if not files:
        raise RuntimeError("No audio files found in the dataset.")

    last_error = None  # exception of the most recent attempt, None if it was merely too long
    for _ in range(max_retries):
        filename = random.choice(files)
        file_url = f"{DATASET_AUDIO_BASE_URL}/{filename}"

        # A unique temp file avoids two concurrent requests clobbering (or
        # deleting) each other's download, and works even when the dataset
        # file name contains sub-directories.
        suffix = os.path.splitext(filename)[1] or ".wav"
        fd, local_path = tempfile.mkstemp(prefix="dataset_", suffix=suffix)
        try:
            # Both the file and the streamed response are closed on exit.
            with os.fdopen(fd, "wb") as f, requests.get(file_url, timeout=30, stream=True) as resp:
                resp.raise_for_status()
                for chunk in resp.iter_content(chunk_size=8192):
                    f.write(chunk)
            info = sf.info(local_path)
            if info.duration <= max_duration:
                return local_path, info.duration
            last_error = None
        except Exception as e:
            last_error = e

        # Only reached when the attempt was rejected: drop the file so neither
        # partial downloads nor over-long clips pile up in the temp directory.
        try:
            os.remove(local_path)
        except OSError:
            pass  # best-effort cleanup; the file may never have been written

    if max_retries <= 0:
        raise RuntimeError("Failed to get a valid dataset sample.")
    if last_error is not None:
        raise RuntimeError(f"Failed to download valid sample: {last_error}") from last_error
    raise RuntimeError(
        f"Could not find a sample under {max_duration}s after {max_retries} tries."
    )
| # --- Unified Processing Logic --- | |
def format_transcript(text: str) -> str:
    """Turn a raw transcript into a sentence.

    Surrounding whitespace is removed, the first character is upper-cased and
    a final period is appended unless the text already ends with ``.``, ``!``
    or ``?``. Falsy input and text carrying a status marker are returned as
    received.
    """
    if not text:
        return text
    if "⚠️" in text or "❌" in text:
        return text

    trimmed = text.strip()
    if trimmed == "":
        return trimmed

    sentence = trimmed[:1].upper() + trimmed[1:]
    if sentence.endswith((".", "!", "?")):
        return sentence
    return sentence + "."
def process_audio(audio_file, apply_spellcheck=True):
    """Run validation, transcription, optional spellcheck and translation.

    Args:
        audio_file: Value received from the audio input; when it is a string it
            is treated as a file path and its duration is checked.
        apply_spellcheck: Whether the transcript goes through the spellchecker.

    Returns:
        A 3-tuple ``(transcript, spellchecked, translation)``. On a validation
        or processing problem the first element carries the message and the
        other two are empty strings.
    """
    given_as_path = isinstance(audio_file, str)

    if audio_file is None or (given_as_path and audio_file.strip() == ""):
        return "Please upload an audio file first.", "", ""

    # Duration can only be checked when a file path was supplied.
    if given_as_path:
        try:
            info = sf.info(audio_file)
            if info.duration > MAX_SECONDS:
                return f"Audio too long ({info.duration:.1f}s). Max is {MAX_SECONDS}s.", "", ""
        except Exception as read_err:
            return f"Error reading audio info: {str(read_err)}", "", ""

    try:
        run_asr = get_inference_fn()
        transcript = format_transcript(run_asr(audio_file))

        checked = transcript
        if apply_spellcheck:
            # Spellcheck problems are appended to the text instead of failing
            # the request: the raw transcript is still worth showing.
            try:
                checked, _ = spellcheck_transcript(transcript, auto_correct=True)
            except FileNotFoundError:
                checked = transcript + "\n\n⚠️ Dictionnaire Hunspell non trouve — correction orthographique desactivee."
            except Exception as spell_err:
                checked = transcript + f"\n\n❌ Erreur Hunspell: {str(spell_err)}"

        english = translate_to_english(checked)
        return transcript, checked, english
    except Exception as run_err:
        return f"❌ Error during processing: {str(run_err)}", "", ""
| # --- Random tab: generator with safe staged updates --- | |
def process_random_with_status():
    """Generator driving the random-sample tab with staged progress updates.

    Every yielded value is a 5-tuple
    ``(status, audio_path, raw_transcript, checked_transcript, translation)``.
    The raw transcript is shown in both text boxes before the spellcheck runs,
    so the UI never looks empty while the slower stages are in progress.
    """
    try:
        # Stage 1: fetch and validate a sample.
        yield "⏳ Fetching random sample...", None, "", "", ""
        try:
            sample_path, seconds = download_random_dataset_sample_validated()
        except Exception as fetch_err:
            yield f"❌ Dataset Error: {str(fetch_err)}", None, "", "", ""
            return

        # Stage 2: transcription.
        yield f"⏳ Transcribing ({seconds:.1f}s)...", sample_path, "Processing...", "", ""
        try:
            run_asr = get_inference_fn()
            raw_text = format_transcript(run_asr(sample_path))
        except Exception as asr_err:
            yield f"❌ Transcription Error: {str(asr_err)}", sample_path, "", "", ""
            return

        # Stage 3: spellcheck, with the raw transcript already visible.
        yield "⏳ Spellchecking (Hunspell)...", sample_path, raw_text, raw_text, ""
        try:
            checked_text, _ = spellcheck_transcript(raw_text, auto_correct=True)
        except FileNotFoundError:
            checked_text = raw_text + "\n\n⚠️ Dictionnaire Hunspell non trouve — correction orthographique desactivee."
        except Exception as spell_err:
            checked_text = raw_text + f"\n\n❌ Erreur Hunspell: {str(spell_err)}"

        # Stage 4: translation.
        yield "⏳ Translating to English...", sample_path, raw_text, checked_text, "Processing..."
        english = translate_to_english(checked_text)

        # Stage 5: final result.
        yield "✅ Done!", sample_path, raw_text, checked_text, english
    except Exception as fatal_err:
        # Last resort: anything unexpected above still produces a readable status.
        yield f"❌ Unexpected pipeline error: {str(fatal_err)}", None, "", "", ""
| # --- Build Gradio UI --- | |
# The whole interface lives in one Blocks app with two tabs: one for uploaded or
# recorded audio, one for a random sample pulled from the dataset.
with gr.Blocks(title="🎙️ Mmeslay") as demo:
    # Page header.
    gr.Markdown(
        """
# 🎙️ Mmeslay by [G1ya777](https://github.com/G1ya777/Mmeslay)
### Kabyle ASR, Spellcheck & Translation
*Powered by Squeezeformer (ASR), Hunspell (Spellcheck) and LibreTranslate (NMT)*
Upload a Kabyle audio file, record directly, **or pick a random sample** from the Kabyle Synth Voice dataset to get a transcript, a spellchecked version, and an English translation.
"""
    )

    # ---- Tab 1: user-provided audio ----
    with gr.Tab("🎧 Audio Upload / Record"):
        with gr.Row():
            # Left column: input widgets.
            with gr.Column(scale=1):
                audio_input = gr.Audio(
                    sources=["upload", "microphone"],
                    type="filepath",
                    format="mp3",
                    label="Input Audio",
                )
                apply_sc = gr.Checkbox(
                    value=True,
                    label="Activer la correction orthographique (Hunspell kabyle)",
                    info="Corrige automatiquement les mots non reconnus par le dictionnaire kabyle"
                )
                transcribe_btn = gr.Button(
                    "🚀 Transcribe, Spellcheck & Translate",
                    variant="primary",
                    size="lg",
                )
            # Right column: the three result boxes.
            with gr.Column(scale=2):
                with gr.Row():
                    text_output_raw = gr.Textbox(
                        lines=3,
                        label="Transcription brute (ASR)",
                        info="Sortie directe du modele de reconnaissance vocale"
                    )
                    text_output_checked = gr.Textbox(
                        lines=3,
                        label="Transcription corrigee (Hunspell)",
                        info="Transcription apres correction orthographique automatique"
                    )
                translation_output_1 = gr.Textbox(
                    lines=3,
                    label="LibreTranslate (English)",
                    placeholder="English translation will appear here..."
                )

        # The button feeds the audio and the checkbox state to process_audio.
        transcribe_btn.click(
            fn=process_audio,
            inputs=[audio_input, apply_sc],
            outputs=[text_output_raw, text_output_checked, translation_output_1],
        )

        # Bundled example clips that can be loaded into the audio input.
        gr.Examples(
            inputs=audio_input,
            examples=[
                "ressources/examples/e1.mp3",
                "ressources/examples/e2.mp3",
                "ressources/examples/Ddahemmu.mp3"
            ],
        )

    # ---- Tab 2: random dataset sample ----
    with gr.Tab("Random Dataset Sample"):
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown(
                    """
Click the button below to fetch a **random audio sample** from the [Kabyle Synth Voice](https://huggingface.co/datasets/boffire/kabyle-synth-voice) dataset.
"""
                )
                random_btn = gr.Button("Pick Random & Transcribe", variant="primary", size="lg")
                dataset_status = gr.Textbox(label="Status", value="Ready", interactive=False)
            with gr.Column(scale=2):
                random_audio_player = gr.Audio(label="🎵 Selected Sample", interactive=False, autoplay=False)
                with gr.Row():
                    text_output_3_raw = gr.Textbox(label="Transcription brute (Kabyle)", lines=3)
                    text_output_3_checked = gr.Textbox(label="Transcription corrigee (Hunspell)", lines=3)
                translation_output_3 = gr.Textbox(
                    lines=3,
                    label="LibreTranslate (English)",
                    placeholder="English LibreTranslate translation will appear here..."
                )

        # process_random_with_status is a generator: each yielded tuple
        # refreshes these five components in order.
        random_btn.click(
            fn=process_random_with_status,
            inputs=[],
            outputs=[
                dataset_status,
                random_audio_player,
                text_output_3_raw,
                text_output_3_checked,
                translation_output_3,
            ],
        )

    # Page footer.
    gr.Markdown(
        """
---
Developed by [G1ya777](https://github.com/G1ya777/Mmeslay).
Examples from Tatoeba (CC BY licenses).
Spellcheck powered by Hunspell kabyle dictionary.
"""
    )
| # --- Launch --- | |
# Listening port: taken from the PORT environment variable when it is set,
# otherwise 7860.
port = int(os.environ.get("PORT", 7860))

if __name__ == "__main__":
    # Only start the server when the file is executed as a script.
    launch_options = dict(
        server_name="0.0.0.0",
        server_port=port,
        max_file_size=f"{MAX_SIZE_MB}mb",
        theme=gr.themes.Soft(),
    )
    demo.launch(**launch_options)