| """ |
| Live Football Commentary Pipeline — Real-Time Streaming |
| ======================================================== |
| English → Yoruba with ~3-5 second latency. |
| |
| Uses Gradio's streaming audio API to continuously capture mic input, |
| process chunks through ASR → MT → TTS, and play back Yoruba audio. |
| """ |
|
|
import io
import logging
import os
import re
import shutil
import subprocess
import tempfile
import time

import gradio as gr
import numpy as np
import soundfile as sf
import torch
from transformers import (
    AutoModelForSeq2SeqLM,
    AutoTokenizer,
    pipeline as hf_pipeline,
)
|
|
# Process-wide logging setup and the module logger used by the pipeline.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

# Model identifiers for the three pipeline stages (ASR -> MT -> TTS).
ASR_MODEL_ID = "PlotweaverAI/whisper-small-de-en"
MT_MODEL_ID = "PlotweaverAI/nllb-200-distilled-600M-african-6lang"
TTS_MODEL_ID = "PlotweaverAI/yoruba-mms-tts-new"

# Source / target language codes handed to the MT tokenizer.
MT_SRC_LANG = "eng_Latn"
MT_TGT_LANG = "yor_Latn"

# Use the GPU when one is available; half precision is only selected on CUDA.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
TORCH_DTYPE = torch.float16 if DEVICE == "cuda" else torch.float32

# Seconds of microphone audio buffered before a streaming chunk is processed.
CHUNK_DURATION_S = 5
# Default sample rate (Hz) for the stream buffer and for empty TTS results.
TARGET_SR = 16000
|
|
|
|
| |
| |
| |
|
|
# Everything below runs at import time: the three models are loaded once and
# kept as module-level globals that the pipeline functions read.
print(f"Device: {DEVICE} | Dtype: {TORCH_DTYPE}")
print("Loading models...")

# ASR stage: speech-recognition pipeline built from ASR_MODEL_ID.
print(f" Loading ASR: {ASR_MODEL_ID}")
asr_pipe = hf_pipeline(
    "automatic-speech-recognition",
    model=ASR_MODEL_ID,
    device=DEVICE,
    torch_dtype=TORCH_DTYPE,
)
print(" ASR loaded")

# MT stage: tokenizer + seq2seq model are loaded directly (not via a pipeline)
# so translate_sentence can call generate() with its own decoding settings.
print(f" Loading MT: {MT_MODEL_ID}")
mt_tokenizer = AutoTokenizer.from_pretrained(MT_MODEL_ID)
mt_model = AutoModelForSeq2SeqLM.from_pretrained(
    MT_MODEL_ID, torch_dtype=TORCH_DTYPE
).to(DEVICE)
mt_tokenizer.src_lang = MT_SRC_LANG
# Token id of the target-language tag; later passed as forced_bos_token_id.
tgt_lang_id = mt_tokenizer.convert_tokens_to_ids(MT_TGT_LANG)
print(f" MT loaded (target token id: {tgt_lang_id})")

# TTS stage: text-to-speech pipeline built from TTS_MODEL_ID.
print(f" Loading TTS: {TTS_MODEL_ID}")
tts_pipe = hf_pipeline(
    "text-to-speech",
    model=TTS_MODEL_ID,
    device=DEVICE,
    torch_dtype=TORCH_DTYPE,
)
print(" TTS loaded")
print("All models loaded!")

# Startup diagnostics: print where each model's parameters actually live.
# Each check is wrapped separately so one failure does not hide the others.
print(f"\n=== Device diagnostics ===")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")
    print(f"CUDA memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
try:
    asr_device = next(asr_pipe.model.parameters()).device
    print(f"ASR model on: {asr_device}")
except Exception as e:
    print(f"ASR device check failed: {e}")
try:
    mt_device = next(mt_model.parameters()).device
    print(f"MT model on: {mt_device}")
except Exception as e:
    print(f"MT device check failed: {e}")
try:
    tts_device = next(tts_pipe.model.parameters()).device
    print(f"TTS model on: {tts_device}")
except Exception as e:
    print(f"TTS device check failed: {e}")
print(f"==========================\n")
|
|
|
|
| |
| |
| |
|
|
def split_into_sentences(text):
    """Split raw ASR text into individual sentences.

    Args:
        text: Raw transcript; ``None`` or blank input yields an empty list.

    Returns:
        A list of non-empty sentence strings. Text that contains ``.``, ``!``
        or ``?`` is split after those marks. Text with no punctuation at all
        is cut into groups of at most 12 words, each capitalised and
        terminated with a period.
    """
    text = (text or "").strip()
    if not text:
        return []

    # Upper-case only the first character of each '. '-separated piece.
    # str.capitalize() would also lower-case the rest of the piece and so
    # destroy proper nouns (player and team names) before translation.
    pieces = (piece.strip() for piece in text.split('. '))
    text = '. '.join(piece[0].upper() + piece[1:] for piece in pieces if piece)

    if re.search(r'[.!?]', text):
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]

    # No punctuation anywhere: fall back to fixed-size word groups so the
    # translator never receives one unbounded run-on sentence.
    max_words = 12
    words = text.split()
    sentences = []
    for start in range(0, len(words), max_words):
        chunk = ' '.join(words[start:start + max_words])
        if not chunk.endswith(('.', '!', '?')):
            chunk += '.'
        sentences.append(chunk[0].upper() + chunk[1:])
    return sentences
