Spaces:
Running on Zero
Running on Zero
"""
pipeline.py — Core pipeline: CLI entrypoint + importable run_pipeline() for Gradio.

Usage:
    python pipeline.py --input data/test_video_3.mp4 --target-lang Spanish
"""
| import argparse | |
| import os | |
| import io | |
| import logging | |
| import os | |
| import shutil | |
| import sys | |
| import threading | |
| import time | |
| from pathlib import Path | |
| from typing import Generator | |
| from steps.s1_extract_audio import extract_audio, extract_audio_hq | |
| from steps.s2_transcribe import transcribe, POLLEN_TRANSCRIBE_MODEL | |
| from steps.s3_translate import translate | |
| from steps.s4_tts import synthesise_segments | |
| from steps.s5_sync import sync_and_stitch | |
| from steps.s6_captions import generate_captions | |
| from steps.s6_merge import merge_audio_video | |
def _log_step_done(label: str, start: float):
    """Report how long a pipeline step took, then print a 40-character divider.

    Args:
        label: Short step tag shown in brackets (e.g. "s1", "s4_tts").
        start: ``time.time()`` value captured when the step began.

    Durations of one minute or more print as ``<m>m <s>s``; shorter ones as
    ``<s>s``. Fractional seconds are truncated, not rounded.
    """
    seconds_taken = time.time() - start
    if seconds_taken < 60:
        pretty = f"{int(seconds_taken)}s"
    else:
        whole_minutes, leftover = divmod(seconds_taken, 60)
        pretty = f"{int(whole_minutes)}m {int(leftover)}s"
    print(f"[{label}] Duration: {pretty}")
    print("=" * 40)
