|
|
| """
|
| Universal Media Transcriber
|
| Supports: YouTube, YouTube Music, Spotify, Direct Audio/Video URLs
|
| Blazing fast: uses native captions when available, falls back to faster-whisper
|
| """
|
|
|
| import os
|
| import sys
|
| import re
|
| import json
|
| import time
|
| import shutil
|
| import hashlib
|
| import argparse
|
| import tempfile
|
| import subprocess
|
| from pathlib import Path
|
| from datetime import timedelta
|
| from concurrent.futures import ThreadPoolExecutor, as_completed
|
| from urllib.parse import urlparse, parse_qs
|
|
|
|
|
# Make helper binaries placed next to this script (e.g. a bundled ffmpeg)
# discoverable by subprocesses by prepending the script directory to PATH.
script_dir = str(Path(__file__).parent.absolute())

# os.environ.get avoids a KeyError in (rare) environments where PATH is unset.
if script_dir not in os.environ.get("PATH", ""):
    os.environ["PATH"] = script_dir + os.pathsep + os.environ.get("PATH", "")
|
|
|
|
|
|
|
|
|
|
|
# Import-name -> pip-package mapping used by ensure_deps() to bootstrap
# missing runtime dependencies at startup.
REQUIRED = {
    "yt_dlp": "yt-dlp",
    "youtube_transcript_api": "youtube-transcript-api",
    "faster_whisper": "faster-whisper",
    "rich": "rich",
    "spotdl": "spotdl",
    "requests": "requests",
}
|
|
|
def ensure_deps():
    """Install any packages from REQUIRED that are missing, via pip.

    Uses importlib.util.find_spec so availability is checked without actually
    importing (and executing) each heavy module at startup.
    """
    import importlib.util

    missing = []
    for module, pkg in REQUIRED.items():
        # find_spec returns None when the module cannot be located; it may
        # raise for broken installs — treat those as missing too.
        try:
            found = importlib.util.find_spec(module) is not None
        except (ImportError, ValueError):
            found = False
        if not found:
            missing.append(pkg)
    if missing:
        print(f"[setup] Installing: {', '.join(missing)} ...")
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "--quiet", "--break-system-packages"] + missing
        )
        print("[setup] Done. Reloading...\n")


ensure_deps()
|
|
|
|
|
|
|
|
|
|
|
| import yt_dlp
|
| import requests
|
| from rich.console import Console
|
| from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
|
| from rich.panel import Panel
|
| from rich.table import Table
|
| from rich import print as rprint
|
| from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
|
|
|
# Shared Rich console used for all logging and output in this script.
console = Console()


# ---- Tunables -----------------------------------------------------------
WHISPER_MODEL = "base"    # faster-whisper model size loaded by get_whisper()
WHISPER_DEVICE = "auto"   # "auto": cuda when torch sees a GPU, else cpu
WHISPER_THREADS = os.cpu_count()  # CPU threads for faster-whisper
AUDIO_FORMAT = "mp3"      # target codec for yt-dlp audio extraction
MAX_WORKERS = 4           # default parallel transcription jobs
CACHE_DIR = Path.home() / ".transcriber_cache"
CACHE_DIR.mkdir(exist_ok=True)


# Caption language preference order tried by fetch_youtube_captions().
LANG_PREF = ["en", "en-US", "en-GB", "en-AU", "en-CA", "en-IN", "en-IE", "en-NZ", "en-PH", "en-ZA", "en-orig", "a.en"]
|
|
|
|
|
|
|
|
|
|
|
def detect_source(url: str) -> str:
    """Classify *url* by provider.

    Returns one of: youtube | youtube_music | spotify | audio | unknown.

    Detection order: known hosts, then media file extension, then a best-effort
    HEAD request inspecting Content-Type (network errors are ignored).
    """
    parsed = urlparse(url)
    # removeprefix only strips a *leading* "www." — str.replace would also
    # mangle hosts that merely contain "www." elsewhere in the name.
    host = parsed.netloc.lower().removeprefix("www.")

    if host in ("youtube.com", "youtu.be", "m.youtube.com"):
        return "youtube"
    if host == "music.youtube.com":
        return "youtube_music"
    if host in ("open.spotify.com", "spotify.com"):
        return "spotify"

    # Lowercase once; str.endswith accepts a tuple of suffixes.
    lowered = url.lower()
    media_exts = (".mp3", ".mp4", ".wav", ".ogg", ".flac", ".m4a", ".webm",
                  ".aac", ".opus", ".mkv", ".avi", ".mov")
    if lowered.endswith(media_exts):
        return "audio"

    # Last resort: ask the server what the resource is.
    try:
        r = requests.head(url, timeout=5, allow_redirects=True)
        ct = r.headers.get("content-type", "")
        if "audio" in ct or "video" in ct:
            return "audio"
    except Exception:
        pass  # offline / unreachable — fall through to "unknown"
    return "unknown"
