# studiox-reel-cutter / transcriber.py — commit 90d824b (file-view page header retained as a comment; not code)
"""
pipeline/transcriber.py
Uploads audio to AssemblyAI and returns word-level transcript data.
Uses AssemblyAI REST API directly to avoid SDK version incompatibilities
around speech model config fields.
"""
import os
import time
from pathlib import Path
from typing import Callable, List, Optional
import httpx
from utils import log
def _resolve_speech_models() -> List[str]:
    """
    Resolve the AssemblyAI speech-model list from the environment.

    Reads ``ASSEMBLYAI_SPEECH_MODELS``, a comma-separated list such as
    ``"universal-3-pro,universal-2"`` (default: ``"universal-2"``).

    Returns
    -------
    List[str]
        The validated model names, in the order given.

    Raises
    ------
    RuntimeError
        If the variable is set but contains no model names, or contains
        a name outside the allowed set.
    """
    allowed = {"universal-3-pro", "universal-2"}
    raw = os.environ.get("ASSEMBLYAI_SPEECH_MODELS", "universal-2")
    # Remove ALL whitespace (spaces, tabs, newlines) from each entry.
    # The previous `.replace(" ", "")` only handled ASCII spaces, so a
    # stray tab/newline in the env value caused a spurious "invalid" error.
    models = ["".join(m.split()) for m in raw.split(",") if m.split()]
    if not models:
        raise RuntimeError(
            "ASSEMBLYAI_SPEECH_MODELS is empty. "
            "Set it to one or more of: universal-3-pro, universal-2."
        )
    invalid = [m for m in models if m not in allowed]
    if invalid:
        raise RuntimeError(
            "Invalid ASSEMBLYAI_SPEECH_MODELS value(s): "
            f"{', '.join(invalid)}. Allowed values: universal-3-pro, universal-2."
        )
    return models
def _iter_file_chunks(path: Path, chunk_size: int = 5 * 1024 * 1024):
    """Yield the file at *path* as successive binary chunks.

    Each chunk is at most *chunk_size* bytes (default 5 MiB), so large
    audio files can be streamed without loading them fully into memory.
    """
    with path.open("rb") as handle:
        while chunk := handle.read(chunk_size):
            yield chunk