|
|
|
|
def transcribe(audio_array, sample_rate=16000):
    """ASR: English audio to text.

    For short audio (<=28s): uses the HF pipeline (single pass).
    For longer audio: calls the underlying model's generate() directly on
    the full, untruncated feature sequence with timestamps enabled.

    Args:
        audio_array: 1-D float waveform.
        sample_rate: Sample rate of ``audio_array`` in Hz.

    Returns:
        The stripped transcript, or "" when the clip is shorter than 0.1 s.
    """
    # Reject clips shorter than 0.1 s. The threshold is derived from the
    # actual sample rate (1600 samples at 16 kHz) instead of assuming 16 kHz,
    # so 44.1/48 kHz microphone input is held to the same minimum duration.
    min_samples = sample_rate // 10
    if len(audio_array) < min_samples:
        return ""

    # Resampling does not change the duration, so it can be computed up front.
    duration_s = len(audio_array) / sample_rate

    # Resample to the 16 kHz rate passed to the ASR model below.
    if sample_rate != 16000:
        import torchaudio.functional as F_audio
        audio_tensor = torch.from_numpy(np.ascontiguousarray(audio_array)).float()
        audio_tensor = F_audio.resample(audio_tensor, sample_rate, 16000)
        audio_array = audio_tensor.numpy()
        sample_rate = 16000

    if duration_s <= 28:
        result = asr_pipe(
            {"raw": audio_array, "sampling_rate": sample_rate},
            return_timestamps=False,
        )
        return result["text"].strip()

    # Long-form path: reuse the pipeline's model, feature extractor and
    # tokenizer, but drive generate() ourselves.
    model = asr_pipe.model
    processor = asr_pipe.feature_extractor
    tokenizer = asr_pipe.tokenizer

    inputs = processor(
        audio_array,
        sampling_rate=16000,
        return_tensors="pt",
        truncation=False,
        padding="longest",
        return_attention_mask=True,
    )
    input_features = inputs.input_features.to(DEVICE, dtype=TORCH_DTYPE)
    attention_mask = inputs.attention_mask.to(DEVICE) if "attention_mask" in inputs else None

    generate_kwargs = {
        "return_timestamps": True,
        "language": "en",
        "task": "transcribe",
    }
    if attention_mask is not None:
        generate_kwargs["attention_mask"] = attention_mask

    with torch.no_grad():
        predicted_ids = model.generate(input_features, **generate_kwargs)

    transcription = tokenizer.batch_decode(
        predicted_ids, skip_special_tokens=True
    )[0]
    return transcription.strip()
|
|
|
|
def translate_sentence(text, max_length=256, fast=False):
    """MT: Single sentence English to Yoruba.

    Args:
        text: One English sentence.
        max_length: Upper bound on generated length. In fast mode it is
            additionally capped at 128, so the default call behaves as before
            while a smaller explicit value is now honoured instead of ignored.
        fast: True uses greedy decoding (intended for streaming mode);
            False uses 4-beam search with early stopping (batch mode).

    Returns:
        The decoded translation with special tokens removed.
    """
    fast_max_length = 128

    inputs = mt_tokenizer(text, return_tensors="pt", truncation=True).to(DEVICE)

    # Settings shared by both decoding modes.
    generate_kwargs = {
        "forced_bos_token_id": tgt_lang_id,
        "repetition_penalty": 1.5,
        "no_repeat_ngram_size": 3,
    }
    if fast:
        generate_kwargs.update(
            max_length=min(max_length, fast_max_length),
            num_beams=1,
            do_sample=False,
        )
    else:
        generate_kwargs.update(
            max_length=max_length,
            num_beams=4,
            early_stopping=True,
        )

    with torch.no_grad():
        output_ids = mt_model.generate(**inputs, **generate_kwargs)
    return mt_tokenizer.decode(output_ids[0], skip_special_tokens=True)
|
|
|
|
def translate_text(text, fast=False):
    """Translate ``text`` one sentence at a time and join the results.

    The text is first cut up by split_into_sentences; each piece is passed to
    translate_sentence with the same ``fast`` flag. Returns "" when there are
    no sentences, otherwise the translations joined by single spaces.
    """
    translated = []
    for sentence in split_into_sentences(text):
        translated.append(translate_sentence(sentence, fast=fast))
    return ' '.join(translated)
|
|
|
|
def synthesize(text):
    """TTS: Yoruba text to audio.

    Args:
        text: Yoruba text; ``None`` or blank text produces no audio.

    Returns:
        ``(audio, sample_rate)`` where ``audio`` is always a 1-D-or-higher
        float32 array (empty for blank input, paired with TARGET_SR).
    """
    if not text or not text.strip():
        return np.array([], dtype=np.float32), TARGET_SR

    result = tts_pipe(text)
    # Force float32: the TTS pipeline is loaded with TORCH_DTYPE, which is
    # float16 on CUDA, and callers hand this array to soundfile and Gradio.
    # atleast_1d keeps len(audio) valid even if squeeze() collapses the
    # output to a 0-d array.
    audio = np.atleast_1d(np.asarray(result["audio"], dtype=np.float32).squeeze())
    return audio, result["sampling_rate"]
