| import io |
| import numpy as np |
| import soundfile as sf |
| import time |
| import traceback |
| import threading |
| import queue |
| import torch |
| from groq import Groq |
| from typing import Optional, Dict, Any, Callable |
| from config.settings import settings |
|
|
class SileroVAD:
    """Streaming voice-activity detector built on the Silero VAD model.

    Audio pushed through :meth:`process_stream` is converted to float32 at
    ``sample_rate``, cut into windows of ``chunk_size`` samples and scored by
    the model.  A two-state machine (``"silence"`` / ``"speech"``) groups the
    windows into speech segments:

    * ``silence -> speech`` when a window scores above ``speech_threshold``;
      the segment starts with the buffered pre-roll audio.
    * ``speech -> silence`` once no window has scored above the threshold
      for ``min_silence_duration`` seconds; the segment is delivered.
    * If a segment grows beyond ``max_speech_duration`` seconds it is
      delivered immediately and a new segment is started without leaving
      the ``"speech"`` state.

    Segments are handed to the callback registered with
    :meth:`start_stream` as ``callback(audio_float32, sample_rate)`` on a
    daemon thread.  Durations are measured with the wall clock
    (``time.time()``) at the moment each window is processed.
    """

    def __init__(self):
        self.model = None
        self.sample_rate = 16000  # rate (Hz) all audio is converted to
        self.is_streaming = False
        self.speech_callback = None
        self.audio_buffer = []  # samples waiting to fill a whole window
        self.speech_buffer = []
        self.state = "silence"
        self.speech_start_time = 0  # time.time() when the segment started
        self.last_voice_time = 0  # time.time() of the last voiced window

        self.chunk_size = 512  # window length (samples) fed to the model
        self.speech_threshold = settings.VAD_THRESHOLD
        self.min_speech_duration = settings.VAD_MIN_SPEECH_DURATION
        self.min_silence_duration = settings.VAD_MIN_SILENCE_DURATION
        self.speech_pad_duration = settings.VAD_SPEECH_PAD_DURATION
        self.pre_speech_buffer = settings.VAD_PRE_SPEECH_BUFFER
        # Cached like the other thresholds so the detection loop does not
        # depend on the settings module.
        self.max_speech_duration = settings.MAX_AUDIO_DURATION

        # Pre-roll: the most recent silent samples, prepended to a segment
        # when speech starts.
        self.pre_speech_samples = int(self.pre_speech_buffer * self.sample_rate)
        self.pre_speech_buffer_data = []

        self.active_speech_buffer = []  # samples of the segment in progress
        # Kept for backward compatibility with code that reads the
        # attribute; the detector itself always leaves it empty.
        self.backup_speech_buffer = []

        self._initialize_model()

    def _initialize_model(self):
        """Load the Silero VAD model through ``torch.hub``.

        On any failure the error is printed and ``self.model`` is left as
        ``None``, which makes :meth:`start_stream` refuse to start.
        """
        try:
            print("🔄 Đang tải Silero VAD model...")
            self.model, _ = torch.hub.load(
                repo_or_dir='snakers4/silero-vad',
                model='silero_vad',
                force_reload=False,
                trust_repo=True
            )
            self.model.eval()
            print("✅ Đã tải Silero VAD model thành công")
        except Exception as e:
            print(f"❌ Lỗi tải Silero VAD model: {e}")
            self.model = None

    def _reset_stream_state(self):
        """Empty every buffer and put the state machine back in "silence"."""
        self.audio_buffer = []
        self.speech_buffer = []
        self.pre_speech_buffer_data = []
        self.active_speech_buffer = []
        self.backup_speech_buffer = []
        self.state = "silence"

    def start_stream(self, speech_callback: Callable):
        """Start streaming detection.

        Args:
            speech_callback: called as ``speech_callback(audio, sample_rate)``
                with a float32 array for every finished speech segment.

        Returns:
            ``True`` when streaming started, ``False`` when no model is
            loaded.
        """
        if self.model is None:
            return False

        # The model may carry internal state between calls; clear it when
        # the loaded object offers a way to do so.
        reset_states = getattr(self.model, "reset_states", None)
        if callable(reset_states):
            reset_states()

        self.is_streaming = True
        self.speech_callback = speech_callback
        self._reset_stream_state()
        self.speech_start_time = 0
        self.last_voice_time = 0
        print("🎙️ Bắt đầu VAD streaming với double buffer system...")
        return True

    def stop_stream(self):
        """Stop streaming, drop the callback and discard buffered audio."""
        self.is_streaming = False
        self.speech_callback = None
        self._reset_stream_state()
        print("🛑 Đã dừng VAD streaming")

    def process_stream(self, audio_chunk: np.ndarray, sample_rate: int):
        """Feed a chunk of audio of any length into the detector.

        The chunk is normalised, resampled to ``self.sample_rate`` when
        needed and appended to the pending buffer; every complete window of
        ``chunk_size`` samples is then run through the state machine.
        Leftover samples stay buffered for the next call.  Errors are
        printed, never raised.

        Args:
            audio_chunk: mono samples (integer PCM or float).
            sample_rate: sample rate of ``audio_chunk`` in Hz.
        """
        if not self.is_streaming or self.model is None:
            return

        try:
            samples = np.asarray(audio_chunk)
            # Accept effectively-mono 2-D input such as (n, 1).
            if samples.ndim > 1 and samples.size == max(samples.shape):
                samples = samples.reshape(-1)

            # Normalise once, while the original dtype is still known.
            samples = self._normalize_audio(samples)
            if sample_rate != self.sample_rate:
                samples = self._resample_audio(samples, sample_rate, self.sample_rate)
                # Resampling can overshoot slightly beyond [-1, 1].
                samples = np.clip(samples, -1.0, 1.0)

            pending = self.audio_buffer
            pending.extend(samples.tolist())

            window = self.chunk_size
            consumed = 0
            try:
                while self.is_streaming and len(pending) - consumed >= window:
                    frame = np.array(pending[consumed:consumed + window], dtype=np.float32)
                    # Advance first so a window that raises is not retried
                    # on every later call.
                    consumed += window
                    self._process_vad_chunk(frame)
            finally:
                # Trim once instead of re-slicing the list per window.  If
                # the stream was stopped meanwhile its buffers are already
                # reset and must not be refilled.
                if self.is_streaming:
                    self.audio_buffer = pending[consumed:]

        except Exception as e:
            print(f"❌ Lỗi xử lý VAD: {e}")

    def _remember_pre_speech(self, samples):
        """Append silent samples to the pre-roll, keeping only the newest
        ``pre_speech_samples`` of them."""
        if self.pre_speech_samples <= 0:
            # A slice of [-0:] would keep everything, so handle "no
            # pre-roll" explicitly.
            self.pre_speech_buffer_data = []
            return
        self.pre_speech_buffer_data.extend(samples)
        overflow = len(self.pre_speech_buffer_data) - self.pre_speech_samples
        if overflow > 0:
            del self.pre_speech_buffer_data[:overflow]

    def _process_vad_chunk(self, audio_chunk: np.ndarray):
        """Score one window and advance the silence/speech state machine.

        Args:
            audio_chunk: exactly ``chunk_size`` samples at ``sample_rate``.
        """
        current_time = time.time()

        audio_chunk = self._normalize_audio(audio_chunk)
        speech_prob = self._get_speech_probability(audio_chunk)
        is_voiced = speech_prob > self.speech_threshold

        if self.state == "silence":
            if is_voiced:
                print("🎤 Bắt đầu phát hiện speech")
                self.state = "speech"
                self.speech_start_time = current_time
                self.last_voice_time = current_time

                # The segment starts with the pre-roll so the onset of the
                # utterance is not cut off.
                self.active_speech_buffer = list(self.pre_speech_buffer_data)
                self.active_speech_buffer.extend(audio_chunk.tolist())
                self.pre_speech_buffer_data = []
            else:
                self._remember_pre_speech(audio_chunk.tolist())
            return

        if self.state != "speech":
            return

        self.active_speech_buffer.extend(audio_chunk.tolist())
        if is_voiced:
            self.last_voice_time = current_time

        silence_duration = current_time - self.last_voice_time
        speech_duration = current_time - self.speech_start_time

        if silence_duration >= self.min_silence_duration:
            # The utterance is over; short and long ones are both
            # delivered, only the log line differs.
            if speech_duration < self.min_speech_duration:
                print(f"🎯 Phát hiện phản hồi ngắn: {speech_duration:.2f}s, im lặng: {silence_duration:.2f}s")
            else:
                print(f"🎯 Kết thúc speech dài: {speech_duration:.2f}s")
            self._finalize_speech()
        elif speech_duration > self.max_speech_duration:
            print(f"⏰ Speech timeout ({speech_duration:.2f}s) - xử lý dù đang nói")
            # The speaker is still talking: deliver what we have and keep
            # collecting into a fresh segment.
            self._finalize_speech(keep_listening=True)

    def _finalize_speech(self, keep_listening: bool = False):
        """Deliver the current segment to the callback and start over.

        Args:
            keep_listening: ``False`` (default) returns to "silence" after
                delivery.  ``True`` stays in "speech" and restarts the
                segment timers, for the case where a segment is cut while
                the speaker is still talking.
        """
        if not self.active_speech_buffer:
            self._reset_buffers()
            return

        # Snapshot the samples so the callback thread owns its own array.
        speech_audio = np.array(self.active_speech_buffer, dtype=np.float32)

        callback = self.speech_callback
        if callback:
            threading.Thread(
                target=callback,
                args=(speech_audio, self.sample_rate),
                daemon=True
            ).start()

        # Delivered audio must not be delivered again.
        self.active_speech_buffer = []
        self.backup_speech_buffer = []

        if keep_listening:
            restart_time = time.time()
            self.state = "speech"
            # Restart both timers, otherwise the duration limit would be
            # exceeded again on the very next window.
            self.speech_start_time = restart_time
            self.last_voice_time = restart_time
        else:
            self.state = "silence"

    def _reset_buffers(self):
        """Clear the speech and pending buffers and return to "silence"."""
        self.active_speech_buffer = []
        self.backup_speech_buffer = []
        self.audio_buffer = []
        self.state = "silence"

    def _normalize_audio(self, audio: np.ndarray) -> np.ndarray:
        """Return ``audio`` as float32 limited to [-1.0, 1.0].

        Integer input is treated as 16-bit PCM and always divided by 32768.
        Float input is divided by 32768 only when its peak exceeds 1.0,
        i.e. when it looks like PCM-scaled data.  Empty input is returned
        as an empty float32 array.
        """
        audio = np.asarray(audio)
        # Decide from the dtype, not the amplitude: a quiet PCM window can
        # have every sample within [-1, 1].
        is_integer_pcm = np.issubdtype(audio.dtype, np.integer)
        if audio.dtype != np.float32:
            audio = audio.astype(np.float32)
        if audio.size == 0:
            return audio
        if is_integer_pcm or np.max(np.abs(audio)) > 1.0:
            audio = audio / 32768.0
        return np.clip(audio, -1.0, 1.0)

    def _get_speech_probability(self, audio_chunk: np.ndarray) -> float:
        """Return the model's speech probability for one window.

        Returns ``0.0`` when the window is not exactly ``chunk_size``
        samples long or when the model call fails (the error is printed).
        """
        try:
            if len(audio_chunk) != self.chunk_size:
                return 0.0

            audio_tensor = torch.from_numpy(audio_chunk).float().unsqueeze(0)
            with torch.no_grad():
                return self.model(audio_tensor, self.sample_rate).item()
        except Exception as e:
            print(f" Lỗi speech probability: {e}")
            return 0.0

    def _resample_audio(self, audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
        """Resample ``audio`` from ``orig_sr`` to ``target_sr`` with SciPy.

        Returns ``audio`` unchanged when the rates match.  Best effort: if
        resampling fails (for example SciPy is missing) the error is
        printed and the input is returned as-is.
        """
        if orig_sr == target_sr:
            return audio
        try:
            from scipy import signal
            # Integer arithmetic avoids losing a sample to float rounding.
            new_length = int(len(audio) * target_sr // orig_sr)
            if new_length <= 0:
                return np.zeros(0, dtype=np.float32)
            return signal.resample(audio, new_length).astype(np.float32)
        except Exception as e:
            print(f"❌ Lỗi resample audio: {e}")
            return audio

    def is_speech(self, audio_chunk: np.ndarray, sample_rate: int) -> bool:
        """One-shot check whether a clip contains speech.

        The clip is split into whole windows of ``chunk_size`` samples and
        the mean window probability is compared with ``speech_threshold``.

        Returns:
            ``True`` when no model is loaded or an error occurs (fail
            open), ``False`` when the clip is shorter than one window,
            otherwise the threshold comparison.
        """
        if self.model is None:
            return True

        try:
            samples = self._normalize_audio(np.asarray(audio_chunk))
            if sample_rate != self.sample_rate:
                samples = self._resample_audio(samples, sample_rate, self.sample_rate)
                samples = np.clip(samples, -1.0, 1.0)

            window = self.chunk_size
            speech_probs = [
                self._get_speech_probability(samples[start:start + window])
                for start in range(0, len(samples) - window + 1, window)
            ]
            if not speech_probs:
                return False
            return bool(np.mean(speech_probs) > self.speech_threshold)

        except Exception as e:
            print(f" Lỗi kiểm tra speech: {e}")
            return True