| import asyncio
|
| import websockets
|
| import json
|
| import threading
|
| import numpy as np
|
| import logging
|
| import time
|
| import tempfile
|
| import os
|
| import re
|
| from concurrent.futures import ThreadPoolExecutor
|
| import nemo.collections.asr as nemo_asr
|
| import soundfile as sf
|
|
|
|
|
# Module-wide logging: root logger at INFO, one named logger for this module.
logging.basicConfig(level=logging.INFO)

logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
# Mapping of Arabic number words (MSA and Egyptian-dialect variants) and
# Arabic-Indic numerals to Western digit strings.  Multi-word keys such as
# "عشرة آلاف" exist alongside their single-word components ("عشرة"), so
# replacement must try longer keys first (see replace_arabic_numbers).
arabic_numbers = {
    "صفر": "0", "زيرو": "0", "٠": "0", "زيو": "0", "زير": "0",
    "واحد": "1", "واحدة": "1", "١": "1",
    "اتنين": "2", "اثنين": "2", "إثنين": "2", "اثنان": "2", "إثنان": "2", "٢": "2",
    "تلاتة": "3", "ثلاثة": "3", "٣": "3",
    "اربعة": "4", "أربعة": "4", "٤": "4",
    "خمسة": "5", "٥": "5",
    "ستة": "6", "٦": "6",
    "سبعة": "7", "٧": "7",
    "تمانية": "8", "ثمانية": "8", "٨": "8",
    "تسعة": "9", "٩": "9",

    "عشرة": "10", "١٠": "10",
    "حداشر": "11", "احد عشر": "11", "احداشر": "11",
    "اتناشر": "12", "اثنا عشر": "12",
    "تلتاشر": "13", "ثلاثة عشر": "13",
    "اربعتاشر": "14", "أربعة عشر": "14",
    "خمستاشر": "15", "خمسة عشر": "15",
    "ستاشر": "16", "ستة عشر": "16",
    "سبعتاشر": "17", "سبعة عشر": "17",
    "طمنتاشر": "18", "ثمانية عشر": "18",
    "تسعتاشر": "19", "تسعة عشر": "19",

    "عشرين": "20", "٢٠": "20",
    "تلاتين": "30", "ثلاثين": "30", "٣٠": "30",
    "اربعين": "40", "أربعين": "40", "٤٠": "40",
    "خمسين": "50", "٥٠": "50",
    "ستين": "60", "٦٠": "60",
    "سبعين": "70", "٧٠": "70",
    "تمانين": "80", "ثمانين": "80", "٨٠": "80", "تمانون": "80", "ثمانون": "80",
    "تسعين": "90", "٩٠": "90",

    "مية": "100", "مائة": "100", "مئة": "100", "١٠٠": "100",
    "ميتين": "200", "مائتين": "200",
    "تلاتمية": "300", "ثلاثمائة": "300",
    "اربعمية": "400", "أربعمائة": "400",
    "خمسمية": "500", "خمسمائة": "500",
    "ستمية": "600", "ستمائة": "600",
    "سبعمية": "700", "سبعمائة": "700",
    "تمانمية": "800", "ثمانمائة": "800",
    "تسعمية": "900", "تسعمائة": "900",

    "ألف": "1000", "الف": "1000", "١٠٠٠": "1000",
    "ألفين": "2000", "الفين": "2000",
    "تلات تلاف": "3000", "ثلاثة آلاف": "3000",
    "اربعة آلاف": "4000", "أربعة آلاف": "4000",
    "خمسة آلاف": "5000",
    "ستة آلاف": "6000",
    "سبعة آلاف": "7000",
    "تمانية آلاف": "8000", "ثمانية آلاف": "8000",
    "تسعة آلاف": "9000",

    "عشرة آلاف": "10000",
    "مية ألف": "100000", "مائة ألف": "100000",
    "مليون": "1000000", "١٠٠٠٠٠٠": "1000000",
    "ملايين": "1000000",
    "مليار": "1000000000", "١٠٠٠٠٠٠٠٠٠": "1000000000"
}