|
|
|
|
def process_chunk(audio_array, sample_rate):
    """Run one audio chunk through ASR, MT and TTS.

    Returns a 5-tuple ``(audio, sample_rate, english, yoruba, elapsed)``.
    When a stage yields nothing, audio and sample rate are ``None``, the texts
    produced so far are still returned and ``elapsed`` is 0.
    """
    started_at = time.time()

    # Stage 1: speech -> English text.
    english = transcribe(audio_array, sample_rate)
    if not english:
        return None, None, "", "", 0

    # Stage 2: English -> Yoruba, using the fast decoding mode.
    yoruba = translate_text(english, fast=True)
    if not yoruba:
        return None, None, english, "", 0

    # Stage 3: Yoruba text -> speech.
    speech, speech_sr = synthesize(yoruba)
    if len(speech) == 0:
        return None, None, english, yoruba, 0

    elapsed = time.time() - started_at
    logger.info(f"Chunk processed in {elapsed:.2f}s: EN='{english[:60]}' -> YO='{yoruba[:60]}'")
    return speech, speech_sr, english, yoruba, elapsed
|
|
|
|
| |
| |
| |
|
|
class StreamState:
    """Per-session state for streaming mode: pending audio plus transcripts."""

    def __init__(self, chunk_duration_s=CHUNK_DURATION_S):
        # Settings that survive reset().
        self.chunk_duration_s = chunk_duration_s
        self.buffer_sr = TARGET_SR
        # Everything else starts from the same values reset() restores.
        self.reset()

    def reset(self):
        """Drop buffered audio, both transcripts and the timing counters."""
        self.audio_buffer = np.array([], dtype=np.float32)
        self.transcript_en = []
        self.transcript_yo = []
        self.chunk_count = 0
        self.total_time = 0.0
|
|
|
|
| |
| |
| |
|
|
def process_audio_upload(audio_input):
    """Batch mode: upload/record full audio, get translation back.

    Args:
        audio_input: ``(sample_rate, samples)`` tuple, or ``None``.

    Returns:
        ``((sample_rate, audio), log_markdown)`` on success, or
        ``(None, message)`` when the input is missing/empty or a pipeline
        stage produces nothing.
    """
    if audio_input is None:
        return None, "Please upload or record audio."

    sample_rate, audio_array = audio_input
    audio_array = np.asarray(audio_array, dtype=np.float32)
    # An empty recording would make the peak computation below raise.
    if audio_array.size == 0:
        return None, "Please upload or record audio."
    if audio_array.ndim > 1:
        # Down-mix multi-channel audio by averaging the channels.
        audio_array = audio_array.mean(axis=1)
    # Peak-normalise only when samples fall outside [-1, 1].
    peak = float(np.abs(audio_array).max())
    if peak > 1.0:
        audio_array = audio_array / np.float32(peak)

    total_start = time.time()
    log = []

    # --- ASR ---
    t0 = time.time()
    english = transcribe(audio_array, sample_rate)
    log.append(f"**ASR** ({time.time()-t0:.2f}s)\n{english}")

    if not english:
        return None, "ASR returned empty text. Try clearer audio."

    # --- MT (default decoding mode, sentence by sentence) ---
    t0 = time.time()
    translations = []
    for s in split_into_sentences(english):
        yo = translate_sentence(s)
        translations.append(yo)
        log.append(f" EN: {s}\n YO: {yo}")
    yoruba = ' '.join(translations)
    log.append(f"**MT** ({time.time()-t0:.2f}s)")

    if not yoruba:
        return None, "Translation returned empty."

    # --- TTS ---
    t0 = time.time()
    audio_out, sr_out = synthesize(yoruba)
    # Do not hand an empty waveform to the audio player.
    if len(audio_out) == 0:
        return None, "TTS produced no audio."
    log.append(f"**TTS** ({time.time()-t0:.2f}s) = {len(audio_out)/sr_out:.1f}s audio")
    log.append(f"\n**Total: {time.time()-total_start:.2f}s**")

    return (sr_out, audio_out), "\n".join(log)
|
|
|
|
def process_text_input(text):
    """Text mode: type English, get Yoruba audio.

    Args:
        text: English text; ``None`` or blank text is rejected.

    Returns:
        ``((sample_rate, audio), log_markdown)`` on success, or
        ``(None, message)`` when there is no input or a stage yields nothing.
    """
    if not text or not text.strip():
        return None, "Please enter some English text."

    t_total = time.time()
    log = []

    # --- MT (default decoding mode, sentence by sentence) ---
    t0 = time.time()
    translations = []
    for s in split_into_sentences(text.strip()):
        yo = translate_sentence(s)
        translations.append(yo)
        log.append(f"EN: {s}\nYO: {yo}\n")
    yoruba = ' '.join(translations)
    log.append(f"**MT** ({time.time()-t0:.2f}s)")

    # Same guard as the audio-upload path: nothing to speak.
    if not yoruba.strip():
        return None, "Translation returned empty."

    # --- TTS ---
    t0 = time.time()
    audio_out, sr_out = synthesize(yoruba)
    # Do not hand an empty waveform to the audio player.
    if len(audio_out) == 0:
        return None, "TTS produced no audio."
    log.append(f"**TTS** ({time.time()-t0:.2f}s) = {len(audio_out)/sr_out:.1f}s audio")
    log.append(f"\n**Total: {time.time()-t_total:.2f}s**")

    return (sr_out, audio_out), "\n".join(log)