|
|
|
|
|
| def extract_youtube_id(url: str) -> str | None:
|
| """Extract video ID from any YouTube URL format."""
|
| patterns = [
|
| r"(?:v=|youtu\.be/|embed/|shorts/)([A-Za-z0-9_-]{11})",
|
| ]
|
| for p in patterns:
|
| m = re.search(p, url)
|
| if m:
|
| return m.group(1)
|
| return None
|
|
|
|
|
def extract_spotify_type(url: str) -> tuple[str, str]:
    """Return (resource_type, resource_id), e.g. ('track', 'abc123')."""
    match = re.search(
        r"spotify\.com/(track|album|playlist|episode|show)/([A-Za-z0-9]+)", url
    )
    if match is None:
        return "unknown", ""
    return match.group(1), match.group(2)
|
|
|
|
|
|
|
|
|
|
|
def cache_key(url: str) -> str:
    """Stable cache filename stem for *url* (MD5 hex digest)."""
    digest = hashlib.md5(url.encode())
    return digest.hexdigest()
|
|
|
def cache_get(url: str) -> str | None:
    """Return the cached transcript for *url*, or None on a cache miss."""
    cached_file = CACHE_DIR / f"{cache_key(url)}.txt"
    if not cached_file.exists():
        return None
    return cached_file.read_text(encoding="utf-8")
|
|
|
def cache_set(url: str, text: str):
    """Persist *text* as the cached transcript for *url*."""
    (CACHE_DIR / f"{cache_key(url)}.txt").write_text(text, encoding="utf-8")
|
|
|
|
|
|
|
|
|
|
|
# Lazily-created singleton WhisperModel shared across all transcription calls.
_whisper_model = None


def get_whisper():
    """Return the process-wide WhisperModel, loading it on first use."""
    global _whisper_model
    if _whisper_model is not None:
        return _whisper_model

    from faster_whisper import WhisperModel

    device = WHISPER_DEVICE
    if device == "auto":
        # Prefer CUDA when torch is installed and reports a usable GPU.
        try:
            import torch
            device = "cuda" if torch.cuda.is_available() else "cpu"
        except ImportError:
            device = "cpu"

    console.log(f"[cyan]Loading Whisper [{WHISPER_MODEL}] on {device}...[/cyan]")
    # float16 on GPU; int8 keeps CPU inference fast and memory-light.
    compute = "float16" if device == "cuda" else "int8"
    _whisper_model = WhisperModel(
        WHISPER_MODEL,
        device=device,
        compute_type=compute,
        num_workers=WHISPER_THREADS,
        cpu_threads=WHISPER_THREADS,
    )
    return _whisper_model
|
|
|
|
|
def transcribe_audio_file(audio_path: str, lang: str | None = None) -> str:
    """Transcribe a local audio file with faster-whisper. Returns full transcript text.

    Args:
        audio_path: Path to an audio file ffmpeg can decode.
        lang: Optional language code; when None, Whisper auto-detects.

    Returns:
        One line per recognized segment, formatted "[HH:MM:SS] text".
    """
    model = get_whisper()
    # VAD filtering drops long silences so Whisper skips dead air.
    opts = dict(beam_size=5, word_timestamps=False, vad_filter=True, vad_parameters=dict(min_silence_duration_ms=500))
    if lang:
        opts["language"] = lang
    segments, info = model.transcribe(audio_path, **opts)
    lines = []
    for seg in segments:
        # zfill(8) pads e.g. "0:00:05" to "00:00:05" for aligned timestamps.
        ts = str(timedelta(seconds=int(seg.start))).zfill(8)
        lines.append(f"[{ts}] {seg.text.strip()}")
    return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