# Pre-compiled alternation, longest keys first.
#
# Fix: the original looped over the dict in insertion order and ran one
# re.sub per key, so single-word components ("عشرة" -> "10") were replaced
# before compound phrases ("عشرة آلاف" -> "10000") were ever tried, and the
# compound patterns could never match.  A single pass with the alternatives
# sorted by descending length makes the longest phrase win (Python's regex
# alternation is leftmost-first, not longest-match).
_ARABIC_NUMBER_PATTERN = re.compile(
    r"\b(?:"
    + "|".join(re.escape(word) for word in sorted(arabic_numbers, key=len, reverse=True))
    + r")\b"
)


def replace_arabic_numbers(text: str) -> str:
    """Replace Arabic number words and Arabic-Indic numerals in *text* with
    Western digit strings, in a single left-to-right pass."""
    return _ARABIC_NUMBER_PATTERN.sub(lambda m: arabic_numbers[m.group(0)], text)
|
|
|
|
|
|
|
asr_model = None  # Global ASR model handle, populated by initialize_nemo_model().


def initialize_nemo_model():
    """Load the NeMo FastConformer Arabic ASR model into the global `asr_model`.

    The checkpoint path is taken from the NEMO_MODEL_PATH environment variable
    (with a placeholder default).  Raises FileNotFoundError when the file is
    absent and re-raises any loading error after logging it.
    """
    global asr_model

    logger.info("Loading NeMo FastConformer Arabic ASR model...")

    checkpoint = os.getenv(
        "NEMO_MODEL_PATH",
        "/path/to/stt_ar_fastconformer_hybrid_large_pcd_v1.0.nemo"
    )

    # Fail fast with a helpful download hint when the checkpoint is missing.
    if not os.path.exists(checkpoint):
        logger.error(f"Model not found at: {checkpoint}")
        logger.info("Please download the model from: https://catalog.ngc.nvidia.com/orgs/nvidia/teams/nemo/models/stt_ar_fastconformer_hybrid_large_pcd")
        raise FileNotFoundError(f"NeMo model not found: {checkpoint}")

    try:
        asr_model = nemo_asr.models.EncDecCTCModel.restore_from(checkpoint)
        logger.info("NeMo FastConformer model loaded successfully")
        # Inference-only mode: disables dropout/batch-norm training behavior.
        asr_model.eval()
    except Exception as e:
        logger.error(f"Failed to load NeMo model: {e}")
        raise
|
|
|
|
|
# Load the model eagerly at import time so the server fails fast if the
# checkpoint is missing, instead of erroring on the first transcription.
initialize_nemo_model()


# Worker pool for running blocking NeMo inference off the asyncio event loop.
executor = ThreadPoolExecutor(max_workers=4)
|
|
|
class JambonzAudioBuffer:
    """Thread-safe audio accumulator for one jambonz STT session.

    Maintains two float32 buffers: `buffer` holds pending samples and is
    drained one chunk at a time by get_chunk_for_processing(); `total_audio`
    holds everything received for the current segment and is what actually
    gets transcribed.
    """

    def __init__(self, sample_rate=8000, chunk_duration=1.0):
        self.sample_rate = sample_rate
        self.chunk_duration = chunk_duration
        # Number of samples that make up one processing chunk.
        self.chunk_samples = int(chunk_duration * sample_rate)

        self.buffer = np.array([], dtype=np.float32)
        self.lock = threading.Lock()
        self.total_audio = np.array([], dtype=np.float32)

        # Energy-based VAD parameters: mean |amplitude| threshold and the
        # minimum window length (0.5 s) required before judging speech.
        self.silence_threshold = 0.05
        self.min_speech_samples = int(0.5 * sample_rate)

    def add_audio(self, audio_data):
        """Append new samples to both the pending and cumulative buffers."""
        with self.lock:
            self.buffer = np.concatenate([self.buffer, audio_data])
            self.total_audio = np.concatenate([self.total_audio, audio_data])

    def has_chunk_ready(self):
        """Return True when at least one full chunk of pending audio exists."""
        with self.lock:
            return len(self.buffer) >= self.chunk_samples

    def is_speech(self, audio_chunk):
        """Simple VAD based on mean absolute energy."""
        if len(audio_chunk) < self.min_speech_samples:
            return False
        energy = np.mean(np.abs(audio_chunk))
        return energy > self.silence_threshold

    def get_chunk_for_processing(self):
        """Pop and return one chunk of pending audio, or None if not enough.

        Fix: the original returned a dummy ``np.array([1])`` and never drained
        ``buffer``, so once a chunk's worth of audio had accumulated,
        has_chunk_ready() stayed True forever and the caller's poll loop
        triggered a transcription on every iteration (~every 0.1 s) instead of
        once per `chunk_duration`.  Consuming the chunk restores the intended
        cadence; callers that only use the return value as a readiness signal
        (is-not-None check) are unaffected.
        """
        with self.lock:
            if len(self.buffer) < self.chunk_samples:
                return None
            chunk = self.buffer[:self.chunk_samples]
            self.buffer = self.buffer[self.chunk_samples:]
            return chunk

    def get_all_audio(self):
        """Return a copy of all audio accumulated in the current segment."""
        with self.lock:
            return self.total_audio.copy()

    def clear(self):
        """Drop all pending and accumulated audio."""
        with self.lock:
            self.buffer = np.array([], dtype=np.float32)
            self.total_audio = np.array([], dtype=np.float32)

    def reset_for_new_segment(self):
        """Reset buffers for a new transcription segment (alias of clear)."""
        self.clear()