|
|
|
|
def streaming_process(audio_input, state):
    """
    Streaming mode: receives audio chunks from the microphone,
    buffers them, and processes when enough has accumulated.

    This function is called repeatedly by Gradio's streaming API
    each time a new audio chunk arrives from the mic.

    Args:
        audio_input: ``(sample_rate, samples)`` tuple, or ``None``.
        state: The session's StreamState, or ``None`` to create a fresh one.

    Returns:
        ``(audio_or_None, log_markdown, state)``; audio is
        ``(sample_rate, float32 samples)`` only when a full chunk was
        processed and produced speech.
    """
    if state is None:
        state = StreamState()

    if audio_input is None:
        return None, format_live_log(state), state

    sample_rate, audio_chunk = audio_input
    audio_chunk = np.asarray(audio_chunk)
    # An empty chunk carries no audio and would break the peak computation.
    if audio_chunk.size == 0:
        return None, format_live_log(state), state

    # Signed-integer PCM is scaled by the dtype's full-scale value so every
    # chunk gets the SAME gain. Peak-normalising each chunk on its own made
    # the level jump at chunk boundaries and blew near-silent chunks up to
    # full scale.
    pcm_full_scale = None
    if np.issubdtype(audio_chunk.dtype, np.signedinteger):
        pcm_full_scale = float(np.iinfo(audio_chunk.dtype).max) + 1.0

    audio_chunk = audio_chunk.astype(np.float32)
    if audio_chunk.ndim > 1:
        # Down-mix multi-channel audio by averaging the channels.
        audio_chunk = audio_chunk.mean(axis=1)

    if pcm_full_scale is not None:
        audio_chunk = audio_chunk / np.float32(pcm_full_scale)
    else:
        # Other dtypes: only rescale when samples fall outside [-1, 1].
        peak = float(np.abs(audio_chunk).max())
        if peak > 1.0:
            audio_chunk = audio_chunk / np.float32(peak)

    # Append to the session buffer.
    state.buffer_sr = sample_rate
    state.audio_buffer = np.concatenate([state.audio_buffer, audio_chunk])

    required_samples = int(state.chunk_duration_s * sample_rate)

    # Not enough audio yet: just report buffering progress.
    if len(state.audio_buffer) < required_samples:
        buffered_s = len(state.audio_buffer) / sample_rate
        return None, format_live_log(state, buffered_s), state

    # Take exactly one chunk; the remainder stays buffered for the next call.
    chunk = state.audio_buffer[:required_samples]
    state.audio_buffer = state.audio_buffer[required_samples:]

    audio_out, sr_out, english, yoruba, elapsed = process_chunk(chunk, sample_rate)

    if english:
        state.chunk_count += 1
        state.total_time += elapsed
        state.transcript_en.append(english)
        state.transcript_yo.append(yoruba)

    if audio_out is not None and len(audio_out) > 0:
        # Keep the output inside [-1, 1] as float32 before returning it.
        audio_out = np.clip(audio_out, -1.0, 1.0).astype(np.float32)
        return (sr_out, audio_out), format_live_log(state), state
    return None, format_live_log(state), state
|
|
|
|
def format_live_log(state, buffered_s=None):
    """Format the live transcript log as Markdown.

    Args:
        state: Object exposing ``chunk_count``, ``total_time``,
            ``transcript_en``, ``transcript_yo`` and ``chunk_duration_s``.
        buffered_s: Seconds currently buffered; when given, a buffering
            progress line is included.

    Returns:
        A Markdown string with counters and the 10 most recent EN/YO pairs.
    """
    lines = [f"**Chunks processed:** {state.chunk_count}"]
    if state.chunk_count > 0:
        avg = state.total_time / state.chunk_count
        lines.append(f"**Avg processing time:** {avg:.2f}s per chunk")
    if buffered_s is not None:
        # Report against this state's own chunk length, not the module
        # default, so a custom chunk_duration_s is displayed correctly.
        lines.append(f"**Buffering:** {buffered_s:.1f}s / {state.chunk_duration_s}s")
    lines.append("")
    lines.append("---")
    lines.append("**Live transcript:**\n")

    # Show only the last 10 entries, numbered by their absolute position.
    first = max(0, len(state.transcript_en) - 10)
    recent = zip(state.transcript_en[first:], state.transcript_yo[first:])
    for number, (english, yoruba) in enumerate(recent, start=first + 1):
        lines.append(f"**[{number}]** EN: {english}")
        lines.append(f" YO: {yoruba}\n")

    return "\n".join(lines)
|
|
|
|
def clear_stream_state():
    """Return the values that reset the streaming tab.

    The tuple is (no audio, status message, brand-new StreamState).
    """
    fresh_state = StreamState()
    status_message = "Stream cleared. Click Start to begin."
    return None, status_message, fresh_state
