rakib72642's picture
adjusted silence and db settings
089db7b
"""
services/stt.py β€” GPU-Batched Faster-Whisper STT Pipeline
Architecture:
─────────────
β€’ Single shared WhisperModel instance (loaded once, never reloaded)
β€’ asyncio.Queue-based request intake β€” fully non-blocking
β€’ Micro-batching worker: accumulates requests over BATCH_WINDOW_MS,
then runs a single GPU forward pass for the entire batch
β€’ Each caller awaits its own asyncio.Future β€” zero polling overhead
β€’ ffmpeg audio conversion runs in a ThreadPoolExecutor (I/O bound)
β€’ GPU inference runs in a dedicated single-thread Executor (serialize GPU)
β€’ Bangla-optimised decode parameters preserved from original
FIX-BUG4 (race condition + deprecated API):
_STTBatchWorker now uses asyncio.Lock to safely initialise the worker
exactly once, even when multiple coroutines call enqueue() concurrently.
asyncio.get_event_loop() β†’ asyncio.get_running_loop().
FIX-BUG6 (blocking wait without timeout):
asyncio.to_thread(_model_ready.wait) now passes timeout=60 and raises
RuntimeError if the model fails to load in time.
Latency profile:
ffmpeg (parallel) ~30–80 ms
batch wait window ~30 ms (reduced from 50ms)
GPU inference ~80–150 ms per batch (amortised across requests)
Total perceived < 200 ms at moderate load
"""
from __future__ import annotations
import asyncio
import io
import os
import re
import struct
import subprocess
import tempfile
import threading
import wave
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Optional
import requests
from faster_whisper import WhisperModel
# ── Bangla script patterns ─────────────────────────────────────────────────────
# Matches any code point in the Bengali Unicode block (U+0980–U+09FF).
# NOTE(review): not referenced anywhere in the visible part of this file.
_BANGLA_RE = re.compile(r"[\u0980-\u09FF]")
# Matches Arabic-script code points (Arabic, Arabic Supplement and the two
# Arabic Presentation Forms blocks); _validate() uses it for a log-only check.
_WRONG_SCRIPT_RE = re.compile(
    r"[\u0600-\u06FF\u0750-\u077F\uFB50-\uFDFF\uFE70-\uFEFF]"
)
# ── Configuration ──────────────────────────────────────────────────────────────
# Backend switch, hard-coded (not read from the environment).
USE_ELEVENLABS_STT = True # True = ElevenLabs Scribe, False = Whisper
# Enabled unless STT_DROP_NOISE_TRANSCRIPTS is exactly "0", "false" or "False"
# (after stripping whitespace); any other value, including "FALSE", enables it.
DROP_NOISE_TRANSCRIPTS = os.getenv("STT_DROP_NOISE_TRANSCRIPTS", "1").strip() not in ("0", "false", "False")
# Model name and compute type handed to WhisperModel in _load_and_warm().
_STT_MODEL = os.getenv("STT_MODEL", "large-v3")
_COMPUTE_TYPE = os.getenv("STT_COMPUTE_TYPE", "int8_float32")
# Micro-batch collection window; the env value is in milliseconds, stored in seconds.
_BATCH_WINDOW = float(os.getenv("STT_BATCH_WINDOW_MS", "30")) / 1000 # default 30 ms (was 50 ms)
# Upper bound on the number of requests collected into one batch.
_MAX_BATCH = int(os.getenv("STT_MAX_BATCH", "8"))
# How long STTProcessor.transcribe() waits for the Whisper model before raising.
_MODEL_LOAD_TIMEOUT = int(os.getenv("STT_MODEL_LOAD_TIMEOUT_S", "120")) # seconds
# Larger inputs are truncated to this many bytes by STTProcessor.transcribe().
MAX_INPUT_BYTES = 5_242_880 # 5 MB
# Request parameters for the ElevenLabs speech-to-text call.
ELEVENLABS_STT_MODEL_ID = os.getenv("ELEVENLABS_STT_MODEL_ID", "scribe_v2")
ELEVENLABS_STT_LANGUAGE = os.getenv("ELEVENLABS_STT_LANGUAGE", "bn")
ELEVENLABS_STT_TIMEOUT = float(os.getenv("ELEVENLABS_STT_TIMEOUT", "60"))  # seconds, passed to requests.post
# ── Singleton model state ──────────────────────────────────────────────────────
# _model is published by the loader thread under _model_lock; _model_ready is
# set when loading finishes (successfully or not) and _model_error then holds
# the failure message, if any.
_model: Optional[WhisperModel] = None
_model_lock = threading.Lock()
_model_ready = threading.Event()
_model_error: Optional[str] = None
# Two executors: one for ffmpeg (I/O, can be parallel), one for GPU (serial).
# The ffmpeg pool also carries the blocking ElevenLabs HTTP call.
_ffmpeg_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ffmpeg")
_gpu_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix="whisper-gpu")
# ── Model loader (background thread) ──────────────────────────────────────────
def _load_and_warm() -> None:
    """
    Load the Whisper model, run one warm-up transcription and publish it.

    Meant to run on a background thread. On success the model is stored in
    the module-level ``_model`` (under ``_model_lock``); on failure the
    error text is stored in ``_model_error``. In both cases ``_model_ready``
    is set at the end so waiters are released.
    """
    global _model, _model_error
    try:
        print(f"[STT] Loading Faster-Whisper {_STT_MODEL} on CUDA ({_COMPUTE_TYPE})…")
        whisper = WhisperModel(
            _STT_MODEL,
            device="cuda",
            compute_type=_COMPUTE_TYPE,
            num_workers=1,
        )
        # Warm-up pass over half a second of silence. The segments object is
        # drained so the transcription actually executes.
        warmup_audio = _make_silence_wav(0.5)
        warmup_segments = whisper.transcribe(warmup_audio, language="bn", beam_size=1)[0]
        for _segment in warmup_segments:
            pass
        print("[STT] GPU warmup complete. STT ready βœ“")
        # Publish only after the warm-up succeeded.
        with _model_lock:
            _model = whisper
    except Exception as load_error:
        _model_error = str(load_error)
        print(f"[STT] Model load FAILED: {load_error}")
    finally:
        # Always release waiters; they check _model_error afterwards.
        _model_ready.set()
