#!/usr/bin/env python3
"""
Ringg Parrot STT V1 🦜 - Hugging Face Space (Frontend)

Real-time streaming transcription using Gradio's audio streaming.
"""
import os
import tempfile
from pathlib import Path
import json
import asyncio
import time

import gradio as gr
import requests
import numpy as np
import soundfile as sf
from dotenv import load_dotenv

# Optional dependencies — the app degrades gracefully when they are missing.
try:
    import websockets
    HAS_WEBSOCKETS = True
except ImportError:
    HAS_WEBSOCKETS = False
    print("⚠️ websockets not installed. Install with: pip install websockets")

try:
    import librosa
    HAS_LIBROSA = True
except ImportError:
    HAS_LIBROSA = False
    print("⚠️ librosa not installed. Install with: pip install librosa")

load_dotenv()

# Backend API endpoint
API_ENDPOINT = os.environ.get("STT_API_ENDPOINT", "http://localhost:7864")
# The backend STT model expects 16 kHz mono audio.
TARGET_SAMPLE_RATE = 16000


class RinggSTTClient:
    """Client for the Ringg Parrot STT API (HTTP upload + WebSocket streaming)."""

    def __init__(self, api_endpoint: str):
        self.api_endpoint = api_endpoint.rstrip("/")
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": "RinggSTT-HF-Space/1.0"})

    def check_health(self) -> dict:
        """Ping the backend /health endpoint.

        Returns:
            dict with "status" ("healthy"/"error") and a human-readable "message".
            Never raises — network failures are reported in the returned dict.
        """
        try:
            response = self.session.get(f"{self.api_endpoint}/health", timeout=5)
            if response.status_code == 200:
                return {"status": "healthy", "message": "✅ API is online"}
            return {"status": "error", "message": f"❌ API returned status {response.status_code}"}
        except Exception as e:
            return {"status": "error", "message": f"❌ Error: {str(e)}"}

    def transcribe_audio_data(self, audio_data: np.ndarray, sample_rate: int,
                              language: str = "hi") -> str:
        """Transcribe in-memory audio (numpy array) via the multipart upload API.

        The array is written to a temporary WAV file which is always removed,
        even when the request fails. Returns "" on any error.
        """
        try:
            # Save to a temporary WAV file for the multipart upload.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
                temp_path = f.name
                sf.write(temp_path, audio_data, sample_rate)
            try:
                with open(temp_path, "rb") as f:
                    files = {"file": ("audio.wav", f, "audio/wav")}
                    data = {"language": language, "punctuate": "false"}
                    response = self.session.post(
                        f"{self.api_endpoint}/v1/audio/transcriptions",
                        files=files,
                        data=data,
                        timeout=30,
                    )
                # Debug: log the response for troubleshooting
                print(
                    f"[transcribe_audio_data] status={response.status_code} "
                    f"body={response.text[:500]}"
                )
                if response.status_code == 200:
                    result = response.json()
                    # Multi-channel responses use "transcription_channel_0".
                    if "transcription_channel_0" in result:
                        return result.get("transcription_channel_0", "")
                    return result.get("transcription", "")
                return ""
            finally:
                os.unlink(temp_path)
        except Exception as e:
            print(f"Transcription error: {e}")
            return ""

    async def transcribe_websocket_on_final(self, audio_data: np.ndarray,
                                            sample_rate: int,
                                            language: str = "hi") -> str:
        """
        Transcribe audio via the WebSocket on_final endpoint.

        Sends the ENTIRE audio at once, then requests transcription.
        Returns the transcript text, or a "❌ ..." string on failure.
        """
        if not HAS_WEBSOCKETS:
            return "❌ websockets library not installed"
        try:
            # Convert HTTP endpoint to WebSocket
            ws_endpoint = self.api_endpoint.replace("http://", "ws://").replace("https://", "wss://")
            ws_url = f"{ws_endpoint}/v1/audio/stream"

            # Ensure audio is in proper range [-1, 1] before converting,
            # then encode as int16 PCM bytes for the wire.
            audio_clipped = np.clip(audio_data, -1.0, 1.0)
            audio_int16 = (audio_clipped * 32767).astype(np.int16)
            audio_bytes = audio_int16.tobytes()

            # Debug info
            duration_sec = len(audio_int16) / sample_rate
            print(f"WS on_final: Sending {duration_sec:.2f}s of audio ({len(audio_bytes)} bytes)")
            print(f"WS: audio_int16 range: min={np.min(audio_int16)}, max={np.max(audio_int16)}")

            # max_size=None: allow arbitrarily large server frames;
            # ping_timeout=120: transcription of long clips can stall pings.
            async with websockets.connect(ws_url, max_size=None, ping_timeout=120) as ws:
                # Step 1: Send start message with on_final mode
                start_msg = {
                    "type": "start",
                    "prediction_method": "on_final",
                    "sample_rate": sample_rate,
                    "encoding": "int16",
                    "language": "Hindi" if language == "hi" else "English",
                    "api_key": "gradio-client",
                    "punctuate": False,
                }
                await ws.send(json.dumps(start_msg))
                print(f"📤 Sent start message: {start_msg}")

                # Step 2: Wait for ready response
                ready_msg = await asyncio.wait_for(ws.recv(), timeout=10)
                ready_data = json.loads(ready_msg)
                if ready_data.get("type") != "ready":
                    return f"❌ Unexpected response: {ready_data}"
                print(f"✅ WebSocket ready: {ready_data}")

                # Step 3: Send ENTIRE audio at once (not chunked)
                print(f"📤 Sending entire audio ({len(audio_bytes)} bytes / {duration_sec:.2f}s)...")
                await ws.send(audio_bytes)

                # Step 4: Wait for chunk acknowledgment
                ack_msg = await asyncio.wait_for(ws.recv(), timeout=30)
                ack_data = json.loads(ack_msg)
                if ack_data.get("type") == "chunk":
                    buffered = ack_data.get("total_buffered", 0)
                    print(f"✅ Audio buffered: {buffered} samples ({buffered / sample_rate:.2f}s)")

                # Step 5: Send end signal to trigger transcription
                await ws.send(json.dumps({"type": "end"}))
                print("📤 Sent end message - waiting for transcription...")

                # Step 6: Receive transcription (longer timeout for processing)
                transcription = ""
                result_msg = await asyncio.wait_for(ws.recv(), timeout=120)
                result_data = json.loads(result_msg)
                print(f"📥 Result type: {result_data.get('type')}")
                if result_data.get("type") == "transcript":
                    transcription = result_data.get("transcription", "")
                    print(f"✅ Transcription received: '{transcription}'")
                elif result_data.get("type") == "error":
                    return f"❌ Error: {result_data.get('detail', 'Unknown error')}"
                else:
                    print(f"⚠️ Unexpected result: {result_data}")

                # Step 7: Send stop to end session
                await ws.send(json.dumps({"type": "stop"}))
                return transcription
        except asyncio.TimeoutError as e:
            print(f"WebSocket timeout: {e}")
            return "❌ Timeout waiting for response from server"
        except Exception as e:
            print(f"WebSocket transcription error: {e}")
            import traceback
            traceback.print_exc()
            return f"❌ WebSocket Error: {str(e)}"

    def transcribe_file(self, audio_file_path: str, language: str = "hi") -> str:
        """Transcribe an audio file on disk via the multipart upload API.

        Returns the transcript, or a "❌ ..." string on HTTP/network errors.
        """
        try:
            with open(audio_file_path, "rb") as f:
                files = {"file": (Path(audio_file_path).name, f)}
                data = {"language": language, "punctuate": "false"}
                response = self.session.post(
                    f"{self.api_endpoint}/v1/audio/transcriptions",
                    files=files,
                    data=data,
                    timeout=120,
                )
            if response.status_code == 200:
                result = response.json()
                if "transcription_channel_0" in result:
                    # Stereo/telephony uploads may come back per-channel.
                    transcripts = []
                    if result.get("transcription_channel_0"):
                        transcripts.append(result["transcription_channel_0"])
                    if result.get("transcription_channel_1"):
                        transcripts.append(f"\n[Channel 2]: {result['transcription_channel_1']}")
                    return "".join(transcripts) if transcripts else "No speech detected"
                return result.get("transcription", "No transcription received")
            return f"❌ API Error: {response.status_code}"
        except Exception as e:
            return f"❌ Error: {str(e)}"