def transcribe_audio(
    audio_path: Path,
    progress_cb: Optional[Callable[[str, int], None]] = None,
) -> dict:
    """
    Transcribe audio with AssemblyAI via its REST API.

    Enabled features:
        - sentiment_analysis : per-sentence POSITIVE / NEGATIVE / NEUTRAL
        - auto_chapters      : topic-based chapter detection
        - speaker_labels     : multi-speaker support

    Parameters
    ----------
    audio_path : Path
        Local audio file to upload.
    progress_cb : Optional[Callable[[str, int], None]]
        Optional callback invoked as ``progress_cb("transcribing", pct)``
        with pct in 5..100.

    Returns
    -------
    {
        "text"       : str,
        "words"      : [{text, start_ms, end_ms, confidence}],
        "sentences"  : [{text, start_ms, end_ms, sentiment}],
        "chapters"   : [{gist, summary, start_ms, end_ms}],
        "duration_ms": int,
    }

    Raises
    ------
    RuntimeError
        If the API key is missing, any API call fails, the transcript
        ends in "error" status, or polling exceeds the configured timeout.
    """
    api_key = os.environ.get("ASSEMBLYAI_API_KEY", "")
    # `not api_key` already covers unset and empty-string; the old
    # isinstance/== "" checks were redundant (env values are always str).
    if not api_key:
        raise RuntimeError("ASSEMBLYAI_API_KEY is not set")

    log("πŸŽ™οΈ", f"Uploading audio to AssemblyAI: {audio_path.name}")
    if progress_cb:
        progress_cb("transcribing", 5)

    speech_models = _resolve_speech_models()
    log("🧠", f"AssemblyAI speech_models={speech_models}")

    timeout_s = int(os.environ.get("ASSEMBLYAI_TIMEOUT_SEC", "1800"))
    poll_interval_s = float(os.environ.get("ASSEMBLYAI_POLL_INTERVAL_SEC", "3"))
    headers = {"authorization": api_key}

    with httpx.Client(timeout=120.0) as client:
        # Step 1: Upload audio (streamed in chunks, never fully in memory).
        upload_resp = client.post(
            "https://api.assemblyai.com/v2/upload",
            headers=headers,
            content=_iter_file_chunks(audio_path),
        )
        if upload_resp.status_code >= 400:
            raise RuntimeError(f"AssemblyAI upload failed: {upload_resp.text[:500]}")
        audio_url = upload_resp.json().get("upload_url")
        if not audio_url:
            raise RuntimeError("AssemblyAI upload failed: missing upload_url")
        if progress_cb:
            progress_cb("transcribing", 15)

        # Step 2: Request transcript
        create_payload = {
            "audio_url": audio_url,
            "speech_models": speech_models,
            "sentiment_analysis": True,
            "auto_chapters": True,
            "speaker_labels": True,
            "punctuate": True,
            "format_text": True,
        }
        create_resp = client.post(
            "https://api.assemblyai.com/v2/transcript",
            headers=headers,
            json=create_payload,
        )
        if create_resp.status_code >= 400:
            raise RuntimeError(f"AssemblyAI create transcript failed: {create_resp.text[:500]}")
        transcript_id = create_resp.json().get("id")
        if not transcript_id:
            raise RuntimeError("AssemblyAI create transcript failed: missing transcript id")

        log("⏳", "Transcribing… (1–3 min depending on video length)")

        # Step 3: Poll until "completed", "error", or the deadline passes.
        # Use a monotonic deadline so the time spent inside each HTTP poll
        # request counts toward the timeout — the previous implementation
        # accumulated only the sleep intervals, so real wall-clock time
        # could far exceed ASSEMBLYAI_TIMEOUT_SEC.
        start = time.monotonic()
        deadline = start + timeout_s
        transcript_json = None
        while time.monotonic() <= deadline:
            poll_resp = client.get(
                f"https://api.assemblyai.com/v2/transcript/{transcript_id}",
                headers=headers,
            )
            if poll_resp.status_code >= 400:
                raise RuntimeError(f"AssemblyAI polling failed: {poll_resp.text[:500]}")
            transcript_json = poll_resp.json()
            status = transcript_json.get("status")
            if status == "completed":
                break
            if status == "error":
                raise RuntimeError(f"AssemblyAI error: {transcript_json.get('error', 'Unknown error')}")
            # Map poll progress roughly into 15..95
            if progress_cb:
                elapsed = time.monotonic() - start
                pct = min(95, int(15 + (elapsed / max(timeout_s, 1)) * 80))
                progress_cb("transcribing", pct)
            time.sleep(poll_interval_s)

    if not transcript_json or transcript_json.get("status") != "completed":
        raise RuntimeError("AssemblyAI transcription timed out")
    if progress_cb:
        progress_cb("transcribing", 100)

    # ── Package results ──────────────────────────────────────────────────────
    # AssemblyAI reports word/sentence/chapter offsets in milliseconds.
    words = [
        {
            "text":       w.get("text", ""),
            "start_ms":   w.get("start", 0),
            "end_ms":     w.get("end", 0),
            "confidence": w.get("confidence", 0.0),
        }
        for w in (transcript_json.get("words") or [])
    ]
    sentences = [
        {
            "text":      s.get("text", ""),
            "start_ms":  s.get("start", 0),
            "end_ms":    s.get("end", 0),
            "sentiment": s.get("sentiment", "NEUTRAL"),
        }
        for s in (transcript_json.get("sentiment_analysis_results") or [])
    ]
    chapters = [
        {
            "gist":     c.get("gist", ""),
            "summary":  c.get("summary", ""),
            "start_ms": c.get("start", 0),
            "end_ms":   c.get("end", 0),
        }
        for c in (transcript_json.get("chapters") or [])
    ]
    result = {
        "text":      transcript_json.get("text", ""),
        "words":     words,
        "sentences": sentences,
        "chapters":  chapters,
        # audio_duration is in seconds (may be absent/None) → milliseconds.
        "duration_ms": int(float(transcript_json.get("audio_duration", 0) or 0) * 1000),
    }
    log("βœ…", f"Transcription done β€” {len(words)} words, "
        f"{len(sentences)} sentences, {len(chapters)} chapters")
    return result