# Target-language display name -> language id handed to the TTS step as
# `language_id` (see run_pipeline). Insertion order matters: the CLI builds
# its --target-lang choices from these keys in this order.
LANGUAGE_CODES = dict(
    Arabic="ar",
    Chinese="zh",
    Danish="da",
    Dutch="nl",
    English="en",
    Finnish="fi",
    French="fr",
    German="de",
    Greek="el",
    Hebrew="he",
    Hindi="hi",
    Italian="it",
    Japanese="ja",
    Korean="ko",
    Malay="ms",
    Norwegian="no",
    Polish="pl",
    Portuguese="pt",
    Russian="ru",
    Spanish="es",
    Swahili="sw",
    Swedish="sv",
    Turkish="tr",
    # NOTE(review): Urdu maps to the Hindi code "hi", not ISO-639-1 "ur";
    # presumably because the TTS engine has no Urdu voice — confirm before changing.
    Urdu="hi",
)
def run_pipeline(
    video_path: str,
    target_language: str = "Spanish",
    source_language: str = "auto",
    output_path: str | None = None,
    voice_mode: str = "chatterbox",
    preview_event: threading.Event | None = None,
    job_state: dict | None = None,
    captions: bool = True,
    preserve_music: bool = False,
    data_dir: str | None = None,
    video_link: str | None = None,
) -> Generator[str | dict, None, str]:
    """
    Run the full translation pipeline, yielding progress messages.

    Steps: extract audio -> (optional) vocal/music separation -> transcribe ->
    translate -> TTS -> sync to original timestamps -> captions + merge.

    Args:
        video_path: Path to the input video file.
        target_language: Target language name (e.g. "Spanish"). Expected to be
            a key of LANGUAGE_CODES; unknown names make TTS fall back to "es".
        source_language: ISO-639-1 code of the source language, or "auto" for
            Whisper to auto-detect (default "auto"). Forcing a wrong code makes
            Whisper silently translate-and-transcribe instead of transcribing.
        output_path: Where to save the output video. If None, defaults to
            ``<data_dir>/output.mp4`` when data_dir is given, otherwise
            ``output_<stem>_<target_language lowercased>.mp4`` in the CWD.
        voice_mode: TTS engine name forwarded to synthesise_segments (the CLI
            offers "chatterbox", "omnivoice" and "qwen3"). When the TTS_ENGINE
            env var is set and differs, it wins and a warning is yielded.
        preview_event: Deprecated and unused; kept for caller compatibility.
        job_state: Shared dict with the server; not read by this function.
        captions: Generate a captions file and pass it to the merge step.
        preserve_music: Separate vocals from background music first and pass
            the music track to the merge step (adds one step to the count).
        data_dir: Optional directory that receives output.mp4 (when
            output_path is None) plus best-effort copies of the logs,
            transcription and TTS artifacts when the run ends, success or not.
        video_link: Optional source URL, recorded (query string and fragment
            stripped) in transcription.json.

    Yields:
        str: Progress messages for each step.
        dict: Any dict emitted by synthesise_segments other than the
            ``__TTS_RESULT__`` result, passed through unchanged.

    Returns:
        str: Path to the translated output video (from merge_audio_video).

    Side effects:
        Deletes and recreates ./tmp, and tees sys.stdout / sys.stderr into
        tmp/logs.txt (noise-filtered) until the generator finishes or is closed.
    """
    # Single-engine mode: voice_mode must match TTS_ENGINE if set
    space_engine = os.getenv("TTS_ENGINE")
    if space_engine and voice_mode != space_engine:
        yield f"β οΈ Warning: voice_mode='{voice_mode}' but Space TTS_ENGINE='{space_engine}'. Using {space_engine}.\n"
        voice_mode = space_engine

    # Fixed step count (no more preview_both mode)
    total_steps = 6 + (1 if preserve_music else 0)

    # Prepare output path
    if output_path is None:
        if data_dir:
            output_path = str(Path(data_dir) / "output.mp4")
        else:
            stem = Path(video_path).stem
            output_path = f"output_{stem}_{target_language.lower()}.mp4"

    # Clean tmp dir
    shutil.rmtree("tmp", ignore_errors=True)
    os.makedirs("tmp/audio/source", exist_ok=True)

    # Set up logging to tmp/logs.txt (clean logs only, no torch/chatterbox noise)
    log_path = "tmp/logs.txt"
    _log_file = open(log_path, "w", encoding="utf-8")
    _orig_stdout = sys.stdout
    _orig_stderr = sys.stderr

    # Patterns to filter out of log file (still shown in terminal)
    _NOISE = (
        "Sampling:", "sampling", "UserWarning", "FutureWarning", "DeprecationWarning",
        "torch.backends", "torch.functional", "torch.fft", "torchaudio/compliance",
        "sdp_kernel", "LoRACompatible", "pkg_resources", "Fetching",
        "output_attentions", "TRANSFORMERS_VERBOSITY",
        "istft", "stft", "resize_", "inverse_transform",
        "PerthNet", "loaded Perth", "diffusers/models",
        "chatterbox/models/s3gen", "alignment_stream_analyzer",
        "WARNING:chatterbox",
    )

    class _Tee(io.TextIOBase):
        """Write to both the original stream and the log file (filtered)."""

        def __init__(self, original, filter_noise=False):
            self._original = original
            self._filter = filter_noise
            # True right after a filtered chunk that did not end its own line.
            self._suppressed_open_line = False

        def write(self, s):
            self._original.write(s)
            if self._filter and any(p in s for p in _NOISE):
                # print() writes its text and its "\n" terminator as separate
                # calls; remember whether that terminator is still to come.
                self._suppressed_open_line = not s.endswith("\n")
                return len(s)
            if self._suppressed_open_line and s == "\n":
                # Drop the terminator of a filtered print() so the log does
                # not fill up with orphan blank lines.
                self._suppressed_open_line = False
                return len(s)
            self._suppressed_open_line = False
            if not _log_file.closed:
                _log_file.write(s)
                _log_file.flush()
            return len(s)

        def flush(self):
            self._original.flush()
            if not _log_file.closed:
                _log_file.flush()

    sys.stdout = _Tee(_orig_stdout, filter_noise=True)
    sys.stderr = _Tee(_orig_stderr, filter_noise=True)

    try:
        yield f"π¬ Starting pipeline: {video_path} β {target_language}\n"

        # Step 1: Extract audio
        yield f"π Step 1/{total_steps}: Extracting audio...\n"
        _t0 = time.time()
        audio_path = extract_audio(video_path, "tmp/audio/source/extracted_audio.wav")
        yield f" β Audio extracted: {audio_path}\n"

        # Step 1b: Source separation (conditional)
        vocals_path = audio_path  # default: use full mix
        music_path = None
        if preserve_music:
            from steps.s1b_separate import separate_audio
            audio_hq = extract_audio_hq(video_path, "tmp/audio/source/extracted_audio_hq.wav")
            _log_step_done("s1", _t0)
            yield f"π΅ Step 2/{total_steps}: Separating vocals from background music...\n"
            _t0 = time.time()
            vocals_path, music_path = separate_audio(audio_hq, "tmp/audio/source")
            yield f" β Vocals and accompaniment separated\n"
            _log_step_done("s1b", _t0)
        else:
            _log_step_done("s1", _t0)

        # Step offset: steps after separation shift by 1 when preserve_music is on
        step_offset = 1 if preserve_music else 0

        # Step 2: Transcribe
        yield f"π Step {2 + step_offset}/{total_steps}: Transcribing (Pollinations Whisper / mlx-whisper)...\n"
        _t0 = time.time()
        segments = transcribe(vocals_path, language=source_language)
        yield f" β {len(segments)} segments transcribed\n"
        for seg in segments:
            yield f" [{seg['start']:.1f}sβ{seg['end']:.1f}s] {seg['text']}\n"

        # Dump transcription to tmp for inspection
        import json as _json
        from urllib.parse import urlparse, urlunparse
        with open("tmp/transcription.json", "w", encoding="utf-8") as _tf:
            out_data = {
                "model_provider": "pollinations",
                "model_name": POLLEN_TRANSCRIBE_MODEL,
                "source_language": source_language,
                "audio_path": vocals_path,
                "segment_count": len(segments),
                "total_duration": round(segments[-1]["end"], 2) if segments else 0,
                "segments": [
                    {
                        "index": i,
                        "start": seg["start"],
                        "end": seg["end"],
                        "duration": round(seg["end"] - seg["start"], 2),
                        "text": seg["text"],
                        **({"words": seg["words"]} if "words" in seg else {}),
                    }
                    for i, seg in enumerate(segments)
                ],
            }
            if video_link:
                # Drop query string and fragment before recording the link.
                parsed = urlparse(video_link)
                clean_link = urlunparse(parsed._replace(query="", fragment=""))
                out_data = {"video_link": clean_link, **out_data}
            _json.dump(out_data, _tf, indent=2, ensure_ascii=False)
        _log_step_done("s2", _t0)

        # Step 3: Translate
        yield f"π Step {3 + step_offset}/{total_steps}: Translating to {target_language}...\n"
        _t0 = time.time()
        segments = translate(segments, target_language)
        yield f" β Translation complete\n"
        for seg in segments:
            yield f" β {seg['translated_text']}\n"
        # NOTE(review): unknown target languages silently fall back to the
        # Spanish TTS code "es"; consider rejecting them up front instead.
        target_lang_code = LANGUAGE_CODES.get(target_language, "es")
        _log_step_done("s3", _t0)

        # Step 4: TTS synthesis
        model_name = voice_mode  # Uses TTS_ENGINE env var in Space deployments
        yield f"π£οΈ Step {4 + step_offset}/{total_steps}: Synthesising speech ({model_name})...\n"
        _t0 = time.time()
        tts_gen = synthesise_segments(
            segments, vocals_path,
            language_id=target_lang_code,
            output_dir="tmp/audio/tts",
            model_name=model_name,
        )
        for msg in tts_gen:
            # The TTS generator hands back the updated segments via a sentinel
            # dict; everything else is forwarded to our caller.
            if isinstance(msg, dict) and "__TTS_RESULT__" in msg:
                segments = msg["__TTS_RESULT__"]
            else:
                yield msg
        yield f" β {len(segments)} segments synthesised\n"
        _log_step_done("s4_tts", _t0)

        # Step 5: Sync
        yield f"β±οΈ Step {5 + step_offset}/{total_steps}: Syncing audio to original timestamps...\n"
        _t0 = time.time()
        final_audio = sync_and_stitch(segments, "tmp/audio/final_audio.wav", "tmp/audio/tts_synced")
        yield f" β Audio synced: {final_audio}\n"
        _log_step_done("s5", _t0)

        # Captions + Merge
        captions_path = None
        _t0 = time.time()
        if captions:
            captions_path = generate_captions(segments, "tmp/captions.ass", target_language=target_language)
            yield f" β Captions generated: {captions_path}\n"

        # Step 6: Merge
        music_label = " + music" if music_path else ""
        yield f"ποΈ Step {6 + step_offset}/{total_steps}: Merging translated audio{' + captions' if captions_path else ''}{music_label} into video...\n"
        result = merge_audio_video(video_path, final_audio, output_path, captions_path=captions_path, music_path=music_path)
        _log_step_done("s6", _t0)

        yield f"\nβ Done! Output saved to: {result}\n"
    finally:
        # Runs on success, on error and when the generator is closed early.
        sys.stdout = _orig_stdout
        sys.stderr = _orig_stderr
        if not _log_file.closed:
            _log_file.close()
        if data_dir:
            def _safe_copy(src, dst_name):
                """Best-effort copy of a tmp artifact into data_dir; never raises.

                A failure here must not mask an exception already propagating
                out of the pipeline, so OSErrors are reported, not raised.
                """
                if not os.path.exists(src):
                    return
                try:
                    shutil.copy2(src, os.path.join(data_dir, dst_name))
                except OSError as exc:
                    print(f"[pipeline] Could not copy {src} into {data_dir}: {exc}")

            _safe_copy(log_path, "logs.txt")
            _safe_copy("tmp/transcription.json", "transcription.json")
            _safe_copy("tmp/llm_calls.json", "llm_calls.json")
            _safe_copy("tmp/audio/tts/tts_manifest.json", "tts_manifest.json")
            _safe_copy("tmp/audio/tts/segment_comparison.json", "segment_comparison.json")
        print(f"[pipeline] Logs saved β {log_path}")

    return result