def _make_silence_wav(duration_s: float = 0.5, sr: int = 16_000) -> io.BytesIO:
    """
    Build an in-memory mono 16-bit PCM WAV containing only zero samples.

    Args:
        duration_s: Length of the clip in seconds.
        sr: Sample rate in Hz.

    Returns:
        A BytesIO holding the complete WAV file, rewound to position 0.
    """
    frame_count = int(sr * duration_s)
    out = io.BytesIO()
    with wave.open(out, "wb") as writer:
        writer.setframerate(sr)
        writer.setsampwidth(2)   # 2 bytes per sample = 16-bit
        writer.setnchannels(1)   # mono
        # Little-endian signed 16-bit zeros, one per frame.
        pcm = struct.pack(f"<{frame_count}h", *((0,) * frame_count))
        writer.writeframes(pcm)
    out.seek(0)
    return out
# Import-time side effect: the Whisper model is only loaded when Whisper is
# the selected backend. With ElevenLabs selected no loader thread is started,
# so _model_ready is never set in that configuration.
if not USE_ELEVENLABS_STT:
    # Start background model load immediately at import (daemon thread, so it
    # does not keep the process alive on shutdown).
    threading.Thread(target=_load_and_warm, daemon=True, name="whisper-loader").start()
else:
    print("[STT] ElevenLabs STT enabled; Whisper model load skipped")