|
|
|
|
| |
| |
| |
|
|
def extract_audio_from_video(video_path, output_audio_path, target_sr=16000):
    """Write the video's audio track to ``output_audio_path`` via ffmpeg.

    The output is single-channel 16-bit PCM at ``target_sr`` Hz. Returns
    ``output_audio_path``; raises RuntimeError carrying ffmpeg's stderr when
    the command exits non-zero.
    """
    ffmpeg_args = ["ffmpeg", "-y", "-i", video_path]
    ffmpeg_args += ["-vn", "-acodec", "pcm_s16le"]
    ffmpeg_args += ["-ar", str(target_sr), "-ac", "1"]
    ffmpeg_args.append(output_audio_path)

    completed = subprocess.run(ffmpeg_args, capture_output=True, text=True)
    if completed.returncode == 0:
        return output_audio_path
    raise RuntimeError(f"ffmpeg audio extraction failed:\n{completed.stderr}")
|
|
|
|
def get_video_duration(video_path):
    """Get a media file's duration in seconds using ffprobe.

    Args:
        video_path: Path to a video or audio file.

    Returns:
        The container duration as a float.

    Raises:
        RuntimeError: If ffprobe exits non-zero, or prints something that is
            not a number (for example "N/A" or nothing at all).
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe failed: {result.stderr}")

    raw_duration = result.stdout.strip()
    try:
        return float(raw_duration)
    except ValueError as err:
        # Surface a readable message instead of a bare float() ValueError.
        raise RuntimeError(
            f"ffprobe returned no usable duration: {raw_duration!r}"
        ) from err
|
|
|
|
def stretch_audio_to_duration(input_audio_path, output_audio_path, target_duration_s):
    """
    Stretch or compress audio to match a target duration using ffmpeg's atempo filter.
    atempo accepts 0.5-2.0 per filter; chain multiple for larger ratios.

    Args:
        input_audio_path: Audio file to retime.
        output_audio_path: Where the retimed audio is written.
        target_duration_s: Desired duration in seconds; must be positive.

    Returns:
        ``output_audio_path``.

    Raises:
        ValueError: If ``target_duration_s`` is not positive.
        RuntimeError: If the input duration is invalid or ffmpeg fails.
    """
    # Validate before probing the file: a zero target would divide by zero
    # and a negative one would produce a meaningless tempo ratio.
    if target_duration_s <= 0:
        raise ValueError(f"target_duration_s must be positive, got {target_duration_s!r}")

    current_duration = get_video_duration(input_audio_path)
    if current_duration <= 0:
        raise RuntimeError("Invalid audio duration")

    # Tempo factor: >1 speeds the audio up, <1 slows it down.
    ratio = current_duration / target_duration_s

    # Split the factor into a chain of steps that each stay within 0.5-2.0.
    filters = []
    remaining = ratio
    while remaining > 2.0:
        filters.append("atempo=2.0")
        remaining /= 2.0
    while remaining < 0.5:
        filters.append("atempo=0.5")
        remaining /= 0.5
    filters.append(f"atempo={remaining:.4f}")
    filter_str = ",".join(filters)

    cmd = [
        "ffmpeg", "-y",
        "-i", input_audio_path,
        "-filter:a", filter_str,
        output_audio_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg tempo adjustment failed:\n{result.stderr}")
    return output_audio_path
|
|
|
|
def mux_video_with_new_audio(video_path, audio_path, output_video_path):
    """Build the final MP4 from the original picture and a replacement audio track.

    The video stream of the first input is stream-copied, the audio stream of
    the second input is encoded as AAC, and ``-shortest`` is passed to ffmpeg.
    Returns ``output_video_path``; raises RuntimeError with ffmpeg's stderr on
    a non-zero exit.
    """
    ffmpeg_args = ["ffmpeg", "-y"]
    ffmpeg_args += ["-i", video_path, "-i", audio_path]
    ffmpeg_args += ["-c:v", "copy", "-c:a", "aac"]
    ffmpeg_args += ["-map", "0:v:0", "-map", "1:a:0"]
    ffmpeg_args += ["-shortest", output_video_path]

    completed = subprocess.run(ffmpeg_args, capture_output=True, text=True)
    if completed.returncode == 0:
        return output_video_path
    raise RuntimeError(f"ffmpeg muxing failed:\n{completed.stderr}")
|
|
|
|
def mux_video_extended_with_audio(video_path, audio_path, output_video_path, target_duration_s):
    """
    Mux a video with an audio track that is longer than the picture.

    The picture is padded with ffmpeg's tpad filter (stop_mode=clone), the
    result is re-encoded with libx264/AAC, and the output is limited to
    ``target_duration_s`` seconds with ``-t``. Returns ``output_video_path``;
    raises RuntimeError with ffmpeg's stderr on a non-zero exit.
    """
    pad_filter = f"[0:v]tpad=stop_mode=clone:stop_duration={target_duration_s}[v]"

    ffmpeg_args = ["ffmpeg", "-y"]
    ffmpeg_args += ["-i", video_path, "-i", audio_path]
    ffmpeg_args += ["-filter_complex", pad_filter]
    ffmpeg_args += ["-map", "[v]", "-map", "1:a:0"]
    ffmpeg_args += ["-c:v", "libx264", "-preset", "fast", "-c:a", "aac"]
    ffmpeg_args += ["-t", str(target_duration_s), output_video_path]

    completed = subprocess.run(ffmpeg_args, capture_output=True, text=True)
    if completed.returncode == 0:
        return output_video_path
    raise RuntimeError(f"ffmpeg video extension failed:\n{completed.stderr}")
|
|
|
|
def dub_video(video_path, progress=gr.Progress()):
    """
    Full video dubbing pipeline:
    1. Extract audio from video
    2. Transcribe English audio
    3. Translate to Yoruba
    4. Synthesize Yoruba audio
    5. Stretch to match original duration
    6. Combine with video

    Args:
        video_path: Path of the uploaded video, or ``None``.
        progress: Gradio progress tracker, called with a fraction and a
            description at each stage.

    Returns:
        ``(output_video_path, log_markdown)`` on success, or
        ``(None, message)`` when the input is missing, a stage produces
        nothing, or any step raises (the error is logged and reported).
    """
    if video_path is None:
        return None, "Please upload a video file."

    total_start = time.time()
    log_lines = []
    work_dir = None
    intermediates = []
    succeeded = False

    try:
        # Scratch directory holding every file produced by this run.
        work_dir = tempfile.mkdtemp(prefix="dub_")
        extracted_audio = os.path.join(work_dir, "original_audio.wav")
        yoruba_audio_raw = os.path.join(work_dir, "yoruba_raw.wav")
        yoruba_audio_aligned = os.path.join(work_dir, "yoruba_aligned.wav")
        output_video = os.path.join(work_dir, "dubbed_output.mp4")
        intermediates = [extracted_audio, yoruba_audio_raw, yoruba_audio_aligned]

        # --- 1. Extract audio ---
        progress(0.1, desc="Extracting audio from video...")
        t0 = time.time()
        extract_audio_from_video(video_path, extracted_audio)
        video_duration = get_video_duration(video_path)
        # The alignment step divides by this value.
        if video_duration <= 0:
            raise RuntimeError("Video has no measurable duration")
        log_lines.append(f"**Video duration:** {video_duration:.1f}s")
        log_lines.append(f"**Audio extraction:** {time.time()-t0:.2f}s")

        audio_array, sample_rate = sf.read(extracted_audio, dtype="float32")
        if audio_array.ndim > 1:
            audio_array = audio_array.mean(axis=1)

        # --- 2. ASR ---
        progress(0.25, desc="Transcribing English speech...")
        t0 = time.time()
        english_text = transcribe(audio_array, sample_rate)
        log_lines.append(f"\n**ASR** ({time.time()-t0:.2f}s)")
        log_lines.append(f"{english_text[:300]}{'...' if len(english_text) > 300 else ''}")

        if not english_text:
            return None, "ASR returned empty text. The video may have no audible speech."

        # --- 3. MT (fast decoding mode, with per-sentence progress) ---
        progress(0.3, desc="Translating English to Yoruba...")
        t0 = time.time()
        sentences = split_into_sentences(english_text)
        n_sentences = len(sentences)
        log_lines.append(f"\n**MT** starting ({n_sentences} sentences)")

        translations = []
        for i, s in enumerate(sentences):
            translations.append(translate_sentence(s, fast=True))
            mt_progress = 0.3 + (0.35 * (i + 1) / n_sentences)
            progress(mt_progress, desc=f"Translating {i+1}/{n_sentences}...")

        yoruba_text = ' '.join(translations)
        log_lines.append(f"**MT** completed in {time.time()-t0:.2f}s")
        log_lines.append(f"{yoruba_text[:300]}{'...' if len(yoruba_text) > 300 else ''}")

        if not yoruba_text:
            return None, "Translation returned empty text."

        # --- 4. TTS, a couple of sentences at a time ---
        progress(0.65, desc="Synthesizing Yoruba speech...")
        t0 = time.time()

        yoruba_sentences = re.split(r'(?<=[.!?])\s+', yoruba_text)
        yoruba_sentences = [s.strip() for s in yoruba_sentences if s.strip()]
        n_yo = len(yoruba_sentences)

        SENTENCES_PER_TTS_CHUNK = 2
        audio_segments = []
        output_sr = None

        for i in range(0, n_yo, SENTENCES_PER_TTS_CHUNK):
            chunk_text = ' '.join(yoruba_sentences[i:i + SENTENCES_PER_TTS_CHUNK])
            if not chunk_text:
                continue

            audio_seg, seg_sr = synthesize(chunk_text)
            if len(audio_seg) > 0:
                # Take the sample rate from a segment that really has audio.
                if output_sr is None:
                    output_sr = seg_sr
                audio_segments.append(audio_seg)
                # 0.2 s of silence after every synthesized segment.
                audio_segments.append(np.zeros(int(0.2 * seg_sr), dtype=np.float32))

            done = min(i + SENTENCES_PER_TTS_CHUNK, n_yo)
            tts_progress = 0.65 + (0.2 * (i + SENTENCES_PER_TTS_CHUNK) / n_yo)
            progress(min(tts_progress, 0.85), desc=f"Synthesizing audio {done}/{n_yo}...")

        if not audio_segments:
            return None, "TTS produced no audio."

        yoruba_audio = np.concatenate(audio_segments)
        sf.write(yoruba_audio_raw, yoruba_audio, output_sr)
        yoruba_duration = len(yoruba_audio) / output_sr
        log_lines.append(f"\n**TTS** ({time.time()-t0:.2f}s)")
        log_lines.append(f"Generated {yoruba_duration:.1f}s of Yoruba audio ({n_yo} sentences)")

        # --- 5. Align audio length with the video ---
        # Up to MAX_STRETCH the audio is retimed to the video length; beyond
        # it the audio keeps its natural speed and the video is extended.
        # NOTE(review): only speed-up is capped; much shorter audio is slowed
        # down without limit - confirm that is intended.
        progress(0.85, desc="Aligning audio to video...")
        t0 = time.time()
        MAX_STRETCH = 1.2
        stretch_ratio = yoruba_duration / video_duration
        log_lines.append(f"\n**Alignment** (ratio: {stretch_ratio:.2f}x)")

        extend_video = stretch_ratio > MAX_STRETCH
        if extend_video:
            log_lines.append(f"Ratio exceeds {MAX_STRETCH}x cap - keeping natural speed")
            log_lines.append(f"Video will be extended from {video_duration:.1f}s to {yoruba_duration:.1f}s")
            shutil.copy(yoruba_audio_raw, yoruba_audio_aligned)
            final_duration = yoruba_duration
        else:
            log_lines.append(f"Stretching audio to fit {video_duration:.1f}s video")
            # Within 2% of the target a retime is not worth the extra pass.
            if abs(stretch_ratio - 1.0) > 0.02:
                stretch_audio_to_duration(yoruba_audio_raw, yoruba_audio_aligned, video_duration)
            else:
                shutil.copy(yoruba_audio_raw, yoruba_audio_aligned)
            final_duration = video_duration

        log_lines.append(f"Alignment took {time.time()-t0:.2f}s")

        # --- 6. Mux ---
        progress(0.95, desc="Combining audio and video...")
        t0 = time.time()
        if extend_video:
            mux_video_extended_with_audio(
                video_path, yoruba_audio_aligned, output_video, final_duration
            )
            log_lines.append(f"\n**Muxing** ({time.time()-t0:.2f}s) - video extended by freezing last frame")
        else:
            mux_video_with_new_audio(video_path, yoruba_audio_aligned, output_video)
            log_lines.append(f"\n**Muxing** ({time.time()-t0:.2f}s)")

        total = time.time() - total_start
        log_lines.append(f"\n---\n**Total processing time:** {total:.1f}s")

        progress(1.0, desc="Done!")
        succeeded = True
        return output_video, "\n".join(log_lines)

    except Exception as e:
        logger.exception("Video dubbing failed")
        return None, f"Error: {str(e)}"

    finally:
        # Do not leave scratch files behind. On success only the intermediate
        # WAVs are removed (the output video must survive for the caller);
        # on any other exit the whole scratch directory is dropped.
        if work_dir is not None:
            if succeeded:
                for leftover in intermediates:
                    try:
                        os.remove(leftover)
                    except OSError:
                        logger.debug("Could not remove temp file %s", leftover)
            else:
                shutil.rmtree(work_dir, ignore_errors=True)
|
|
|
|
|
|
|
|
# Markdown shown at the top of the app.
DESCRIPTION = """
# Live Football Commentary \u2014 English \u2192 Yoruba