def fetch_youtube_captions(video_id: str) -> str | None:
    """Try to get native captions (instant, no download).

    Returns caption text formatted "[HH:MM:SS] line" per entry, or None when
    captions are unavailable so the caller can fall back to Whisper.
    """
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

        # Pick the first language from LANG_PREF that has a transcript.
        transcript = None
        for lang in LANG_PREF:
            try:
                transcript = transcript_list.find_transcript([lang])
                break
            except Exception:
                pass
        if transcript is None:
            # No preferred language available — take whatever exists first.
            transcript = next(iter(transcript_list))
        entries = transcript.fetch()
        lines = []
        for e in entries:
            # NOTE(review): assumes fetch() yields dict-like entries with
            # "start"/"text" keys (older youtube-transcript-api API); newer
            # releases return snippet objects — confirm installed version.
            ts = str(timedelta(seconds=int(e["start"]))).zfill(8)
            lines.append(f"[{ts}] {e['text'].strip()}")
        return "\n".join(lines)
    except (TranscriptsDisabled, NoTranscriptFound):
        # Expected "no captions" cases: signal the Whisper fallback quietly.
        return None
    except Exception as exc:
        # Any other failure (network, parsing) is non-fatal: warn and let the
        # caller fall back to audio download + Whisper.
        console.log(f"[yellow]Caption fetch warning: {exc}[/yellow]")
        return None
|
|
|
|
|
def download_audio_yt(url: str, out_dir: str) -> str:
    """Download audio from YouTube/YouTube Music using yt-dlp. Returns file path."""
    options = {
        "format": "bestaudio/best",
        "outtmpl": os.path.join(out_dir, "%(id)s.%(ext)s"),
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": AUDIO_FORMAT,
                "preferredquality": "128",
            }
        ],
        "quiet": True,
        "no_warnings": True,
        "concurrent_fragment_downloads": 8,
    }
    with yt_dlp.YoutubeDL(options) as downloader:
        info = downloader.extract_info(url, download=True)
    # The post-processor re-encodes to AUDIO_FORMAT, so the final file name is
    # "<video id>.<AUDIO_FORMAT>" regardless of the source container.
    clip_id = info.get("id", "audio")
    return os.path.join(out_dir, f"{clip_id}.{AUDIO_FORMAT}")
|
|
|
|
|
def get_video_metadata(url: str) -> dict:
    """Get title, uploader, duration without downloading.

    Always returns the same set of keys ("title", "uploader", "duration",
    "description", "upload_date"); on any extraction failure a placeholder
    dict is returned instead of raising, so callers never need try/except.
    """
    ydl_opts = {"quiet": True, "no_warnings": True, "skip_download": True}
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
        return {
            "title": info.get("title", "Unknown"),
            "uploader": info.get("uploader", "Unknown"),
            "duration": info.get("duration", 0),
            "description": info.get("description", ""),
            "upload_date": info.get("upload_date", ""),
        }
    except Exception:
        # Fallback now carries the full key set (previously it was missing
        # "description"/"upload_date", giving callers an inconsistent schema).
        return {
            "title": "Unknown",
            "uploader": "Unknown",
            "duration": 0,
            "description": "",
            "upload_date": "",
        }
|
|
|
|
|
def transcribe_youtube(url: str, force_whisper: bool = False) -> dict:
    """Full pipeline for YouTube / YouTube Music."""
    video_id = extract_youtube_id(url) or "unknown"
    meta = get_video_metadata(url)

    text, how = None, "unknown"

    # Fast path: native captions need no audio download at all.
    if not force_whisper:
        console.log(f"[cyan]Trying native captions for[/cyan] [bold]{meta['title']}[/bold]")
        text = fetch_youtube_captions(video_id)
        if text:
            how = "native_captions"
            console.log("[green]β Got captions instantly (no download needed)[/green]")

    # Slow path: pull the audio down and run Whisper locally.
    if text is None:
        console.log("[yellow]No captions β downloading audio for Whisper...[/yellow]")
        with tempfile.TemporaryDirectory() as workdir:
            downloaded = download_audio_yt(url, workdir)
            console.log(f"[cyan]Transcribing with Whisper [{WHISPER_MODEL}]...[/cyan]")
            text = transcribe_audio_file(downloaded)
            how = f"whisper_{WHISPER_MODEL}"

    return dict(url=url, source="youtube", method=how, meta=meta, transcript=text)