# ── ffmpeg conversion (sync, runs in _ffmpeg_pool) ────────────────────────────
def _to_wav_sync(audio_bytes: bytes) -> Optional[str]:
    """
    Convert WebM/Opus audio to a 16 kHz mono WAV file using ffmpeg.

    Blocking; intended to run inside a worker thread.

    Args:
        audio_bytes: Raw encoded audio as received from the client.

    Returns:
        Path of the temporary WAV file on success. The caller owns that file
        and must delete it. Returns None when ffmpeg fails, times out, cannot
        be started, or produces an output smaller than 500 bytes.

    Both temporary files are removed on every failure path; only a
    successfully produced WAV is left on disk.
    """
    in_path = out_path = None
    succeeded = False
    try:
        with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as f:
            # Record the path before writing so a failed write is still cleaned up.
            in_path = f.name
            f.write(audio_bytes)
        # Reserve an output path; ffmpeg overwrites the empty file ("-y").
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            out_path = f.name
        result = subprocess.run(
            [
                "ffmpeg", "-y", "-loglevel", "warning",
                "-i", in_path,
                "-ar", "16000", "-ac", "1",
                "-af", "highpass=f=80,afftdn=nf=-25,aresample=resampler=swr",
                "-f", "wav", out_path,
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            timeout=30,
        )
        if result.returncode != 0:
            print("[STT][ffmpeg]", result.stderr.decode(errors="replace")[:200])
            return None
        # Treat a near-empty output as a failed conversion.
        if not os.path.exists(out_path) or os.path.getsize(out_path) < 500:
            return None
        succeeded = True
        return out_path
    except subprocess.TimeoutExpired:
        print("[STT][ffmpeg] timed out")
        return None
    except Exception as exc:
        print(f"[STT][ffmpeg] {exc}")
        return None
    finally:
        # The input is always disposable; the output only survives on success.
        stale = [in_path] if succeeded else [in_path, out_path]
        for path in stale:
            if path:
                try:
                    os.remove(path)
                except OSError:
                    pass
# ── Whisper inference (sync, runs in _gpu_pool β€” ONE AT A TIME) ───────────────
def _transcribe_batch_sync(wav_paths: list[str]) -> list[Optional[str]]:
    """
    Run Whisper inference on a list of WAV paths, one file after another.

    Blocking; intended to run on the single-thread GPU executor.

    Args:
        wav_paths: Temporary WAV files to transcribe. This function takes
            ownership of them: every file is deleted before returning,
            whether transcription succeeded, failed, or no model was
            available.

    Returns:
        A list with one entry per input path, in the same order: the
        transcript text, or None on error, empty result, or missing model.
    """
    # Snapshot the shared model reference; inference itself runs unlocked.
    with _model_lock:
        model = _model
    results: list[Optional[str]] = []
    for path in wav_paths:
        try:
            if model is None:
                # No model loaded: nothing to run, but the file is still ours
                # to clean up (handled by the finally clause below).
                results.append(None)
                continue
            segments, info = model.transcribe(
                path,
                language="bn",
                beam_size=5,
                vad_filter=False,
                condition_on_previous_text=False,
                temperature=0,
                suppress_tokens=[-1],
                no_speech_threshold=0.6,
                log_prob_threshold=-0.5,
                compression_ratio_threshold=2.4,
            )
            # Joining consumes the segments iterable, which is where the
            # decoding work is actually performed.
            text = " ".join(seg.text.strip() for seg in segments).strip()
            print(f"[STT] lang={info.language} p={info.language_probability:.2f} → {text[:60]}")
            results.append(text or None)
        except Exception as exc:
            print(f"[STT] inference error: {exc}")
            results.append(None)
        finally:
            try:
                os.remove(path)
            except OSError:
                pass
    return results
def _transcribe_elevenlabs_sync(wav_path: str) -> Optional[str]:
    """
    Transcribe one WAV file through the ElevenLabs speech-to-text REST API.

    Blocking; callers run it in a worker thread.

    Args:
        wav_path: Path of the WAV file to upload. The file is not deleted here.

    Returns:
        The stripped transcript, or None when the service returned no text.

    Raises:
        RuntimeError: If ELEVENLABS_API_KEY is unset/blank, or the service
            answers with a non-success HTTP status.
    """
    key = os.getenv("ELEVENLABS_API_KEY", "").strip()
    if key == "":
        raise RuntimeError("[STT][ElevenLabs] ELEVENLABS_API_KEY missing")

    form_fields = {
        "model_id": ELEVENLABS_STT_MODEL_ID,
        "language_code": ELEVENLABS_STT_LANGUAGE,
    }
    # Keep the file open for the duration of the multipart upload only.
    with open(wav_path, "rb") as audio:
        response = requests.post(
            "https://api.elevenlabs.io/v1/speech-to-text",
            headers={"xi-api-key": key},
            data=form_fields,
            files={"file": audio},
            timeout=ELEVENLABS_STT_TIMEOUT,
        )

    if not response.ok:
        raise RuntimeError(f"[STT][ElevenLabs] HTTP {response.status_code}: {response.text[:200]}")

    body = response.json()
    transcript = (body.get("text") or "").strip()
    detected = body.get("language_code", "?")
    confidence = body.get("language_probability", 0)
    print(f"[STT][ElevenLabs] lang={detected} p={confidence} β†’ {transcript[:60]}")
    return transcript if transcript else None
# ── Hallucination / script validation ─────────────────────────────────────────
# Whole-transcript labels (compared case-insensitively) that denote silence or
# background sound rather than user speech.
_NOISE_TOKENS = {
    "silence",
    "[silence]",
    "noise",
    "[noise]",
    "[background noise]",
    "[background]",
    "[music]",
    "music",
    "[ringing]",
    "ringing",
    "[phone ringing]",
    "phone ringing",
    "[clicking]",
    "clicking",
    "[breathing]",
    "breathing",
    "[inaudible]",
    "inaudible",
    "[crosstalk]",
    "crosstalk",
}
# A transcript that is exactly one "[...]" tag; group 1 is the tag's content.
_NOISE_BRACKET_RE = re.compile(r"^\[([^\]]+)\]$")
# Any letter or digit in ANY script ([^\W_] is "word character minus
# underscore"), plus the whole Bengali block so dependent signs still count.
# The previous ASCII+Bengali-only pattern made text in every other script look
# like "pure punctuation", so it was dropped as noise before _validate()'s
# "non-Bangla (kept)" check could ever see it.
_HAS_LETTER_OR_DIGIT_RE = re.compile(r"[^\W_]|[\u0980-\u09FF]")
# Substrings that mark a bracketed tag as a sound label, e.g. "[loud ringing]".
_NOISE_TAG_KEYWORDS = ("silence", "noise", "music", "ring", "click", "inaudible", "breath")


def _is_noise_transcript(text: str) -> bool:
    """
    Tell whether a transcript is a silence/background-sound label, not speech.

    Args:
        text: Transcript candidate; None and blank strings count as noise.

    Returns:
        True when the text is empty, equals a known noise label, is a single
        bracketed tag naming a sound, or contains no letter or digit in any
        script. False otherwise.
    """
    raw = (text or "").strip()
    if not raw:
        return True
    if raw.lower() in _NOISE_TOKENS:
        return True
    tag = _NOISE_BRACKET_RE.match(raw)
    if tag:
        inner = tag.group(1).strip().lower()
        if inner in _NOISE_TOKENS:
            return True
        if any(keyword in inner for keyword in _NOISE_TAG_KEYWORDS):
            return True
    # Pure punctuation / symbols ("...", "—", etc.) should not trigger LLM turns.
    if not _HAS_LETTER_OR_DIGIT_RE.search(raw):
        return True
    return False
def _validate(text: str) -> Optional[str]:
    """
    Filter a raw transcript, rejecting noise and repetition hallucinations.

    Args:
        text: Raw transcript; may be None or blank.

    Returns:
        The stripped transcript, or None when it is empty, classified as
        noise (only if DROP_NOISE_TRANSCRIPTS is on), dominated by repeated
        words, a doubled single word, or opens with the same 2-4 word phrase
        three times in a row.
    """
    cleaned = text.strip() if text else ""
    if not cleaned:
        return None

    if DROP_NOISE_TRANSCRIPTS and _is_noise_transcript(cleaned):
        print(f"[STT] dropped noise/silence: {cleaned[:60]}")
        return None

    tokens = cleaned.split()
    total = len(tokens)

    # Six or more words with under 25 % distinct ones: degenerate repetition.
    if total >= 6 and len(set(tokens)) / total < 0.25:
        print(f"[STT] rejected repetition: {cleaned[:60]}")
        return None

    # Exactly two identical words are discarded without logging.
    if total == 2 and tokens[0] == tokens[1]:
        return None

    # Looped-phrase hallucination: the opening phrase of 2, 3 or 4 words is
    # immediately repeated twice more.
    for width in (2, 3, 4):
        if total < width * 3:
            continue
        opening = tokens[:width]
        second = tokens[width:2 * width]
        third = tokens[2 * width:3 * width]
        if second == opening and third == opening:
            print(f"[STT] rejected looped phrase: {cleaned[:60]}")
            return None

    # Soft script check: report mostly-Arabic-script output but keep it.
    wrong_script = sum(1 for _ in _WRONG_SCRIPT_RE.finditer(cleaned))
    letters = sum(1 for ch in cleaned if ch.isalpha())
    if letters > 0 and wrong_script / letters > 0.30:
        print(f"[STT] non-Bangla (kept): {cleaned[:60]}")

    return cleaned
# ══════════════════════════════════════════════════════════════════════════════
# BATCH QUEUE + WORKER
# ══════════════════════════════════════════════════════════════════════════════
@dataclass
class _STTRequest:
    """
    One queued transcription job: a WAV path plus the Future that will
    receive its transcript (or None).
    """

    # Path of the temporary WAV file to transcribe.
    wav_path: str
    # Resolved by the batch worker. The default is created on the running
    # event loop instead of via the bare asyncio.Future() constructor, which
    # relies on the deprecated implicit event-loop lookup. Constructing a
    # request without an explicit future therefore requires a running loop.
    future: asyncio.Future = field(
        default_factory=lambda: asyncio.get_running_loop().create_future()
    )
class _STTBatchWorker:
    """
    Singleton async worker that:
      1. Accepts STT requests from any coroutine via enqueue()
      2. Collects requests for up to _BATCH_WINDOW seconds (max _MAX_BATCH)
      3. Dispatches the batch to _gpu_pool in one call
      4. Resolves each caller's Future

    The worker task is started lazily on the first enqueue(). A strong
    reference to the task is kept, and the task is (re)created whenever it is
    missing, finished, or belongs to a different event loop, so a cancelled
    or crashed worker cannot leave later callers waiting forever.
    """

    def __init__(self) -> None:
        self._queue: asyncio.Queue[_STTRequest] = asyncio.Queue()
        self._started: bool = False
        self._start_lock: Optional[asyncio.Lock] = None  # created on first use
        # Strong reference to the worker task; the event loop itself only
        # keeps weak references to tasks.
        self._task: Optional[asyncio.Task] = None
        # Event loop the current queue and worker task belong to.
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def _get_lock(self) -> asyncio.Lock:
        """Return the start-up lock, creating it on first use."""
        if self._start_lock is None:
            self._start_lock = asyncio.Lock()
        return self._start_lock

    async def _ensure_started(self) -> None:
        """Make sure a live worker task exists on the running event loop."""
        loop = asyncio.get_running_loop()
        async with self._get_lock():
            task = self._task
            alive = (
                self._started
                and task is not None
                and not task.done()
                and self._loop is loop
            )
            if alive:
                return
            if self._loop is not loop:
                # A queue that was awaited on another loop cannot be awaited
                # here; start over with a fresh one for this loop.
                self._queue = asyncio.Queue()
                self._loop = loop
            self._started = True
            self._task = loop.create_task(self._worker_loop())

    async def enqueue(self, wav_path: str) -> Optional[str]:
        """
        Queue one WAV file for transcription and wait for its result.

        Args:
            wav_path: Temporary WAV file; it is handed to the batch
                transcription function, which is responsible for deleting it.

        Returns:
            The transcript, or None if transcription failed or was empty.
        """
        await self._ensure_started()
        loop = asyncio.get_running_loop()
        req = _STTRequest(wav_path=wav_path, future=loop.create_future())
        await self._queue.put(req)
        return await req.future

    async def _worker_loop(self) -> None:
        """Collect requests into micro-batches and resolve their futures."""
        loop = asyncio.get_running_loop()
        while True:
            # Wait for at least one request
            first = await self._queue.get()
            batch = [first]
            # Micro-batch window: collect more requests arriving within
            # _BATCH_WINDOW, up to _MAX_BATCH.
            try:
                deadline = loop.time() + _BATCH_WINDOW
                while len(batch) < _MAX_BATCH:
                    remaining = deadline - loop.time()
                    if remaining <= 0:
                        break
                    req = await asyncio.wait_for(self._queue.get(), timeout=remaining)
                    batch.append(req)
            except asyncio.TimeoutError:
                pass
            # Dispatch batch to GPU executor
            wav_paths = [r.wav_path for r in batch]
            print(f"[STT] Dispatching batch of {len(batch)} to GPU…")
            try:
                results = await loop.run_in_executor(
                    _gpu_pool, _transcribe_batch_sync, wav_paths
                )
            except Exception as exc:
                results = [None] * len(batch)
                print(f"[STT] Batch GPU error: {exc}")
                # The batch function normally deletes the files; if the call
                # itself failed, remove whatever is still on disk.
                for path in wav_paths:
                    try:
                        os.remove(path)
                    except OSError:
                        pass
            # Resolve futures (skip callers that were cancelled meanwhile)
            for req, text in zip(batch, results):
                if not req.future.done():
                    req.future.set_result(text)


_batch_worker = _STTBatchWorker()
# ══════════════════════════════════════════════════════════════════════════════
# PUBLIC API
# ══════════════════════════════════════════════════════════════════════════════
class STTProcessor:
    """
    Drop-in replacement for the original STTProcessor.
    Routes audio either to ElevenLabs or through the shared GPU batch worker,
    depending on USE_ELEVENLABS_STT.
    """

    @staticmethod
    def _discard(path: str) -> None:
        """Best-effort removal of a temporary file."""
        try:
            os.remove(path)
        except OSError:
            pass

    async def transcribe(self, audio_bytes: bytes) -> Optional[str]:
        """
        Full pipeline: validate → ffmpeg (parallel) → Whisper or ElevenLabs STT.

        Args:
            audio_bytes: Encoded audio from the client. None, empty or
                under-300-byte inputs are ignored. Inputs above
                MAX_INPUT_BYTES are truncated to that size.

        Returns:
            The validated transcript, or None when the input was ignored,
            conversion failed, or the transcript was empty/rejected.

        Raises:
            RuntimeError: On the Whisper path, if the model did not load
                within _MODEL_LOAD_TIMEOUT seconds or failed to load; on the
                ElevenLabs path, whatever the ElevenLabs call raises.
        """
        # Tolerate None here: the emptiness check already treats it as "no
        # audio", so the log line must not call len() on it.
        size = len(audio_bytes) if audio_bytes else 0
        if size < 300:
            print(f"[STT] Ignored tiny packet ({size} B)")
            return None
        if size > MAX_INPUT_BYTES:
            audio_bytes = audio_bytes[:MAX_INPUT_BYTES]

        # ffmpeg: runs in parallel I/O pool (not serialised)
        loop = asyncio.get_running_loop()
        wav_path = await loop.run_in_executor(_ffmpeg_pool, _to_wav_sync, audio_bytes)
        if not wav_path:
            return None

        if USE_ELEVENLABS_STT:
            try:
                text = await loop.run_in_executor(
                    _ffmpeg_pool, _transcribe_elevenlabs_sync, wav_path
                )
                return _validate(text) if text else None
            finally:
                # This path owns the WAV file.
                self._discard(wav_path)

        # Whisper path: wait for model with timeout — not forever
        try:
            if not _model_ready.is_set():
                print("[STT] Waiting for model to load…")
                ready = await asyncio.to_thread(_model_ready.wait, _MODEL_LOAD_TIMEOUT)
                if not ready:
                    raise RuntimeError(
                        f"[STT] Whisper model did not load within {_MODEL_LOAD_TIMEOUT}s"
                    )
            if _model_error:
                raise RuntimeError(f"[STT] Whisper model failed to load: {_model_error}")
        except BaseException:
            # The file never reached the batch worker, so nothing else will
            # delete it (covers cancellation while waiting as well).
            self._discard(wav_path)
            raise

        # Batch GPU inference; from here the batch pipeline owns the file.
        text = await _batch_worker.enqueue(wav_path)
        return _validate(text) if text else None