|
|
|
def linear16_to_audio(audio_bytes, sample_rate=8000):
    """Decode LINEAR16 PCM bytes into float32 samples in [-1.0, 1.0).

    Returns an empty float32 array if decoding fails.
    """
    try:
        samples = np.frombuffer(audio_bytes, dtype=np.int16)
        # Normalize 16-bit signed PCM to the float range used by the model.
        return samples.astype(np.float32) / 32768.0
    except Exception as e:
        logger.error(f"Error converting LINEAR16 to audio: {e}")
        return np.array([], dtype=np.float32)
|
|
|
def resample_audio(audio_data, source_rate, target_rate):
    """Resample audio from source_rate to target_rate.

    The common jambonz case (8 kHz -> 16 kHz) takes a fast sample-doubling
    path; every other rate pair falls back to linear interpolation.
    """
    if source_rate == target_rate:
        return audio_data

    # Fast path: exact 2x upsampling by duplicating each sample.
    if (source_rate, target_rate) == (8000, 16000):
        return np.repeat(audio_data, 2).astype(np.float32)

    # Generic path: sample the source at fractional positions and
    # linearly interpolate between neighboring samples.
    ratio = target_rate / source_rate
    positions = np.arange(0, len(audio_data), 1 / ratio)
    positions = positions[positions < len(audio_data)]
    return np.interp(positions, np.arange(len(audio_data)), audio_data).astype(np.float32)
|
|
|
def transcribe_with_nemo(audio_data, source_sample_rate=8000, target_sample_rate=16000):
    """Transcribe audio using NeMo FastConformer.

    Args:
        audio_data: float32 mono samples at ``source_sample_rate``.
        source_sample_rate: sample rate of the input audio, in Hz.
        target_sample_rate: sample rate the ASR model expects, in Hz.

    Returns:
        The cleaned transcript (Arabic number words converted to digits),
        or "" for empty/too-short input or on any error.
    """
    try:
        if len(audio_data) == 0:
            return ""

        # Fix: the original hard-coded resample_audio(audio_data, 8000, 16000)
        # and min_samples = 0.3 * 16000, silently ignoring both sample-rate
        # parameters that callers pass in.
        resampled_audio = resample_audio(audio_data, source_sample_rate, target_sample_rate)

        # Skip clips shorter than 0.3 s; too little context for the model.
        min_samples = int(0.3 * target_sample_rate)
        if len(resampled_audio) < min_samples:
            return ""

        start_time = time.time()

        # NeMo's transcribe() takes file paths, so round-trip through a temp WAV.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            sf.write(tmp_file.name, resampled_audio, target_sample_rate)
            tmp_path = tmp_file.name

        try:
            result = asr_model.transcribe([tmp_path])

            logger.info(f"NeMo result type: {type(result)}")
            if not result or len(result) == 0:
                logger.info("No results from NeMo transcription")
                return ""

            logger.info(f"First result type: {type(result[0])}")
            logger.info(f"First result content: {result[0]}")

            # Newer NeMo releases return Hypothesis-like objects exposing
            # .text; older ones return plain strings.
            if hasattr(result[0], 'text'):
                raw_text = result[0].text
                logger.info(f"Using .text attribute: {raw_text}")
            elif isinstance(result[0], str):
                raw_text = result[0]
                logger.info(f"Using direct string: {raw_text}")
            else:
                raw_text = str(result[0])
                logger.info(f"Using str() conversion: {raw_text}")

            if not isinstance(raw_text, str):
                raw_text = str(raw_text)

            if not (raw_text and raw_text.strip()):
                logger.info("No transcription text found")
                return ""

            logger.info(f"before sending to FXN--- {raw_text}")
            cleaned_text = replace_arabic_numbers(raw_text)
            logger.info(f"after FXN--- {cleaned_text}")
            end_time = time.time()

            if not cleaned_text.strip():
                logger.info("No transcription text found")
                return ""

            logger.info(f"NeMo transcription: '{cleaned_text}' (processed in {end_time - start_time:.2f}s)")
            return cleaned_text.strip()

        finally:
            # Always remove the temp WAV, even if transcription raised.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    except Exception as e:
        logger.error(f"Error during NeMo transcription: {e}")
        return ""
