"""
Unified CoreML conversion script for the Parakeet EOU streaming encoder.

Supports 160ms, 320ms, and 1600ms chunk sizes by configuring NeMo's
streaming parameters appropriately before tracing.

Usage:
    # 160ms (default)
    python convert_streaming_encoder_unified.py --chunk-ms 160 --output-dir Models/160ms

    # 320ms
    python convert_streaming_encoder_unified.py --chunk-ms 320 --output-dir Models/320ms

    # 1600ms
    python convert_streaming_encoder_unified.py --chunk-ms 1600 --output-dir Models/1600ms
"""

import argparse
import json
from pathlib import Path
from typing import Tuple

import coremltools as ct
import numpy as np
import torch
import torch.nn as nn
from nemo.collections.asr.models import EncDecRNNTBPEModel


class LoopbackEncoderWrapper(nn.Module):
    """
    Wraps the Parakeet encoder for CoreML loopback streaming.

    This wrapper handles the pre_cache concatenation and cache management
    that NeMo performs internally in its streaming pipeline.

    Inputs:
        - audio_signal: [B, D, T] (mel spectrogram chunk)
        - audio_length: [B]
        - pre_cache: [B, D, pre_cache_size] (previous mel context)
        - cache_last_channel: [layers, B, cache_size, hidden]
        - cache_last_time: [layers, B, hidden, time_cache]
        - cache_last_channel_len: [B]

    Outputs:
        - encoded_output: [B, D_out, T_out]
        - encoded_length: [B]
        - new_pre_cache: [B, D, pre_cache_size]
        - new_cache_last_channel
        - new_cache_last_time
        - new_cache_last_channel_len
    """

    def __init__(self, encoder, pre_cache_size: int):
        super().__init__()
        self.encoder = encoder
        self.pre_cache_size = pre_cache_size

    def forward(
        self,
        audio_signal: torch.Tensor,
        audio_length: torch.Tensor,
        pre_cache: torch.Tensor,
        cache_last_channel: torch.Tensor,
        cache_last_time: torch.Tensor,
        cache_last_channel_len: torch.Tensor,
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        # Prepend the mel context carried over from the previous chunk.
        full_input = torch.cat([pre_cache, audio_signal], dim=2)
        full_length = audio_length + self.pre_cache_size

        # The tail of the concatenated input becomes the next chunk's context.
        new_pre_cache = full_input[:, :, -self.pre_cache_size:]

        # Run NeMo's cache-aware streaming step over the extended chunk.
        encoded, encoded_len, new_cache_channel, new_cache_time, new_cache_len = (
            self.encoder.cache_aware_stream_step(
                processed_signal=full_input,
                processed_signal_length=full_length,
                cache_last_channel=cache_last_channel,
                cache_last_time=cache_last_time,
                cache_last_channel_len=cache_last_channel_len,
            )
        )

        # CoreML-friendly int32 lengths.
        encoded_len_32 = encoded_len.to(dtype=torch.int32)
        new_channel_len_32 = new_cache_len.to(dtype=torch.int32)

        return (
            encoded,
            encoded_len_32,
            new_pre_cache,
            new_cache_channel,
            new_cache_time,
            new_channel_len_32,
        )


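# A minimal loopback usage sketch (illustration only; `mel_chunks` is a
# hypothetical iterable of (mel, length) pairs, and all states start as
# zeros with the shapes traced below):
#
#     wrapper = LoopbackEncoderWrapper(encoder, pre_cache_size=16)
#     state = [pre_cache, cache_channel, cache_time, cache_len]
#     for mel, mel_len in mel_chunks:
#         encoded, encoded_len, *state = wrapper(mel, mel_len, *state)

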
def get_streaming_config(encoder, chunk_ms: int):
    """
    Get the streaming configuration for the given chunk size.

    Returns:
        dict with:
            - chunk_size: encoder output steps per window
            - shift_size: shift in encoder steps
            - mel_frames: input mel frames per chunk
            - pre_cache_size: pre-encode cache size in mel frames
            - valid_out_len: number of valid output frames per chunk
    """
    if chunk_ms == 160:
        # Fixed 160ms configuration: each call feeds 2560 new samples
        # (160ms at 16 kHz) as 17 mel frames, with the 16-frame mel
        # pre-cache supplying left context. The encoder keeps its default
        # streaming setup.
        return {
            "chunk_size": 4,
            "shift_size": 2,
            "mel_frames": 17,
            "pre_cache_size": 16,
            "valid_out_len": 2,
            "samples": 2560,
        }
    elif chunk_ms == 320:
        # A 640ms window (chunk_size=8 encoder steps, 80ms each) shifting
        # by 320ms (shift_size=4), so each call consumes 320ms of new audio.
        encoder.setup_streaming_params(chunk_size=8, shift_size=4)
        cfg = encoder.streaming_cfg
        print(f"320ms streaming_cfg: {cfg}")

        return {
            "chunk_size": 8,
            "shift_size": 4,
            "mel_frames": 64,
            "pre_cache_size": 9,
            "valid_out_len": 4,
            "samples": 10240,
            "shift_samples": 5120,
        }
    elif chunk_ms == 1600:
        # A 3200ms window (chunk_size=40 encoder steps) shifting by 1600ms
        # (shift_size=20), so each call consumes 1600ms of new audio.
        encoder.setup_streaming_params(chunk_size=40, shift_size=20)
        cfg = encoder.streaming_cfg
        print(f"1600ms streaming_cfg: {cfg}")

        return {
            "chunk_size": 40,
            "shift_size": 20,
            "mel_frames": 320,
            "pre_cache_size": 9,
            "valid_out_len": 20,
            "samples": 50928,
            "shift_samples": 25600,
        }
    else:
        raise ValueError(f"Unsupported chunk size: {chunk_ms}ms. Use 160, 320, or 1600.")


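# A small self-check for the frame arithmetic above. It assumes 16 kHz audio
# and 80ms per encoder step (10ms mel hop x 8x subsampling) -- assumptions
# inferred from the configs, not read from the model. Illustrative only;
# not called by main().
def _check_frame_math(config: dict, chunk_ms: int) -> None:
    # New audio consumed per call: the shift for overlapping windows,
    # otherwise the whole chunk.
    new_samples = config.get("shift_samples", config["samples"])
    assert new_samples == chunk_ms * 16, "shift must cover chunk_ms of 16 kHz audio"
    # Each valid output frame covers one 80ms encoder step.
    assert config["valid_out_len"] * 80 == chunk_ms

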
def main():
    parser = argparse.ArgumentParser(description="Convert Parakeet EOU encoder to CoreML")
    parser.add_argument(
        "--chunk-ms",
        type=int,
        default=160,
        choices=[160, 320, 1600],
        help="Chunk size in milliseconds (160, 320, or 1600)",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default=None,
        help="Output directory for the CoreML model",
    )
    parser.add_argument(
        "--model-id",
        type=str,
        default="nvidia/parakeet_realtime_eou_120m-v1",
        help="HuggingFace model ID",
    )
    args = parser.parse_args()

    if args.output_dir is None:
        args.output_dir = f"Models/{args.chunk_ms}ms"

    output_path = Path(args.output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    print(f"Loading model: {args.model_id}...")
    asr_model = EncDecRNNTBPEModel.from_pretrained(model_name=args.model_id)
    asr_model.eval()

    encoder = asr_model.encoder

    # Configure the encoder's streaming parameters for the requested chunk size.
    config = get_streaming_config(encoder, args.chunk_ms)

    mel_dim = 128
    hidden_dim = encoder.d_model
    num_layers = len(encoder.layers)

    # Cache sizes for the cache-aware Conformer: attention (channel) cache
    # and depthwise-convolution (time) cache.
    cache_channel_size = 70
    cache_time_size = 8
    pre_cache_size = config["pre_cache_size"]
    chunk_size_in = config["mel_frames"]

    print(f"\n=== Configuration for {args.chunk_ms}ms ===")
    print(f"Mel frames: {chunk_size_in}")
    print(f"Pre-cache: {pre_cache_size}")
    print(f"Valid output len: {config['valid_out_len']}")
    print(f"Hidden dim: {hidden_dim}, Layers: {num_layers}")
    print(f"Cache: Channel={cache_channel_size}, Time={cache_time_size}")

    wrapper = LoopbackEncoderWrapper(encoder, pre_cache_size=pre_cache_size)
    wrapper.eval()

    # Zero-state example inputs for tracing.
    batch_size = 1
    test_mel = torch.randn(batch_size, mel_dim, chunk_size_in)
    test_mel_len = torch.tensor([chunk_size_in], dtype=torch.int32)
    test_pre_cache = torch.zeros(batch_size, mel_dim, pre_cache_size)
    test_cache_channel = torch.zeros(num_layers, batch_size, cache_channel_size, hidden_dim)
    test_cache_time = torch.zeros(num_layers, batch_size, hidden_dim, cache_time_size)
    test_cache_len = torch.zeros(batch_size, dtype=torch.int32)

| print("\nTracing model...") |
| traced_model = torch.jit.trace( |
| wrapper, |
| (test_mel, test_mel_len, test_pre_cache, test_cache_channel, test_cache_time, test_cache_len), |
| strict=False, |
| ) |
|
|
| |
| with torch.no_grad(): |
| out = traced_model( |
| test_mel, test_mel_len, test_pre_cache, test_cache_channel, test_cache_time, test_cache_len |
| ) |
| print(f"Encoder output shape: {out[0].shape}") |
|
|
| |
| print("\nConverting to CoreML...") |
| inputs = [ |
| ct.TensorType(name="audio_signal", shape=(1, mel_dim, chunk_size_in), dtype=np.float32), |
| ct.TensorType(name="audio_length", shape=(1,), dtype=np.int32), |
| ct.TensorType(name="pre_cache", shape=(1, mel_dim, pre_cache_size), dtype=np.float32), |
| ct.TensorType( |
| name="cache_last_channel", |
| shape=(num_layers, 1, cache_channel_size, hidden_dim), |
| dtype=np.float32, |
| ), |
| ct.TensorType( |
| name="cache_last_time", |
| shape=(num_layers, 1, hidden_dim, cache_time_size), |
| dtype=np.float32, |
| ), |
| ct.TensorType(name="cache_last_channel_len", shape=(1,), dtype=np.int32), |
| ] |
|
|
| outputs = [ |
| ct.TensorType(name="encoded_output", dtype=np.float32), |
| ct.TensorType(name="encoded_length", dtype=np.int32), |
| ct.TensorType(name="new_pre_cache", dtype=np.float32), |
| ct.TensorType(name="new_cache_last_channel", dtype=np.float32), |
| ct.TensorType(name="new_cache_last_time", dtype=np.float32), |
| ct.TensorType(name="new_cache_last_channel_len", dtype=np.int32), |
| ] |
|
|
| mlmodel = ct.convert( |
| traced_model, |
| inputs=inputs, |
| outputs=outputs, |
| compute_units=ct.ComputeUnit.CPU_ONLY, |
| minimum_deployment_target=ct.target.macOS14, |
| ) |
|
|
| save_path = output_path / "streaming_encoder.mlpackage" |
| mlmodel.save(str(save_path)) |
| print(f"Saved: {save_path}") |

    # Persist the runtime parameters a streaming client needs.
    metadata = {
        "model_id": args.model_id,
        "chunk_ms": args.chunk_ms,
        "mel_frames": chunk_size_in,
        "pre_cache_size": pre_cache_size,
        "valid_out_len": config["valid_out_len"],
        "samples_per_chunk": config["samples"],
        "hidden_dim": hidden_dim,
        "num_layers": num_layers,
        "cache_channel_size": cache_channel_size,
        "cache_time_size": cache_time_size,
    }

    metadata_path = output_path / "streaming_encoder_metadata.json"
    with open(metadata_path, "w") as f:
        json.dump(metadata, f, indent=2)
    print(f"Saved metadata: {metadata_path}")
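
    # Client-side sketch (illustrative, not executed): the zero states can be
    # rebuilt from this metadata alone; only mel_dim (128, fixed above) is
    # not stored in the JSON.
    #
    #     meta = json.loads(metadata_path.read_text())
    #     pre = np.zeros((1, 128, meta["pre_cache_size"]), dtype=np.float32)
    #     ch = np.zeros((meta["num_layers"], 1, meta["cache_channel_size"],
    #                    meta["hidden_dim"]), dtype=np.float32)
    #     tm = np.zeros((meta["num_layers"], 1, meta["hidden_dim"],
    #                    meta["cache_time_size"]), dtype=np.float32)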
| print(f"\n=== Export complete for {args.chunk_ms}ms ===") |
| print(f"Output: {output_path}") |
| print("\nNote: Decoder and Joint models are shared between chunk sizes.") |
| print("Copy decoder.mlmodelc, joint_decision.mlmodelc, and vocab.json from 160ms directory.") |
|
|
|
|
if __name__ == "__main__":
    main()