Translate English football commentary into Yoruba speech in real-time.

**Pipeline:** ASR (Whisper) \u2192 MT (NLLB-200) \u2192 TTS (MMS-TTS Yoruba)
"""

# Markdown shown on the streaming tab; {chunk_dur} is filled in from
# CHUNK_DURATION_S when the module is imported.
STREAMING_INSTRUCTIONS = """
### How to use live streaming:
1. Click the **microphone** button to start recording
2. Speak English commentary naturally
3. Every **{chunk_dur}s**, the pipeline processes your audio and plays back Yoruba
4. The transcript updates live below
5. Click **Clear** to reset

**Expected latency:** ~3\u20135 seconds behind your speech.
""".format(chunk_dur=CHUNK_DURATION_S)

# Sample sentences offered as clickable examples on the text tab.
EXAMPLES_TEXT = [
    "And it's a brilliant goal from the striker!",
    "The referee has shown a yellow card. Corner kick for the home team.",
    "What a save by the goalkeeper! The match is heading into injury time.",
    "He dribbles past two defenders and shoots! The ball hits the back of the net!",
]
|
|
# Gradio UI: four tabs (live streaming, batch audio, text, video dubbing),
# each wired to one of the pipeline entry points defined above.
with gr.Blocks(
    title="Football Commentary EN\u2192YO",
    theme=gr.themes.Soft(),
) as demo:

    gr.Markdown(DESCRIPTION)

    with gr.Tabs():

        # ---- Tab 1: live microphone streaming ----
        with gr.TabItem("Live Streaming"):
            gr.Markdown(STREAMING_INSTRUCTIONS)

            # Session state handed to and returned from streaming_process.
            stream_state = gr.State(StreamState())

            with gr.Row():
                with gr.Column():
                    stream_input = gr.Audio(
                        label="Microphone (streaming)",
                        type="numpy",
                        sources=["microphone"],
                        streaming=True,
                    )
                    clear_btn = gr.Button("Clear & Reset", variant="secondary")

                with gr.Column():
                    stream_output = gr.Audio(
                        label="Yoruba Output",
                        type="numpy",
                        autoplay=True,
                        elem_id="yoruba-stream-output",
                    )
                    stream_log = gr.Markdown(
                        label="Live Transcript",
                        value="Waiting for audio input..."
                    )

            # Browser-side hook that calls play() on the output <audio>
            # element whenever its src changes or new data has loaded.
            # NOTE(review): script tags injected through gr.HTML may not be
            # executed by the browser - confirm this hook actually attaches.
            gr.HTML("""