|
|
|
|
|
|
|
|
|
|
|
def transcribe_spotify(url: str) -> dict:
    """Download Spotify track/episode then transcribe."""
    sp_type, sp_id = extract_spotify_type(url)

    # Podcast episodes sometimes resolve through yt-dlp directly; try that
    # first and fall through to the spotdl path below if it fails.
    if sp_type == "episode":
        console.log("[cyan]Spotify episode β trying yt-dlp...[/cyan]")
        try:
            with tempfile.TemporaryDirectory() as workdir:
                downloaded = download_audio_yt(url, workdir)
                meta = get_video_metadata(url)
                text = transcribe_audio_file(downloaded)
                return dict(
                    url=url,
                    source="spotify_episode",
                    method=f"whisper_{WHISPER_MODEL}",
                    meta=meta,
                    transcript=text,
                )
        except Exception as e:
            console.log(f"[yellow]yt-dlp failed for Spotify episode: {e}[/yellow]")

    console.log("[cyan]Spotify music β downloading via spotdl...[/cyan]")
    with tempfile.TemporaryDirectory() as workdir:
        proc = subprocess.run(
            [sys.executable, "-m", "spotdl", url, "--output", workdir,
             "--format", "mp3", "--bitrate", "128k", "--print-errors"],
            capture_output=True, text=True
        )

        found = list(Path(workdir).glob("*.mp3")) + list(Path(workdir).glob("*.m4a"))
        if not found:
            raise RuntimeError(f"spotdl produced no files.\n{proc.stderr}")

        # Transcribe each downloaded track in name order, labelling sections.
        pieces = []
        for track in sorted(found):
            console.log(f"[cyan]Transcribing:[/cyan] {track.name}")
            t = transcribe_audio_file(str(track))
            pieces.append(f"=== {track.stem} ===\n{t}")

        return dict(
            url=url,
            source=f"spotify_{sp_type}",
            method=f"spotdl+whisper_{WHISPER_MODEL}",
            meta={"title": f"Spotify {sp_type.title()}", "uploader": "Spotify"},
            transcript="\n\n".join(pieces),
        )
|
|
|
|
|
|
|
|
|
|
|
def transcribe_direct_audio(url: str) -> dict:
    """Download a direct audio/video file and transcribe."""
    console.log(f"[cyan]Downloading direct audio:[/cyan] {url}")
    with tempfile.TemporaryDirectory() as workdir:
        downloader_opts = {
            "outtmpl": os.path.join(workdir, "audio.%(ext)s"),
            "quiet": True,
            "no_warnings": True,
            "concurrent_fragment_downloads": 8,
        }
        with yt_dlp.YoutubeDL(downloader_opts) as ydl:
            info = ydl.extract_info(url, download=True)
        title = info.get("title", Path(url).stem) if info else Path(url).stem

        downloaded = list(Path(workdir).iterdir())
        if not downloaded:
            raise RuntimeError("No file downloaded")
        target = str(downloaded[0])
        console.log(f"[cyan]Transcribing:[/cyan] {Path(target).name}")
        text = transcribe_audio_file(target)

        return dict(
            url=url,
            source="audio",
            method=f"whisper_{WHISPER_MODEL}",
            meta={"title": title, "uploader": "Direct"},
            transcript=text,
        )
|
|
|
|
|
|
|
|
|
|
|
def expand_playlist(url: str) -> list[str]:
    """Return list of individual video URLs from a playlist/album/channel.

    For a non-playlist URL (or on any extraction failure) the original URL is
    returned as a single-element list, so callers can always len()/iterate
    the result without None checks.
    """
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": True,  # list entries without resolving each one
        "skip_download": True,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
        # Guard info being None explicitly instead of relying on the
        # except-Exception net below catching the TypeError.
        if info and "entries" in info:
            urls = []
            for e in info["entries"]:
                if e and e.get("url"):
                    urls.append(e["url"])
                elif e and e.get("id"):
                    # Flat extraction may only yield IDs; rebuild a watch URL.
                    urls.append(f"https://www.youtube.com/watch?v={e['id']}")
            return urls
    except Exception as exc:
        console.log(f"[yellow]Playlist expansion warning: {exc}[/yellow]")
    # Explicit fallback: a plain URL expands to itself. (Previously the
    # no-entries path could fall through and implicitly return None.)
    return [url]