|
|
|
class JambonzSTTHandler:
    """Manages one jambonz STT WebSocket session.

    Lifecycle: start_processing() on the "start" control message,
    add_audio_data() for each binary audio frame, stop_processing() on
    "stop" or disconnect.  Two background tasks run per session:
    _process_audio_chunks (periodic transcription of the accumulated
    segment) and _monitor_for_auto_final (silence-based finalization).
    """

    def __init__(self, websocket):
        self.websocket = websocket
        self.audio_buffer = None        # JambonzAudioBuffer, created on "start"
        self.config = {}
        self.running = False
        self.transcription_task = None  # background chunk-processing task

        # Segmentation / auto-final state.
        self.interim_count = 0          # distinct interim transcripts this segment
        self.last_interim_time = None   # wall-clock time of last interim update
        self.silence_timeout = 2.0      # seconds without updates before auto-final
        self.min_interim_count = 2      # interims required before auto-final fires
        self.auto_final_task = None
        self.accumulated_transcript = ""
        self.final_sent = False
        self.segment_number = 0
        self.last_partial = ""

        self.processing_count = 0       # transcription passes in this segment

    async def start_processing(self, start_message):
        """Initialize session state from a jambonz "start" message."""
        self.config = {
            "language": start_message.get("language", "ar-EG"),
            "format": start_message.get("format", "raw"),
            "encoding": start_message.get("encoding", "LINEAR16"),
            "sample_rate": start_message.get("sampleRateHz", 8000),
            "interim_results": True,
            "options": start_message.get("options", {})
        }

        logger.info(f"NeMo STT session started with config: {self.config}")

        self.audio_buffer = JambonzAudioBuffer(
            sample_rate=self.config["sample_rate"],
            chunk_duration=1.0
        )

        # Reset per-session state so a reused handler starts clean.
        self.running = True
        self.interim_count = 0
        self.last_interim_time = None
        self.accumulated_transcript = ""
        self.final_sent = False
        self.segment_number = 0
        self.processing_count = 0
        self.last_partial = ""

        # Launch the per-session background workers.
        self.transcription_task = asyncio.create_task(self._process_audio_chunks())
        self.auto_final_task = asyncio.create_task(self._monitor_for_auto_final())

    async def stop_processing(self):
        """Stop the session: cancel workers and flush a final transcript."""
        logger.info("Stopping NeMo STT session...")
        self.running = False

        # Cancel and await both background tasks.
        for task in [self.transcription_task, self.auto_final_task]:
            if task:
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass

        # Flush any accumulated transcript that never got a final.
        if not self.final_sent and self.accumulated_transcript.strip():
            await self.send_transcription(self.accumulated_transcript, is_final=True)

        # Last resort: transcribe everything buffered if still no final.
        if self.audio_buffer:
            all_audio = self.audio_buffer.get_all_audio()
            if len(all_audio) > 0 and not self.final_sent:
                loop = asyncio.get_event_loop()
                final_transcription = await loop.run_in_executor(
                    executor,
                    transcribe_with_nemo,
                    all_audio,
                    self.config["sample_rate"]
                )

                if final_transcription.strip():
                    await self.send_transcription(final_transcription, is_final=True)

        if self.audio_buffer:
            self.audio_buffer.clear()

        logger.info("NeMo STT session stopped")

    async def start_new_segment(self):
        """Reset all per-segment state and start a new transcription segment."""
        self.segment_number += 1
        self.interim_count = 0
        self.last_interim_time = None
        self.accumulated_transcript = ""
        self.final_sent = False
        self.last_partial = ""
        self.processing_count = 0

        if self.audio_buffer:
            self.audio_buffer.reset_for_new_segment()

        logger.info(f"Started new transcription segment #{self.segment_number}")

    async def add_audio_data(self, audio_bytes):
        """Decode an incoming LINEAR16 frame and append it to the buffer."""
        if self.audio_buffer and self.running:
            audio_data = linear16_to_audio(audio_bytes, self.config["sample_rate"])
            self.audio_buffer.add_audio(audio_data)

    async def _process_audio_chunks(self):
        """Poll the buffer and re-transcribe the accumulated segment audio."""
        while self.running:
            try:
                if self.audio_buffer and self.audio_buffer.has_chunk_ready():
                    chunk_signal = self.audio_buffer.get_chunk_for_processing()
                    if chunk_signal is not None:
                        all_audio = self.audio_buffer.get_all_audio()

                        # Only transcribe when the most recent chunk has speech.
                        if len(all_audio) > 0 and self.audio_buffer.is_speech(all_audio[-self.audio_buffer.chunk_samples:]):
                            loop = asyncio.get_event_loop()
                            transcription = await loop.run_in_executor(
                                executor,
                                transcribe_with_nemo,
                                all_audio,
                                self.config["sample_rate"]
                            )

                            if transcription.strip():
                                self.processing_count += 1
                                self.accumulated_transcript = transcription

                                if transcription != self.last_partial or self.interim_count == 0:
                                    self.last_partial = transcription
                                    self.interim_count += 1
                                    self.last_interim_time = time.time()
                                    logger.info(f"Updated interim_count to {self.interim_count} for transcript: '{transcription}'")
                                else:
                                    # Same text as before: just refresh the
                                    # silence timer for the auto-final monitor.
                                    self.last_interim_time = time.time()

                await asyncio.sleep(0.1)

            except Exception as e:
                logger.error(f"Error in chunk processing: {e}")
                await asyncio.sleep(0.1)

    async def _monitor_for_auto_final(self):
        """Send a final transcript after silence_timeout seconds of no updates."""
        while self.running:
            try:
                current_time = time.time()

                # Require enough interims, a quiet period, and an unsent,
                # non-empty transcript before finalizing the segment.
                if (self.interim_count >= self.min_interim_count and
                    self.last_interim_time is not None and
                    (current_time - self.last_interim_time) >= self.silence_timeout and
                    not self.final_sent and
                    self.accumulated_transcript.strip()):

                    logger.info(f"Auto-final triggered for segment #{self.segment_number}")

                    await self.send_transcription(self.accumulated_transcript, is_final=True)
                    await self.start_new_segment()

                await asyncio.sleep(0.5)

            except Exception as e:
                logger.error(f"Error in auto-final monitoring: {e}")
                await asyncio.sleep(0.5)

    async def send_transcription(self, text, is_final=True, confidence=0.9):
        """Send a transcription message in jambonz format.

        Fix: the original ignored the ``is_final`` argument — it hard-coded
        ``"is_final": True`` in the payload, always set ``final_sent``, and
        always logged FINAL, so an interim send would have incorrectly
        finalized the segment.  All current call sites pass is_final=True,
        so their behavior is unchanged.
        """
        try:
            message = {
                "type": "transcription",
                "is_final": is_final,
                "alternatives": [
                    {
                        "transcript": text,
                        "confidence": confidence
                    }
                ],
                "language": self.config.get("language", "ar-EG"),
                "channel": 1
            }

            await self.websocket.send(json.dumps(message))
            if is_final:
                self.final_sent = True

            logger.info(f"Sent {'FINAL' if is_final else 'INTERIM'} transcription to Jambonz: '{text}'")

        except Exception as e:
            logger.error(f"Error sending transcription: {e}")

    async def send_error(self, error_message):
        """Send an error message in jambonz format."""
        try:
            message = {
                "type": "error",
                "error": error_message
            }
            await self.websocket.send(json.dumps(message))
            logger.error(f"Sent error: {error_message}")
        except Exception as e:
            logger.error(f"Error sending error message: {e}")