# Initialize API client
print(f"🔗 Connecting to STT API: {API_ENDPOINT}")
stt_client = RinggSTTClient(API_ENDPOINT)
health_status = stt_client.check_health()
print(f"API Health: {health_status}")


def resample_audio(audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
    """Resample audio to the target sample rate.

    Uses librosa when available; otherwise falls back to simple linear
    interpolation (lower quality, but dependency-free).
    """
    if orig_sr == target_sr:
        return audio
    if HAS_LIBROSA:
        return librosa.resample(audio.astype(np.float64), orig_sr=orig_sr, target_sr=target_sr)
    # Simple linear interpolation fallback
    duration = len(audio) / orig_sr
    new_length = int(duration * target_sr)
    indices = np.linspace(0, len(audio) - 1, new_length)
    return np.interp(indices, np.arange(len(audio)), audio.astype(np.float64))


def process_recorded_audio(audio, language):
    """
    Process the recorded audio after user stops recording.
    Sends entire audio to Triton backend via WebSocket on_final endpoint.
    """
    if audio is None:
        return "⚠️ No audio recorded"
    try:
        # Gradio non-streaming audio returns (sample_rate, audio_data)
        if isinstance(audio, tuple) and len(audio) == 2:
            sample_rate, audio_data = audio
        else:
            return "⚠️ Invalid audio format"
        if not isinstance(audio_data, np.ndarray) or len(audio_data) == 0:
            return "⚠️ No audio data received"
        print(f"Raw audio: shape={audio_data.shape}, dtype={audio_data.dtype}, sample_rate={sample_rate}")

        # Convert to float64 first and normalize to [-1, 1]
        audio_data = audio_data.astype(np.float64)
        # If audio is in int16 range, normalize it to [-1, 1]
        if np.max(np.abs(audio_data)) > 2.0:  # Likely int16 data
            audio_data = audio_data / 32768.0
            print("Normalized from int16 range")

        # Convert stereo to mono if needed
        if len(audio_data.shape) > 1:
            if audio_data.shape[1] == 2:  # (samples, 2) format
                audio_data = np.mean(audio_data, axis=1)
            elif audio_data.shape[0] == 2:  # (2, samples) format
                audio_data = np.mean(audio_data, axis=0)
            print(f"Converted stereo to mono: {audio_data.shape}")

        # Calculate duration
        total_samples = len(audio_data)
        total_duration = total_samples / sample_rate
        print(f"Processing {total_duration:.1f}s of audio...")

        # Skip very short audio (threshold and message kept consistent).
        if total_duration < 0.3:
            return "⚠️ Recording too short. Please record at least 0.3 seconds."

        # Resample to 16kHz if needed
        if sample_rate != TARGET_SAMPLE_RATE:
            audio_data = resample_audio(audio_data, sample_rate, TARGET_SAMPLE_RATE)
            sample_rate = TARGET_SAMPLE_RATE

        # Debug: Check audio data
        print(f"Audio shape: {audio_data.shape}, dtype: {audio_data.dtype}")
        print(f"Audio range BEFORE norm: min={np.min(audio_data):.6f}, max={np.max(audio_data):.6f}")

        # Check RMS level for debugging
        rms = np.sqrt(np.mean(audio_data ** 2))
        print(f"Audio RMS level: {rms:.6f}")

        # Check if audio is completely silent (all zeros)
        if np.all(audio_data == 0):
            return "⚠️ No audio detected. Please check your microphone and try again."

        # Normalize audio to use full dynamic range (0.95 leaves clip headroom).
        max_val = np.max(np.abs(audio_data))
        if max_val > 0:
            audio_data = audio_data / max_val * 0.95
        print(f"Audio range AFTER norm: min={np.min(audio_data):.6f}, max={np.max(audio_data):.6f}")

        # Get language code
        lang_code = "hi" if language == "Hindi" else "en"

        # Prepare audio for transcription
        audio_float32 = audio_data.astype(np.float32)

        # Transcribe via WebSocket on_final endpoint
        print("\n🔶 Starting transcription via Triton backend...")
        start_time = time.time()
        transcription = asyncio.run(
            stt_client.transcribe_websocket_on_final(
                audio_float32, sample_rate, lang_code
            )
        )
        elapsed = time.time() - start_time
        print(f"✅ Transcription completed in {elapsed:.2f}s")

        # Return result
        if transcription and transcription.strip() and not transcription.startswith("❌"):
            return transcription
        return transcription or "⚠️ No speech detected in the recording"
    except Exception as e:
        print(f"Processing error: {e}")
        import traceback
        traceback.print_exc()
        return f"❌ Error processing audio: {str(e)}"


def clear_transcription():
    """Clear transcription output (resets audio widget and textbox)."""
    return None, "🎤 Click microphone to record..."


def transcribe_file(audio_file, language):
    """Transcribe an uploaded audio file (Gradio callback)."""
    if audio_file is None:
        return "⚠️ Please upload an audio file to transcribe."
    lang_code = "hi" if language == "Hindi" else "en"
    transcription = stt_client.transcribe_file(audio_file, lang_code)
    text = (transcription or "").strip()
    # Error strings are surfaced as-is; empty results get a friendly fallback.
    if not text or text.startswith("❌") or text.startswith("⏱"):
        return text or "⚠️ No speech detected—try a clearer recording."
    return text


def create_interface():
    """Create Gradio interface"""
    with gr.Blocks(
        theme=gr.themes.Base(
            font=[gr.themes.GoogleFont("Source Sans Pro"), "Arial", "sans-serif"]
        ),
        css=".gradio-container {max-width: none !important;}",
    ) as demo:
        # NOTE(review): the original HTML markup for this header was garbled in
        # extraction (only "Logo" and the title text survived) — reconstructed
        # as a simple centered header; confirm against the deployed Space.
        gr.HTML("""
            <div style="display: flex; align-items: center; justify-content: center; gap: 12px;">
                <img src="logo.png" alt="Logo" style="height: 48px;">
                <h1 style="margin: 0;">Ringg Parrot STT V1.0 🦜</h1>
            </div>
        """)

        # Microphone recording section
        gr.Markdown("""
        ## 🎤 Record & Transcribe

        **Instructions:**
        1. **Allow microphone access** when your browser prompts you
        2. Click the **microphone icon** in the audio component below
        3. **Speak** your message
        4. Click **Stop** when finished
        5. Click **Transcribe** to get your transcription
        """)
        with gr.Row():
            with gr.Column(scale=1):
                record_language = gr.Dropdown(
                    choices=["Hindi", "English"],
                    value="Hindi",
                    label="Language",
                )
                audio_input = gr.Audio(
                    sources=["microphone"],
                    type="numpy",
                    label="🎤 Click to record audio",
                    format="wav",
                    show_download_button=False,
                )
                with gr.Row():
                    transcribe_btn = gr.Button("📝 Transcribe", variant="primary", size="lg")
                    clear_btn = gr.Button("🗑️ Clear", variant="secondary")
            with gr.Column(scale=2):
                text_output = gr.Textbox(
                    label="Transcription",
                    value="🎤 Click microphone to record...",
                    lines=10,
                    interactive=False,
                )

        # Transcribe button - processes the recorded audio
        transcribe_btn.click(
            fn=process_recorded_audio,
            inputs=[audio_input, record_language],
            outputs=[text_output],
        )
        # Clear button
        clear_btn.click(
            fn=clear_transcription,
            inputs=[],
            outputs=[audio_input, text_output],
        )

        # Visual separator between the record and upload sections.
        gr.Markdown("---")

        # File upload section
        gr.Markdown("""
        ## 📁 Upload an audio file for transcription

        Supports WAV, MP3, FLAC, M4A, and more.
        """)
        with gr.Row():
            with gr.Column(scale=1):
                file_language = gr.Dropdown(
                    choices=["Hindi", "English"],
                    value="Hindi",
                    label="Language",
                )
                file_input = gr.Audio(
                    type="filepath",
                    sources=["upload"],
                    label="Upload Audio",
                )
                # Distinct name: do not shadow the record-section button above.
                file_transcribe_btn = gr.Button("Transcribe File", variant="primary", size="lg")
            with gr.Column(scale=2):
                file_output = gr.Textbox(
                    label="Transcription",
                    lines=8,
                    interactive=False,
                )
        file_transcribe_btn.click(
            fn=transcribe_file,
            inputs=[file_input, file_language],
            outputs=file_output,
        )

        gr.Markdown("""
        ## 🎯 Performance Benchmarks

        **Ringg Parrot STT V1** Ranks **1st** Among Top Models.
        """)
        with gr.Row():
            gr.DataFrame(
                value=[
                    ["Parrot STT (Ringg AI)", "15.00%", "15.92%"],
                    ["IndicWav2Vec ", "19.35%", "20.91%"],
                    ["VakyanSh Wav2Vec2", "22.73%", "24.78%"],
                ],
                headers=["Model", "Median WER ↓", "Mean WER ↓"],
                datatype=["str", "str", "str"],
                row_count=3,
                col_count=(3, "fixed"),
                interactive=False,
            )

        gr.Markdown("""
        ## 📊 Benchmarking Strategy

        Our model was trained on approximately **3.5k hours** of diverse, multi-domain Hindi speech data
        to ensure robust performance across various acoustic conditions and use cases.

        ### Data Preprocessing & Sanity

        Prior to training and evaluation, all transcript text was processed through
        [**AI4Bharat's Cadence**](https://huggingface.co/ai4bharat/Cadence) punctuation restoration model.
        Cadence is a state-of-the-art multilingual punctuation model based on Gemma-3-1B that supports
        English and 22 Indic languages. This preprocessing step ensured:
        - Consistent punctuation across training data
        - Normalized text formatting for better model convergence
        - Data sanity and quality assurance
        - Improved downstream ASR performance

        ### Training Data Composition:
        - **40% Telephony Data** - Call center conversations, customer support calls, and telephonic interactions
        - **30% Graamvani Data** - Rural and grassroots community voice recordings
        - **30% Other Sources** - Including:
          - HuggingFace Shrutilipi dataset
          - Additional Graamvani samples
          - Internet-sourced narration, conversations, and text readings
          - TTS (Text-to-Speech) model-generated data

        ### Why This Strategy?

        ASR models are particularly susceptible to two critical phenomena:
        1. **Accent Deafening** - Performance degradation when encountering accents not represented in training data
        2. **Frequency Deafening** - Reduced accuracy on audio with sampling rates or frequency characteristics
           different from training conditions

        By combining multiple data sources spanning diverse domains, accents, recording conditions, and
        sampling rates, we build domain-invariant models that generalize better to real-world variability.
        As demonstrated in [Narayanan et al. (2018)](https://arxiv.org/pdf/1808.05312), training on
        large-scale multi-domain data enables models to achieve robustness comparable to domain-specific
        models while maintaining superior generalization to unseen conditions.

        Our training data specifically includes:
        - **Internet Data** (narration, conversation, people reading text, telephony samples)
        - **TTS Model Data** (synthetic speech for augmentation)
        - **Telephony Data** (real-world call recordings with varied codecs and noise)

        This multi-domain approach ensures Parrot STT performs reliably across call centers,
        voice assistants, mobile apps, and other telephonic/streaming applications.

        ## 🙏 Acknowledgements
        - Built with [NVIDIA NeMo](https://github.com/NVIDIA/NeMo) models
        - Research inspired by [Domain-Invariant Speech Recognition via Large Scale Training](https://arxiv.org/pdf/1808.05312)
        """)
    return demo


if __name__ == "__main__":
    print("🌐 Launching Ringg Parrot STT V1 Gradio Interface...")
    print(f"Backend API: {API_ENDPOINT}")
    demo = create_interface()
    demo.queue(default_concurrency_limit=2, max_size=20)
    demo.launch(
        share=False,
        server_name="0.0.0.0",
        server_port=7860,
        debug=True,
        show_api=False,
    )