""" pipeline/transcriber.py Uploads audio to AssemblyAI and returns word-level transcript data. Uses AssemblyAI REST API directly to avoid SDK version incompatibilities around speech model config fields. """ import os import time from pathlib import Path from typing import Callable, List, Optional import httpx from utils import log def _resolve_speech_models() -> List[str]: """ Resolve AssemblyAI speech models from env. Expected env format: ASSEMBLYAI_SPEECH_MODELS="universal-3-pro,universal-2" """ allowed = {"universal-3-pro", "universal-2"} raw = os.environ.get("ASSEMBLYAI_SPEECH_MODELS", "universal-2") # Remove all whitespace characters from each model name models = [m.replace(" ", "") for m in raw.split(",") if m.replace(" ", "")] if not models: raise RuntimeError( "ASSEMBLYAI_SPEECH_MODELS is empty. " "Set it to one or more of: universal-3-pro, universal-2." ) invalid = [m for m in models if m not in allowed] if invalid: raise RuntimeError( "Invalid ASSEMBLYAI_SPEECH_MODELS value(s): " f"{', '.join(invalid)}. Allowed values: universal-3-pro, universal-2." ) return models def _iter_file_chunks(path: Path, chunk_size: int = 5 * 1024 * 1024): with path.open("rb") as f: while True: chunk = f.read(chunk_size) if not chunk: break yield chunk def transcribe_audio( audio_path : Path, progress_cb : Optional[Callable[[str, int], None]] = None, ) -> dict: """ Transcribe audio with AssemblyAI. Enabled features: - sentiment_analysis : per-sentence POSITIVE / NEGATIVE / NEUTRAL - auto_chapters : topic-based chapter detection - speaker_labels : multi-speaker support Returns ------- { "text" : str, "words" : [{text, start_ms, end_ms, confidence}], "sentences" : [{text, start_ms, end_ms, sentiment}], "chapters" : [{gist, summary, start_ms, end_ms}], "duration_ms": int, } """ api_key = os.environ.get("ASSEMBLYAI_API_KEY", "") if not api_key or not isinstance(api_key, str) or api_key == "": raise RuntimeError("ASSEMBLYAI_API_KEY is not set") log("πŸŽ™οΈ", f"Uploading audio to AssemblyAI: {audio_path.name}") if progress_cb: progress_cb("transcribing", 5) speech_models = _resolve_speech_models() log("🧠", f"AssemblyAI speech_models={speech_models}") timeout_s = int(os.environ.get("ASSEMBLYAI_TIMEOUT_SEC", "1800")) poll_interval_s = float(os.environ.get("ASSEMBLYAI_POLL_INTERVAL_SEC", "3")) headers = {"authorization": api_key} with httpx.Client(timeout=120.0) as client: # Step 1: Upload audio upload_resp = client.post( "https://api.assemblyai.com/v2/upload", headers=headers, content=_iter_file_chunks(audio_path), ) if upload_resp.status_code >= 400: raise RuntimeError(f"AssemblyAI upload failed: {upload_resp.text[:500]}") audio_url = upload_resp.json().get("upload_url") if not audio_url: raise RuntimeError("AssemblyAI upload failed: missing upload_url") if progress_cb: progress_cb("transcribing", 15) # Step 2: Request transcript create_payload = { "audio_url": audio_url, "speech_models": speech_models, "sentiment_analysis": True, "auto_chapters": True, "speaker_labels": True, "punctuate": True, "format_text": True, } create_resp = client.post( "https://api.assemblyai.com/v2/transcript", headers=headers, json=create_payload, ) if create_resp.status_code >= 400: raise RuntimeError(f"AssemblyAI create transcript failed: {create_resp.text[:500]}") transcript_id = create_resp.json().get("id") if not transcript_id: raise RuntimeError("AssemblyAI create transcript failed: missing transcript id") log("⏳", "Transcribing… (1–3 min depending on video length)") # Step 3: Poll elapsed = 0.0 transcript_json = None while elapsed <= timeout_s: poll_resp = client.get( f"https://api.assemblyai.com/v2/transcript/{transcript_id}", headers=headers, ) if poll_resp.status_code >= 400: raise RuntimeError(f"AssemblyAI polling failed: {poll_resp.text[:500]}") transcript_json = poll_resp.json() status = transcript_json.get("status") if status == "completed": break if status == "error": raise RuntimeError(f"AssemblyAI error: {transcript_json.get('error', 'Unknown error')}") elapsed += poll_interval_s # Map poll progress roughly into 15..95 if progress_cb: pct = min(95, int(15 + (elapsed / max(timeout_s, 1)) * 80)) progress_cb("transcribing", pct) time.sleep(poll_interval_s) if not transcript_json or transcript_json.get("status") != "completed": raise RuntimeError("AssemblyAI transcription timed out") if progress_cb: progress_cb("transcribing", 100) # ── Package results ────────────────────────────────────────────────────── words = [ { "text" : w.get("text", ""), "start_ms" : w.get("start", 0), "end_ms" : w.get("end", 0), "confidence": w.get("confidence", 0.0), } for w in (transcript_json.get("words") or []) ] sentences = [ { "text" : s.get("text", ""), "start_ms" : s.get("start", 0), "end_ms" : s.get("end", 0), "sentiment": s.get("sentiment", "NEUTRAL"), } for s in (transcript_json.get("sentiment_analysis_results") or []) ] chapters = [ { "gist" : c.get("gist", ""), "summary" : c.get("summary", ""), "start_ms": c.get("start", 0), "end_ms" : c.get("end", 0), } for c in (transcript_json.get("chapters") or []) ] result = { "text" : transcript_json.get("text", ""), "words" : words, "sentences" : sentences, "chapters" : chapters, "duration_ms": int(float(transcript_json.get("audio_duration", 0) or 0) * 1000), } log("βœ…", f"Transcription done β€” {len(words)} words, " f"{len(sentences)} sentences, {len(chapters)} chapters") return result