#!/usr/bin/env python3 """ Ringg Parrot STT V1 π¦ - Hugging Face Space (Frontend) Real-time streaming transcription using Gradio's audio streaming. """ import os import tempfile from pathlib import Path import json import asyncio import time import gradio as gr import requests import numpy as np import soundfile as sf from dotenv import load_dotenv try: import websockets HAS_WEBSOCKETS = True except ImportError: HAS_WEBSOCKETS = False print("β οΈ websockets not installed. Install with: pip install websockets") try: import librosa HAS_LIBROSA = True except ImportError: HAS_LIBROSA = False print("β οΈ librosa not installed. Install with: pip install librosa") load_dotenv() # Backend API endpoint API_ENDPOINT = os.environ.get("STT_API_ENDPOINT", "http://localhost:7864") TARGET_SAMPLE_RATE = 16000 class RinggSTTClient: """Client for Ringg Parrot STT API""" def __init__(self, api_endpoint: str): self.api_endpoint = api_endpoint.rstrip("/") self.session = requests.Session() self.session.headers.update({"User-Agent": "RinggSTT-HF-Space/1.0"}) def check_health(self) -> dict: try: response = self.session.get(f"{self.api_endpoint}/health", timeout=5) if response.status_code == 200: return {"status": "healthy", "message": "β API is online"} return {"status": "error", "message": f"β API returned status {response.status_code}"} except Exception as e: return {"status": "error", "message": f"β Error: {str(e)}"} def transcribe_audio_data(self, audio_data: np.ndarray, sample_rate: int, language: str = "hi") -> str: """Transcribe audio data (numpy array) via multipart upload API""" try: # Save to temporary WAV file with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: temp_path = f.name sf.write(temp_path, audio_data, sample_rate) try: with open(temp_path, "rb") as f: files = {"file": ("audio.wav", f, "audio/wav")} data = {"language": language, "punctuate": "false"} response = self.session.post( f"{self.api_endpoint}/v1/audio/transcriptions", files=files, data=data, timeout=30, ) # Debug: log the response for troubleshooting print( f"[transcribe_audio_data] status={response.status_code} " f"body={response.text[:500]}" ) if response.status_code == 200: result = response.json() if "transcription_channel_0" in result: return result.get("transcription_channel_0", "") return result.get("transcription", "") else: return "" finally: os.unlink(temp_path) except Exception as e: print(f"Transcription error: {e}") return "" async def transcribe_websocket_on_final(self, audio_data: np.ndarray, sample_rate: int, language: str = "hi") -> str: """ Transcribe audio via WebSocket on_final endpoint. Sends the ENTIRE audio at once, then requests transcription. """ if not HAS_WEBSOCKETS: return "β websockets library not installed" try: # Convert HTTP endpoint to WebSocket ws_endpoint = self.api_endpoint.replace("http://", "ws://").replace("https://", "wss://") ws_url = f"{ws_endpoint}/v1/audio/stream" # Ensure audio is in proper range [-1, 1] before converting audio_clipped = np.clip(audio_data, -1.0, 1.0) # Convert audio to int16 PCM audio_int16 = (audio_clipped * 32767).astype(np.int16) audio_bytes = audio_int16.tobytes() # Debug info duration_sec = len(audio_int16) / sample_rate print(f"WS on_final: Sending {duration_sec:.2f}s of audio ({len(audio_bytes)} bytes)") print(f"WS: audio_int16 range: min={np.min(audio_int16)}, max={np.max(audio_int16)}") async with websockets.connect(ws_url, max_size=None, ping_timeout=120) as ws: # Step 1: Send start message with on_final mode start_msg = { "type": "start", "prediction_method": "on_final", "sample_rate": sample_rate, "encoding": "int16", "language": "Hindi" if language == "hi" else "English", "api_key": "gradio-client", "punctuate": False } await ws.send(json.dumps(start_msg)) print(f"π€ Sent start message: {start_msg}") # Step 2: Wait for ready response ready_msg = await asyncio.wait_for(ws.recv(), timeout=10) ready_data = json.loads(ready_msg) if ready_data.get("type") != "ready": return f"β Unexpected response: {ready_data}" print(f"β WebSocket ready: {ready_data}") # Step 3: Send ENTIRE audio at once (not chunked) print(f"π€ Sending entire audio ({len(audio_bytes)} bytes / {duration_sec:.2f}s)...") await ws.send(audio_bytes) # Step 4: Wait for chunk acknowledgment ack_msg = await asyncio.wait_for(ws.recv(), timeout=30) ack_data = json.loads(ack_msg) if ack_data.get("type") == "chunk": buffered = ack_data.get('total_buffered', 0) print(f"β Audio buffered: {buffered} samples ({buffered/sample_rate:.2f}s)") # Step 5: Send end signal to trigger transcription end_msg = {"type": "end"} await ws.send(json.dumps(end_msg)) print(f"π€ Sent end message - waiting for transcription...") # Step 6: Receive transcription (longer timeout for processing) transcription = "" result_msg = await asyncio.wait_for(ws.recv(), timeout=120) result_data = json.loads(result_msg) print(f"π₯ Result type: {result_data.get('type')}") if result_data.get("type") == "transcript": transcription = result_data.get("transcription", "") print(f"β Transcription received: '{transcription}'") elif result_data.get("type") == "error": return f"β Error: {result_data.get('detail', 'Unknown error')}" else: print(f"β οΈ Unexpected result: {result_data}") # Step 7: Send stop to end session stop_msg = {"type": "stop"} await ws.send(json.dumps(stop_msg)) return transcription except asyncio.TimeoutError as e: print(f"WebSocket timeout: {e}") return "β Timeout waiting for response from server" except Exception as e: print(f"WebSocket transcription error: {e}") import traceback traceback.print_exc() return f"β WebSocket Error: {str(e)}" def transcribe_file(self, audio_file_path: str, language: str = "hi") -> str: """Transcribe audio file via multipart upload API""" try: with open(audio_file_path, "rb") as f: files = {"file": (Path(audio_file_path).name, f)} data = {"language": language, "punctuate": "false"} response = self.session.post( f"{self.api_endpoint}/v1/audio/transcriptions", files=files, data=data, timeout=120, ) if response.status_code == 200: result = response.json() if "transcription_channel_0" in result: transcripts = [] if result.get("transcription_channel_0"): transcripts.append(result["transcription_channel_0"]) if result.get("transcription_channel_1"): transcripts.append(f"\n[Channel 2]: {result['transcription_channel_1']}") return "".join(transcripts) if transcripts else "No speech detected" return result.get("transcription", "No transcription received") else: return f"β API Error: {response.status_code}" except Exception as e: return f"β Error: {str(e)}" # Initialize API client print(f"π Connecting to STT API: {API_ENDPOINT}") stt_client = RinggSTTClient(API_ENDPOINT) health_status = stt_client.check_health() print(f"API Health: {health_status}") def resample_audio(audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray: """Resample audio to target sample rate""" if orig_sr == target_sr: return audio if HAS_LIBROSA: return librosa.resample(audio.astype(np.float64), orig_sr=orig_sr, target_sr=target_sr) else: # Simple linear interpolation fallback duration = len(audio) / orig_sr new_length = int(duration * target_sr) indices = np.linspace(0, len(audio) - 1, new_length) return np.interp(indices, np.arange(len(audio)), audio.astype(np.float64)) def process_recorded_audio(audio, language): """ Process the recorded audio after user stops recording. Sends entire audio to Triton backend via WebSocket on_final endpoint. """ if audio is None: return "β οΈ No audio recorded" try: # Gradio non-streaming audio returns (sample_rate, audio_data) if isinstance(audio, tuple) and len(audio) == 2: sample_rate, audio_data = audio else: return "β οΈ Invalid audio format" if not isinstance(audio_data, np.ndarray) or len(audio_data) == 0: return "β οΈ No audio data received" print(f"Raw audio: shape={audio_data.shape}, dtype={audio_data.dtype}, sample_rate={sample_rate}") # Convert to float64 first and normalize to [-1, 1] audio_data = audio_data.astype(np.float64) # If audio is in int16 range, normalize it to [-1, 1] if np.max(np.abs(audio_data)) > 2.0: # Likely int16 data audio_data = audio_data / 32768.0 print(f"Normalized from int16 range") # Convert stereo to mono if needed if len(audio_data.shape) > 1: if audio_data.shape[1] == 2: # (samples, 2) format audio_data = np.mean(audio_data, axis=1) elif audio_data.shape[0] == 2: # (2, samples) format audio_data = np.mean(audio_data, axis=0) print(f"Converted stereo to mono: {audio_data.shape}") # Calculate duration total_samples = len(audio_data) total_duration = total_samples / sample_rate print(f"Processing {total_duration:.1f}s of audio...") # Skip very short audio if total_duration < 0.3: return "β οΈ Recording too short. Please record at least 0.5 seconds." # Resample to 16kHz if needed if sample_rate != TARGET_SAMPLE_RATE: audio_data = resample_audio(audio_data, sample_rate, TARGET_SAMPLE_RATE) sample_rate = TARGET_SAMPLE_RATE # Debug: Check audio data print(f"Audio shape: {audio_data.shape}, dtype: {audio_data.dtype}") print(f"Audio range BEFORE norm: min={np.min(audio_data):.6f}, max={np.max(audio_data):.6f}") # Check RMS level for debugging rms = np.sqrt(np.mean(audio_data ** 2)) print(f"Audio RMS level: {rms:.6f}") # Check if audio is completely silent (all zeros) if np.all(audio_data == 0): return "β οΈ No audio detected. Please check your microphone and try again." # Normalize audio to use full dynamic range max_val = np.max(np.abs(audio_data)) if max_val > 0: audio_data = audio_data / max_val * 0.95 print(f"Audio range AFTER norm: min={np.min(audio_data):.6f}, max={np.max(audio_data):.6f}") # Get language code lang_code = "hi" if language == "Hindi" else "en" # Prepare audio for transcription audio_float32 = audio_data.astype(np.float32) # Transcribe via WebSocket on_final endpoint print("\nπΆ Starting transcription via Triton backend...") start_time = time.time() transcription = asyncio.run( stt_client.transcribe_websocket_on_final( audio_float32, sample_rate, lang_code ) ) elapsed = time.time() - start_time print(f"β Transcription completed in {elapsed:.2f}s") # Return result if transcription and transcription.strip() and not transcription.startswith("β"): return transcription else: return transcription or "β οΈ No speech detected in the recording" except Exception as e: print(f"Processing error: {e}") import traceback traceback.print_exc() return f"β Error processing audio: {str(e)}" def clear_transcription(): """Clear transcription output""" return None, "π€ Click microphone to record..." def transcribe_file(audio_file, language): """Transcribe uploaded audio file""" if audio_file is None: return "β οΈ Please upload an audio file to transcribe." lang_code = "hi" if language == "Hindi" else "en" transcription = stt_client.transcribe_file(audio_file, lang_code) text = (transcription or "").strip() if not text or text.startswith("β") or text.startswith("β±"): return text or "β οΈ No speech detectedβtry a clearer recording." return text def create_interface(): """Create Gradio interface""" with gr.Blocks( theme=gr.themes.Base( font=[gr.themes.GoogleFont("Source Sans Pro"), "Arial", "sans-serif"] ), css=".gradio-container {max-width: none !important;}", ) as demo: gr.HTML("""