<script>
(function() {
function attachAutoplayHook() {
const container = document.getElementById('yoruba-stream-output');
if (!container) { setTimeout(attachAutoplayHook, 500); return; }
const audio = container.querySelector('audio');
if (!audio) { setTimeout(attachAutoplayHook, 500); return; }

// Force play whenever the src attribute changes
const observer = new MutationObserver(() => {
audio.muted = false;
audio.play().catch(e => console.log('Autoplay blocked:', e));
});
observer.observe(audio, { attributes: true, attributeFilter: ['src'] });

// Also play on loadeddata event
audio.addEventListener('loadeddata', () => {
audio.play().catch(e => console.log('Play failed:', e));
});

console.log('Yoruba autoplay hook attached');
}
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', attachAutoplayHook);
} else {
attachAutoplayHook();
}
})();
</script>
""")

            # Microphone audio is delivered to streaming_process, which
            # buffers it and emits Yoruba audio once a full chunk is ready.
            stream_input.stream(
                fn=streaming_process,
                inputs=[stream_input, stream_state],
                outputs=[stream_output, stream_log, stream_state],
                time_limit=600,
                stream_every=1.0,
            )

            clear_btn.click(
                fn=clear_stream_state,
                outputs=[stream_output, stream_log, stream_state],
            )

        # ---- Tab 2: upload or record a full clip ----
        with gr.TabItem("Upload / Record (Batch)"):
            gr.Markdown("Upload or record English commentary. Full pipeline processes after recording.")

            with gr.Row():
                with gr.Column():
                    audio_input = gr.Audio(
                        label="English Commentary Audio",
                        type="numpy",
                        sources=["upload", "microphone"],
                    )
                    audio_submit = gr.Button("Translate to Yoruba", variant="primary", size="lg")

                with gr.Column():
                    audio_output = gr.Audio(label="Yoruba Commentary Audio", type="numpy")
                    audio_log = gr.Markdown(label="Pipeline Log")

            audio_submit.click(
                fn=process_audio_upload,
                inputs=[audio_input],
                outputs=[audio_output, audio_log],
            )

        # ---- Tab 3: typed English text ----
        with gr.TabItem("Text \u2192 Audio"):
            gr.Markdown("Type English text to translate to Yoruba and hear the result.")

            with gr.Row():
                with gr.Column():
                    text_input = gr.Textbox(
                        label="English Text",
                        placeholder="Type English football commentary here...",
                        lines=4,
                    )
                    text_submit = gr.Button("Translate to Yoruba", variant="primary", size="lg")
                    gr.Examples(
                        examples=[[e] for e in EXAMPLES_TEXT],
                        inputs=[text_input],
                        label="Example Commentary",
                    )

                with gr.Column():
                    text_audio_output = gr.Audio(label="Yoruba Audio", type="numpy")
                    text_log = gr.Markdown(label="Pipeline Log")

            text_submit.click(
                fn=process_text_input,
                inputs=[text_input],
                outputs=[text_audio_output, text_log],
            )

        # ---- Tab 4: video dubbing ----
        with gr.TabItem("Video Dubbing"):
            # NOTE(review): the text below says "beam search", but dub_video
            # calls translate_sentence(..., fast=True) - confirm which is meant.
            gr.Markdown("""