|
|
|
|
|
|
|
|
|
|
|
def transcribe_url(url: str, force_whisper: bool = False, use_cache: bool = True) -> dict:
    """Route URL to the correct transcription pipeline."""
    url = url.strip()

    # Serve from the on-disk cache when allowed.
    if use_cache:
        cached = cache_get(url)
        if cached:
            console.log(f"[green]β Cache hit:[/green] {url[:60]}")
            return {"url": url, "source": "cache", "method": "cache",
                    "meta": {"title": "Cached"}, "transcript": cached}

    source = detect_source(url)
    console.log(f"[bold blue]Source detected:[/bold blue] {source} β {url[:70]}")

    # Dispatch table: source kind -> pipeline.
    handlers = {
        "youtube": lambda: transcribe_youtube(url, force_whisper=force_whisper),
        "youtube_music": lambda: transcribe_youtube(url, force_whisper=force_whisper),
        "spotify": lambda: transcribe_spotify(url),
        "audio": lambda: transcribe_direct_audio(url),
    }
    handler = handlers.get(source)
    if handler is None:
        console.log("[yellow]Unknown source β trying yt-dlp generic handler...[/yellow]")
        handler = lambda: transcribe_direct_audio(url)
    result = handler()

    if use_cache:
        cache_set(url, result["transcript"])

    return result
|
|
|
|
|
|
|
|
|
|
|
def format_transcript(result: dict, include_header: bool = True) -> str:
    """Render a transcription result as plain text, optionally prefixed with
    a metadata header block."""
    if not include_header:
        return result["transcript"] + "\n"

    meta = result.get("meta", {})
    duration = meta.get("duration", 0)
    dur_str = str(timedelta(seconds=int(duration))) if duration else "N/A"
    bar = "=" * 70
    header_lines = [
        bar,
        f"TITLE : {meta.get('title', 'Unknown')}",
        f"UPLOADER : {meta.get('uploader', 'Unknown')}",
        f"DURATION : {dur_str}",
        f"SOURCE : {result.get('source','')}",
        f"METHOD : {result.get('method', 'unknown')}",
        f"URL : {result.get('url', '')}",
        bar,
    ]
    return "\n".join(header_lines) + "\n\n" + result["transcript"] + "\n"
|
|
|
|
|
def safe_filename(title: str) -> str:
    """Sanitize *title* into a filesystem-safe name, capped at 80 chars."""
    cleaned = re.sub(r'[<>:"/\\|?*]', "_", title)
    cleaned = cleaned.strip(". ")[:80]
    if not cleaned:
        return "transcript"
    return cleaned