|
|
|
async def handle_jambonz_websocket(websocket):
    """Handle one jambonz WebSocket connection end-to-end.

    Text frames are JSON control messages ("start" / "stop"); binary frames
    are raw LINEAR16 audio for the active session.  The handler is always
    stopped in the finally block so background tasks never leak.
    """
    client_id = f"jambonz_{id(websocket)}"
    logger.info(f"New NeMo jambonz connection: {client_id}")

    handler = JambonzSTTHandler(websocket)

    try:
        async for message in websocket:
            try:
                if isinstance(message, str):
                    # Control channel: JSON messages from jambonz.
                    data = json.loads(message)
                    message_type = data.get("type")

                    if message_type == "start":
                        logger.info(f"Received start message: {data}")
                        await handler.start_processing(data)

                    elif message_type == "stop":
                        logger.info("Received stop message - closing WebSocket")
                        await handler.stop_processing()
                        await websocket.close(code=1000, reason="Session stopped by client")
                        break

                    else:
                        logger.warning(f"Unknown message type: {message_type}")
                        await handler.send_error(f"Unknown message type: {message_type}")

                else:
                    # Binary frame: raw audio, only valid inside a session.
                    if not handler.running or handler.audio_buffer is None:
                        logger.warning("Received audio data outside of active session")
                        await handler.send_error("Received audio before start message or after stop")
                        continue

                    await handler.add_audio_data(message)

            except json.JSONDecodeError as e:
                logger.error(f"JSON decode error: {e}")
                await handler.send_error(f"Invalid JSON: {str(e)}")
            except Exception as e:
                logger.error(f"Error processing message: {e}")
                await handler.send_error(f"Processing error: {str(e)}")

    except websockets.exceptions.ConnectionClosed:
        logger.info(f"NeMo jambonz connection closed: {client_id}")
    except Exception as e:
        logger.error(f"NeMo jambonz WebSocket error: {e}")
        try:
            await handler.send_error(str(e))
        except Exception:
            # Fix: was a bare `except:`, which would also swallow
            # KeyboardInterrupt/SystemExit raised during the error report.
            pass
    finally:
        # Guarantee cleanup of background tasks on any exit path.
        if handler.running:
            await handler.stop_processing()
        logger.info(f"NeMo jambonz connection ended: {client_id}")