def _collect_output(gen: Generator) -> tuple[list[str], str]:
    """Drain *gen*, echoing each text message, and capture its return value.

    Every yielded string is printed immediately (no extra newline) and also
    accumulated. Dict items are skipped: in CLI mode they are preview sentinels
    from the deprecated preview_both flow. Exceptions from *gen* propagate.

    Returns:
        ``(messages, return_value)``: the yielded strings in order, and the
        value the generator returned (None if it returned nothing).
    """
    collected: list[str] = []
    while True:
        try:
            item = next(gen)
        except StopIteration as stop:
            return collected, stop.value
        if isinstance(item, dict):
            continue
        collected.append(item)
        print(item, end="", flush=True)
def main(argv: list[str] | None = None):
    """CLI entry point: parse arguments, run the pipeline, print the output path.

    Args:
        argv: Argument list to parse. None (the default) means ``sys.argv[1:]``,
            exactly as ``argparse`` does, so ``main()`` behaves as before.
    """
    parser = argparse.ArgumentParser(description="Video Translation Pipeline")
    parser.add_argument("--input", required=True, help="Input video path")
    parser.add_argument(
        "--target-lang",
        default="Spanish",
        choices=list(LANGUAGE_CODES.keys()),
        help="Target language (default: Spanish)",
    )
    parser.add_argument(
        "--source-lang",
        default="auto",
        help="Source language ISO-639-1 code or 'auto' to let Whisper detect (default: auto)",
    )
    parser.add_argument("--output", default=None, help="Output video path")
    parser.add_argument(
        "--voice-mode",
        default="chatterbox",
        choices=["chatterbox", "omnivoice", "qwen3"],
        help="TTS engine to use (default: chatterbox). Must match TTS_ENGINE env var in Space deployments.",
    )
    parser.add_argument(
        "--preserve-music",
        action="store_true",
        help="Separate and preserve background music using Demucs",
    )
    # run_pipeline() supports captions=False, but the CLI had no way to ask for
    # it. Captions stay on by default, so existing invocations are unchanged.
    parser.add_argument(
        "--no-captions",
        action="store_true",
        help="Skip caption generation (captions are generated by default)",
    )
    args = parser.parse_args(argv)

    gen = run_pipeline(
        video_path=args.input,
        target_language=args.target_lang,
        source_language=args.source_lang,
        output_path=args.output,
        voice_mode=args.voice_mode,
        captions=not args.no_captions,
        preserve_music=args.preserve_music,
    )
    # Drain the generator (echoing progress) and capture its return value.
    _, output = _collect_output(gen)
    print(f"\nFinal output: {output}")


if __name__ == "__main__":
    main()