|
|
|
|
|
|
|
|
|
|
|
def process_batch(urls: list[str], output_dir: Path, force_whisper: bool,
                  use_cache: bool, merge: bool, workers: int):
    """Transcribe *urls* in parallel and write the results under *output_dir*.

    Side effects: creates output_dir, writes one .txt per URL (or a single
    merged_transcript.txt when *merge* is set), and prints a Rich summary
    table of successes and failures.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    results = []  # successful transcription result dicts
    errors = []   # {"url", "error", "elapsed"} dicts for failed jobs

    console.rule("[bold green]Universal Media Transcriber[/bold green]")
    console.print(f"[dim]URLs: {len(urls)} | Workers: {workers} | Model: {WHISPER_MODEL}[/dim]\n")

    def job(url):
        # Worker body: never raises — failures are folded into an error dict
        # so one bad URL cannot kill the whole batch.
        t0 = time.time()
        try:
            r = transcribe_url(url, force_whisper=force_whisper, use_cache=use_cache)
            r["elapsed"] = round(time.time() - t0, 1)
            return r
        except Exception as exc:
            return {"url": url, "error": str(exc), "elapsed": round(time.time() - t0, 1)}

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TimeElapsedColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("Transcribing...", total=len(urls))
        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = {pool.submit(job, u): u for u in urls}
            # Results arrive (and are logged) in completion order, not
            # submission order.
            for fut in as_completed(futures):
                result = fut.result()
                if "error" in result:
                    errors.append(result)
                    console.log(f"[red]β Error:[/red] {result['url'][:60]} β {result['error']}")
                else:
                    results.append(result)
                    console.log(f"[green]β[/green] {result['meta'].get('title','?')[:50]} [{result['elapsed']}s]")
                progress.advance(task)

    if merge and results:
        # Single combined file with a separator rule between transcripts.
        merged_path = output_dir / "merged_transcript.txt"
        with open(merged_path, "w", encoding="utf-8") as f:
            for r in results:
                f.write(format_transcript(r))
                f.write("\n" + "β" * 70 + "\n\n")
        console.print(f"\n[bold green]β Merged transcript:[/bold green] {merged_path}")
    else:
        # One file per transcript, named after the (sanitized) title.
        for r in results:
            title = r["meta"].get("title", "transcript")
            fname = safe_filename(title) + ".txt"
            out_path = output_dir / fname

            # Avoid clobbering a same-titled transcript: suffix with a short
            # hash of the source URL.
            if out_path.exists():
                stem = out_path.stem
                out_path = output_dir / f"{stem}_{cache_key(r['url'])[:6]}.txt"
            out_path.write_text(format_transcript(r), encoding="utf-8")
            console.print(f"[green]β Saved:[/green] {out_path}")

    # Summary table: one row per URL — successes first, then failures.
    table = Table(title="\n Summary", show_lines=True)
    table.add_column("Title", style="cyan", max_width=40)
    table.add_column("Method", style="magenta")
    table.add_column("Time", justify="right")
    table.add_column("Status", justify="center")

    for r in results:
        table.add_row(
            r["meta"].get("title", "?")[:38],
            r.get("method", "?"),
            f"{r['elapsed']}s",
            "[green]β[/green]",
        )
    for r in errors:
        table.add_row(r["url"][:38], "β", f"{r['elapsed']}s", "[red]β[/red]")

    console.print(table)
    console.print(f"\n[bold]Done:[/bold] {len(results)} ok, {len(errors)} failed β [dim]{output_dir}[/dim]")
|
|
|
|
|
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, expand/dedupe URLs, run the batch."""
    global WHISPER_MODEL
    parser = argparse.ArgumentParser(
        description=" Universal Media Transcriber β YouTube, Spotify, Audio & more",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python transcriber.py https://youtu.be/dQw4w9WgXcQ
  python transcriber.py URL1 URL2 URL3 --merge
  python transcriber.py --file urls.txt --output ./transcripts
  python transcriber.py https://open.spotify.com/track/... --whisper
  python transcriber.py https://youtu.be/... --model large-v3
  python transcriber.py --playlist https://youtube.com/playlist?list=...
"""
    )
    parser.add_argument("urls", nargs="*", help="One or more media URLs")
    parser.add_argument("--file", "-f", help="Text file with one URL per line")
    parser.add_argument("--output", "-o", default="./transcripts", help="Output directory (default: ./transcripts)")
    parser.add_argument("--merge", "-m", action="store_true", help="Merge all transcripts into one file")
    parser.add_argument("--whisper", "-w", action="store_true", help="Force Whisper (skip caption check)")
    parser.add_argument("--model", default=WHISPER_MODEL,
                        choices=["tiny", "base", "small", "medium", "large-v2", "large-v3"],
                        help="Whisper model size (default: base)")
    parser.add_argument("--workers", type=int, default=MAX_WORKERS, help="Parallel workers (default: 4)")
    parser.add_argument("--no-cache", action="store_true", help="Disable transcript cache")
    parser.add_argument("--playlist", action="store_true", help="Treat URL as playlist β expand all videos")
    parser.add_argument("--clear-cache", action="store_true", help="Clear the transcript cache and exit")

    args = parser.parse_args()

    # FIX: --model was parsed but never applied; propagate it to the module
    # config so get_whisper() actually loads the requested size.
    WHISPER_MODEL = args.model

    if args.clear_cache:
        shutil.rmtree(CACHE_DIR, ignore_errors=True)
        CACHE_DIR.mkdir(exist_ok=True)
        console.print("[green]Cache cleared.[/green]")
        return

    # Collect URLs from positionals and an optional file ('#' lines are comments).
    all_urls = list(args.urls)
    if args.file:
        path = Path(args.file)
        if not path.exists():
            console.print(f"[red]File not found: {path}[/red]")
            sys.exit(1)
        lines = path.read_text().splitlines()
        all_urls += [l.strip() for l in lines if l.strip() and not l.startswith("#")]

    if not all_urls:
        parser.print_help()
        sys.exit(0)

    # Expand playlists; a plain URL expands to itself.
    if args.playlist or len(all_urls) == 1:
        expanded = []
        for u in all_urls:
            exp = expand_playlist(u)
            if len(exp) > 1:
                console.log(f"[cyan]Playlist expanded:[/cyan] {len(exp)} items")
            expanded.extend(exp)
        all_urls = expanded

    # De-duplicate while preserving first-seen order.
    seen = set()
    deduped = []
    for u in all_urls:
        if u not in seen:
            seen.add(u)
            deduped.append(u)
    all_urls = deduped

    process_batch(
        urls=all_urls,
        output_dir=Path(args.output),
        force_whisper=args.whisper,
        use_cache=not args.no_cache,
        merge=args.merge,
        workers=args.workers,  # FIX: was the typo "wocd rkers=" (a SyntaxError)
    )


if __name__ == "__main__":
    main()