|
|
|
async def main():
    """Start the NeMo jambonz STT WebSocket server and block until closed."""
    logger.info("Starting NeMo Jambonz STT WebSocket server on port 3007...")

    server = await websockets.serve(
        handle_jambonz_websocket,
        "0.0.0.0",
        3007,
        ping_interval=20,
        ping_timeout=10,
        close_timeout=10
    )

    logger.info("NeMo Jambonz STT WebSocket server started on ws://0.0.0.0:3007")
    logger.info("Ready to handle jambonz STT requests with NeMo FastConformer")
    # Feature summary emitted line by line, same order and text as before.
    for feature_line in (
        "FEATURES:",
        "- Arabic ASR using NeMo FastConformer model",
        "- Arabic number word to digit conversion",
        "- Continuous transcription with segmentation",
        "- Voice Activity Detection",
    ):
        logger.info(feature_line)

    # Keep the coroutine alive until the server shuts down.
    await server.wait_closed()
|
|
|
| if __name__ == "__main__":
|
| print("=" * 80)
|
| print("NeMo FastConformer Jambonz STT Server")
|
| print("=" * 80)
|
| print("Model: NeMo FastConformer Arabic ASR")
|
| print("WebSocket Port: 3007")
|
| print("Protocol: jambonz STT API")
|
| print("Audio Format: LINEAR16 PCM @ 8kHz → 16kHz")
|
| print("Language: Arabic with number conversion")
|
| print("=" * 80)
|
|
|
| try:
|
| asyncio.run(main())
|
| except KeyboardInterrupt:
|
| print("\nShutting down NeMo server...")
|
| except Exception as e:
|
| print(f"Server error: {e}")
|
|
|