### Video Dubbing (English \u2192 Yoruba)

Upload a video with English commentary and get back the same video with Yoruba dubbed audio.

**How it works:**
1. Audio is extracted from your video
2. Transcribed to English text (Whisper)
3. Translated to Yoruba (NLLB-200 with beam search)
4. Synthesized into Yoruba speech (MMS-TTS)
5. Time-aligned to match the original video duration
6. Combined with the original video (visuals preserved)

**Note:** Processing takes approximately 30\u201360% of the video duration on GPU. A 5-minute video takes about 2\u20133 minutes to process. Lip sync is not preserved \u2014 this is standard AI dubbing.
""")

            with gr.Row():
                with gr.Column():
                    video_input = gr.Video(
                        label="Upload English Commentary Video",
                        sources=["upload"],
                    )
                    video_submit = gr.Button(
                        "Dub to Yoruba",
                        variant="primary",
                        size="lg"
                    )

                with gr.Column():
                    video_output = gr.Video(
                        label="Yoruba Dubbed Video (Download from player)",
                    )
                    video_log = gr.Markdown(
                        label="Processing Log",
                        value="Upload a video and click 'Dub to Yoruba' to start."
                    )

            video_submit.click(
                fn=dub_video,
                inputs=[video_input],
                outputs=[video_output, video_log],
            )

    # Footer with links to the three model repositories.
    gr.Markdown("""
---
**Models:**
[ASR: PlotweaverAI/whisper-small-de-en](https://huggingface.co/PlotweaverAI/whisper-small-de-en) |
[MT: PlotweaverAI/nllb-200-distilled-600M-african-6lang](https://huggingface.co/PlotweaverAI/nllb-200-distilled-600M-african-6lang) |
[TTS: PlotweaverAI/yoruba-mms-tts-new](https://huggingface.co/PlotweaverAI/yoruba-mms-tts-new)
""")
|
|
# Start the Gradio server only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()
|
|