|
|
| import queue |
| import threading |
| import time |
| from logging import getLogger |
| import asyncio |
| import numpy as np |
| import config |
| import collections |
| from api_model import TransResult, Message |
|
|
| from .utils import log_block, start_thread, get_text_separator, filter_words |
| from .processing import ProcessingPipes |
| from .pipelines import MetaItem |
|
|
|
|
| logger = getLogger("TranscriptionService") |
|
|
|
|
|
|
class WhisperTranscriptionService:
    """
    Whisper speech transcription service.

    Consumes a stream of raw audio frames from a websocket client, applies
    voice-activity detection, transcribes the speech and translates it to
    the target language, then pushes results back to the client.

    Two daemon worker threads are started on construction:

    * frame thread (`_read_frame_processing_loop`) — drains ``frame_queue``,
      runs VAD and accumulates speech into ``frames_np``; completed
      utterances are moved to ``full_segments_queue``.
    * translate thread (`_transcription_processing_loop`) — transcribes
      either a finished segment (final result) or a growing window of the
      live buffer (partial result) and sends it to the client.

    Locking: ``frame_lock`` guards ``frames_np``; ``segment_lock`` guards
    ``full_segments_queue``.
    """

    def __init__(self, websocket, pipe: ProcessingPipes, language=None, dst_lang=None, client_uid=None):
        logger.info("init service: src_lang=%s dst_lang=%s client=%s",
                    language, dst_lang, client_uid)
        self.source_language = language
        self.target_language = dst_lang
        self.client_uid = client_uid

        self.websocket = websocket
        self.translate_pipe = pipe

        self.sample_rate = config.SAMPLE_RATE

        # frame_lock protects frames_np; segment_lock protects full_segments_queue.
        self.frame_lock = threading.Lock()
        self.segment_lock = threading.Lock()

        self.text_separator = get_text_separator(language)
        # Event loop of the calling (async) context; worker threads schedule
        # websocket sends onto it via run_coroutine_threadsafe.
        self.loop = asyncio.get_event_loop()

        # Raw frames from the client, waiting for VAD processing.
        self.frame_queue = queue.Queue()

        # Speech samples accumulated since the current utterance started.
        self.frames_np = np.array([], dtype=np.float32)

        # Wall-clock time when the current utterance started (None = idle).
        self.frames_np_start_timestamp = None

        # Completed utterances awaiting transcription. Producer appends on
        # the left, consumer pops from the right -> FIFO order.
        self.full_segments_queue = collections.deque()

        self._stop = threading.Event()
        self.translate_thread = start_thread(self._transcription_processing_loop)
        self.frame_processing_thread = start_thread(self._read_frame_processing_loop)

        # Monotonically increasing id assigned to finalized (non-partial) rows.
        self.row_number = 0

    def add_frames(self, frame_np: np.ndarray) -> None:
        """Queue a chunk of raw audio samples for background processing."""
        self.frame_queue.put(frame_np)

    def _apply_voice_activity_detection(self, frame_np: np.array):
        """Run VAD on one frame.

        Returns a tuple ``(speech_audio, speech_status)`` where
        ``speech_audio`` is a float32 array of detected speech samples and
        ``speech_status`` is the pipeline's state marker (e.g. "START"/"END").
        """
        processed_audio = self.translate_pipe.voice_detect(frame_np.tobytes())
        speech_audio = np.frombuffer(processed_audio.audio, dtype=np.float32)
        speech_status = processed_audio.speech_status
        return speech_audio, speech_status

    def _read_frame_processing_loop(self) -> None:
        """Drain the frame queue, apply VAD, and build utterance segments."""
        while not self._stop.is_set():
            try:
                # Timeout so the thread periodically re-checks _stop; a bare
                # get() would block forever and the thread could never exit.
                frame_np = self.frame_queue.get(timeout=0.5)
            except queue.Empty:
                continue

            # Guard BEFORE VAD: a None sentinel must not reach tobytes().
            if frame_np is None:
                continue

            frame_np, speech_status = self._apply_voice_activity_detection(frame_np)

            with self.frame_lock:
                self.frames_np = np.append(self.frames_np, frame_np)

            if speech_status == "START" and self.frames_np_start_timestamp is None:
                self.frames_np_start_timestamp = time.time()

            # Force-flush overly long speech so one utterance cannot grow
            # without bound. Copy + reset under one lock so the consumer
            # never observes a half-updated buffer.
            if len(self.frames_np) >= self.sample_rate * config.MAX_SPEECH_DURATION_S:
                with self.frame_lock:
                    audio_array = self.frames_np.copy()
                    self.frames_np = np.array([], dtype=np.float32)
                with self.segment_lock:
                    self.full_segments_queue.appendleft(audio_array)
                self.frames_np_start_timestamp = time.time()

            # On END, finalize the utterance only if it has lasted long
            # enough; otherwise keep accumulating in the live buffer.
            if speech_status == "END" and len(self.frames_np) > 0 and self.frames_np_start_timestamp:
                time_diff = time.time() - self.frames_np_start_timestamp
                if time_diff >= config.FRAME_SCOPE_TIME_THRESHOLD:
                    with self.frame_lock:
                        audio_array = self.frames_np.copy()
                        self.frames_np = np.array([], dtype=np.float32)
                    with self.segment_lock:
                        self.full_segments_queue.appendleft(audio_array)
                    logger.debug(f"🥳 增加整句到队列")
                    self.frames_np_start_timestamp = None
                else:
                    logger.debug(f"🥳 当前时间与上一句的时间差: {time_diff:.2f}s,继续保留在缓冲区")

    def _transcription_processing_loop(self) -> None:
        """Main transcription loop.

        Prefers finished segments from ``full_segments_queue`` (final
        results); when none are pending, transcribes a growing window of
        the live buffer as a partial result.
        """
        frame_epoch = 1

        while not self._stop.is_set():
            time.sleep(0.1)

            audio_buffer = None
            with self.segment_lock:
                segment_length = len(self.full_segments_queue)
                if segment_length > 0:
                    # pop() takes the oldest segment (FIFO with appendleft).
                    audio_buffer = self.full_segments_queue.pop()
                    partial = False
            if audio_buffer is None:
                with self.frame_lock:
                    if len(self.frames_np) == 0:
                        continue
                    # Partial window grows by ~1.5 s per epoch until the
                    # sentence is finalized.
                    audio_buffer = self.frames_np[:int(frame_epoch * 1.5 * self.sample_rate)].copy()
                partial = True

            logger.debug(f"full_segments_queue size: {segment_length}")
            logger.debug(f"audio buffer size: {len(self.frames_np) / self.sample_rate:.2f}s")

            # The model needs at least ~1 s of audio: left-pad short buffers
            # with silence plus a 10 ms guard.
            if len(audio_buffer) < int(self.sample_rate):
                padding_samples = int(self.sample_rate * 0.01)
                target_length = self.sample_rate + padding_samples
                silence_audio = np.zeros(target_length, dtype=np.float32)
                copy_length = min(len(audio_buffer), target_length)
                if copy_length:  # avoid zero-length slice broadcast error
                    silence_audio[-copy_length:] = audio_buffer[-copy_length:]
                audio_buffer = silence_audio

            meta_item = self._transcribe_audio(audio_buffer)
            segments = meta_item.segments
            logger.debug(f"Segments: {segments}")
            segments = filter_words(segments)

            if len(segments):
                seg_text = self.text_separator.join(seg.text for seg in segments)
                # Drop empty/noise-only transcriptions.
                if seg_text.strip() in ['', '.', '-']:
                    continue

                if not partial:
                    # Finalized sentence: large translation model, new row id,
                    # and the partial window restarts.
                    translated_content = self._translate_text_large(seg_text)
                    self.row_number += 1
                    frame_epoch = 1
                else:
                    translated_content = self._translate_text(seg_text)
                    frame_epoch += 1

                result = TransResult(
                    seg_id=self.row_number,
                    context=seg_text,
                    from_=self.source_language,
                    to=self.target_language,
                    tran_content=translated_content,
                    partial=partial
                )
                self._send_result_to_client(result)

    def _transcribe_audio(self, audio_buffer: np.ndarray) -> MetaItem:
        """Transcribe an audio buffer and return the pipeline's MetaItem."""
        log_block("Audio buffer length", f"{audio_buffer.shape[0]/self.sample_rate:.2f}", "s")
        result = self.translate_pipe.transcribe(audio_buffer.tobytes(), self.source_language)
        log_block("📝 transcribe output", f"{self.text_separator.join(seg.text for seg in result.segments)}", "")
        return result

    def _translate_text(self, text: str) -> str:
        """Translate text to the target language (fast model)."""
        if not text.strip():
            return ""
        log_block("🐧 Translation input ", f"{text}")

        result = self.translate_pipe.translate(text, self.source_language, self.target_language)
        translated_text = result.translate_content

        log_block("🐧 Translation out ", f"{translated_text}")
        return translated_text

    def _translate_text_large(self, text: str) -> str:
        """Translate text to the target language (large model, final rows)."""
        if not text.strip():
            return ""
        log_block("Translation input", f"{text}")
        result = self.translate_pipe.translate_large(text, self.source_language, self.target_language)
        translated_text = result.translate_content
        log_block("Translation large model output", f"{translated_text}")
        return translated_text

    def _send_result_to_client(self, result: TransResult) -> None:
        """Serialize a result and ship it to the websocket from a worker thread."""
        try:
            message = Message(result=result, request_id=self.client_uid).model_dump_json(by_alias=True)
            coro = self.websocket.send_text(message)
            future = asyncio.run_coroutine_threadsafe(coro, self.loop)
            future.add_done_callback(self._on_send_done)
        except RuntimeError:
            # Event loop is gone (client disconnected / loop closed).
            self.stop()
        except Exception as e:
            logger.error(f"Error sending result to client: {e}")

    def _on_send_done(self, fut) -> None:
        """Done-callback for websocket sends: stop the service on failure.

        ``Future.exception()`` raises CancelledError on a cancelled send, so
        cancellation is handled explicitly instead of leaking out of the
        callback (where it would only be logged by asyncio).
        """
        try:
            failed = fut.exception() is not None
        except asyncio.CancelledError:
            failed = True
        if failed:
            self.stop()

    def stop(self) -> None:
        """Signal both worker threads to exit and log the shutdown."""
        self._stop.set()
        logger.info(f"Stopping transcription service for client: {self.client_uid}")
|
|