| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| import math |
| from typing import Optional, Tuple |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from torch.nn.utils.rnn import pad_sequence |
| from torchaudio.compliance.kaldi import fbank as torch_fbank |
| from transformers import PreTrainedModel, RobertaConfig, RobertaModel, RobertaTokenizer |
|
|
| from .configuration_clsp import CLSPConfig |
| from .zipformer2 import Conv2dSubsampling, Zipformer2 |
|
|
|
|
class CLSPModel(PreTrainedModel):
    """``PreTrainedModel`` wrapper around the network built by ``get_model``.

    The wrapper holds no logic of its own: every call is forwarded to the
    wrapped network stored in ``self.model``.
    """

    config_class = CLSPConfig

    def __init__(self, config: CLSPConfig):
        """Build the wrapped network from ``config``."""
        super().__init__(config)
        self.model = get_model(config)

    def forward(self, *args, **kwargs):
        """Invoke the wrapped network with the given arguments unchanged."""
        wrapped = self.model
        return wrapped(*args, **kwargs)

    def load_audio(self, audio_path):
        """Forward ``audio_path`` to the wrapped network's ``load_audio``."""
        wrapped = self.model
        return wrapped.load_audio(audio_path)
|
|
|
|
class MLPLayers(nn.Module):
    """Stack of linear layers with a non-linearity and dropout in between.

    For ``units = [d0, d1, ..., dk]`` the stack is
    ``Linear(d0, d1) -> nonlin -> Dropout -> ... -> Linear(d(k-1), dk)``;
    the last linear layer is *not* followed by a non-linearity or dropout.
    With fewer than two entries in ``units`` the stack is empty and the
    module acts as the identity.
    """

    def __init__(self, units=(512, 512, 512), nonlin: Optional[nn.Module] = None, dropout=0.1):
        """
        Args:
          units:
            Sequence of layer widths; consecutive pairs define the
            ``nn.Linear`` layers.
          nonlin:
            Activation module inserted after every linear layer except the
            last one. Defaults to a fresh ``nn.ReLU()`` per instance.
          dropout:
            Dropout probability applied after each activation.
        """
        super().__init__()
        # Create the default activation here rather than in the signature so
        # that no module instance is shared between unrelated MLPLayers.
        self.nonlin = nn.ReLU() if nonlin is None else nonlin
        self.dropout = dropout

        layers = []
        for in_dim, out_dim in zip(units[:-1], units[1:]):
            layers.extend(
                (nn.Linear(in_dim, out_dim), self.nonlin, nn.Dropout(self.dropout))
            )
        # Drop the activation and dropout that trail the final linear layer.
        self.sequential = nn.Sequential(*layers[:-2])

    def forward(self, X):
        """Apply the layer stack to ``X`` and return the result."""
        return self.sequential(X)
|
|
|
|
class CLAP(nn.Module):
    """CLAP-style dual encoder mapping audio and text into a joint space.

    Audio goes through fbank extraction, ``encoder_embed``, ``encoder``, an
    optional ``encoder_downsample``, masked mean pooling and two projection
    stacks. Text is tokenized with the RoBERTa tokenizer, encoded with a
    RoBERTa model (its ``pooler_output`` is used) and projected likewise.
    Both embeddings are L2-normalized.
    """

    # Sample rate (Hz) that loaded audio is resampled to and that the fbank
    # computation is configured for.
    _SAMPLE_RATE = 16000

    def __init__(
        self,
        encoder_embed: nn.Module,
        encoder: nn.Module,
        encoder_downsample: Optional[nn.Module] = None,
        encoder_dim: int = 384,
        text_encoder_dim: int = 768,
        joint_dim: int = 512,
    ):
        """
        Args:
          encoder_embed:
            Front-end module called as ``encoder_embed(x, x_lens)`` on
            fbank features; it must return the transformed features and
            their lengths.
          encoder:
            Audio encoder called as ``encoder(x, x_lens, padding_mask)``
            with ``x`` laid out as (T, N, C); it must return the encoder
            output in the same layout together with the output lengths.
          encoder_downsample:
            Optional module applied to the (T, N, C) encoder output. When
            given, the output lengths are halved (rounded up).
          encoder_dim:
            Feature dimension of the audio encoder output.
          text_encoder_dim:
            Feature dimension of the text encoder output.
          joint_dim:
            Dimension of the shared audio/text embedding space.
        """
        super().__init__()

        # Audio branch.
        self.encoder_embed = encoder_embed
        self.encoder = encoder
        self.encoder_downsample = encoder_downsample
        self.audio_projection = nn.Sequential(
            nn.Linear(encoder_dim, joint_dim),
            nn.ReLU(),
            nn.Linear(joint_dim, joint_dim),
        )
        self.audio_transform = MLPLayers(
            units=[joint_dim, joint_dim, joint_dim], dropout=0.1
        )

        # Text branch. The RoBERTa model is built from the "roberta-base"
        # configuration only; no pretrained weights are loaded here.
        self.text_tokenizer = RobertaTokenizer.from_pretrained("roberta-base")
        self.text_encoder = RobertaModel(
            RobertaConfig.from_pretrained("roberta-base")
        )
        self.text_projection = nn.Sequential(
            nn.Linear(text_encoder_dim, joint_dim),
            nn.ReLU(),
            nn.Linear(joint_dim, joint_dim),
        )
        self.text_transform = MLPLayers(
            units=[joint_dim, joint_dim, joint_dim], dropout=0.1
        )

        # Learnable log-temperature, initialised to log(1 / 0.07).
        self.logit_scale = nn.Parameter(torch.full((), math.log(1 / 0.07)))

    def _load_audio_single(self, audio_path: str) -> Tuple[torch.Tensor, int]:
        """Load one audio file as a mono waveform at the model sample rate.

        Args:
          audio_path: Path of the audio file to read.
        Returns:
          The waveform of shape (1, num_samples) and ``num_samples``.
        """
        # The module only imports ``fbank`` from torchaudio at file level, so
        # the package name itself must be bound here before it can be used.
        import torchaudio

        waveform, sr = torchaudio.load(audio_path)
        if waveform.size(0) > 1:
            # Down-mix multi-channel audio by averaging the channels.
            waveform = waveform.mean(dim=0, keepdim=True)
        if sr != self._SAMPLE_RATE:
            resample = torchaudio.transforms.Resample(sr, self._SAMPLE_RATE)
            waveform = resample(waveform)
        return waveform, waveform.shape[-1]

    def load_audio(self, audio_paths: list[str]) -> Tuple[torch.Tensor, torch.Tensor]:
        """Load a batch of audio files and zero-pad them to a common length.

        Args:
          audio_paths: List of audio file paths.
        Returns:
          The padded waveforms of shape (N, max_num_samples) and a 1-D
          tensor with the unpadded length of each waveform.
        """
        assert isinstance(audio_paths, list), "Must receive a list of files for reading"
        waveforms = []
        waveform_lens = []
        for audio in audio_paths:
            wav, wav_len = self._load_audio_single(audio)
            # Remove only the channel axis; a bare ``squeeze()`` would also
            # collapse a one-sample waveform to a 0-d tensor.
            waveforms.append(wav.squeeze(0))
            waveform_lens.append(wav_len)

        waveforms = pad_sequence(waveforms, batch_first=True)
        waveform_lens = torch.tensor(waveform_lens)
        return waveforms, waveform_lens

    def compute_fbank(
        self, wavs: torch.Tensor, wav_lens: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Compute 128-bin Kaldi-style fbank features for a waveform batch.

        Args:
          wavs (torch.Tensor): the mono-channel input waveforms, (N, T)
          wav_lens (torch.Tensor): the length of each waveform in samples (N)
        Returns:
          The fbank features padded with ``log(1e-10)`` to a common number
          of frames, and the number of frames of each utterance.
        """
        assert wavs.ndim == 2, wavs.shape
        low_freq = 20.0
        high_freq = -400.0
        dither = 0.0
        snip_edges = False

        features = []
        for i, wav in enumerate(wavs):
            # Strip the padding so it does not contribute frames.
            feat = torch_fbank(
                wav[: wav_lens[i]].unsqueeze(0),
                sample_frequency=self._SAMPLE_RATE,
                num_mel_bins=128,
                low_freq=low_freq,
                snip_edges=snip_edges,
                high_freq=high_freq,
                dither=dither,
                energy_floor=1.0e-10,
            )
            features.append(feat)
        feat_len = torch.tensor(
            [f.shape[0] for f in features], device=wavs.device
        )
        features = pad_sequence(
            features, batch_first=True, padding_value=math.log(1e-10)
        ).to(wavs.device)
        return features, feat_len

    def forward_audio_encoder(
        self, x: torch.Tensor, x_lens: torch.Tensor, freeze_encoder: bool = False
    ) -> torch.Tensor:
        """Encode fbank features and mean-pool them over time.

        Args:
          x:
            A 3-D tensor of shape (N, T, C).
          x_lens:
            A 1-D tensor of shape (N,). It contains the number of frames in `x`
            before padding.
          freeze_encoder:
            If True, ``encoder_embed`` and ``encoder`` run with gradients
            disabled.

        Returns:
          A 2-D tensor with one embedding per utterance: the mean of the
          encoder output over the non-padded frames.
        """
        with torch.set_grad_enabled(not freeze_encoder):
            x, x_lens = self.encoder_embed(x, x_lens)
            src_key_padding_mask = make_pad_mask(x_lens)
            x = x.permute(1, 0, 2)  # (N, T, C) -> (T, N, C)
            encoder_out, encoder_out_lens = self.encoder(
                x, x_lens, src_key_padding_mask
            )
            encoder_out = encoder_out.permute(1, 0, 2)  # (T, N, C) -> (N, T, C)

        assert torch.all(encoder_out_lens > 0), (x_lens, encoder_out_lens)

        if self.encoder_downsample is not None:
            # The downsampling module works on the (T, N, C) layout.
            encoder_out = encoder_out.permute(1, 0, 2)
            encoder_out = self.encoder_downsample(encoder_out)
            encoder_out = encoder_out.permute(1, 0, 2)
            encoder_out_lens = (encoder_out_lens + 1) // 2

        # Zero the padded frames so they do not contribute to the mean.
        padding_mask = make_pad_mask(encoder_out_lens)
        encoder_out = encoder_out.masked_fill(padding_mask.unsqueeze(-1), 0.0)
        embedding = encoder_out.sum(dim=1) / encoder_out_lens.unsqueeze(-1)

        return embedding

    def forward_text_encoder(self, y: dict, freeze_encoder: bool = False):
        """Return the text encoder's ``pooler_output`` for tokenized text.

        Args:
          y: Mapping with the keys ``"input_ids"`` and ``"attention_mask"``.
          freeze_encoder: If True, the text encoder runs with gradients
            disabled.
        """
        with torch.set_grad_enabled(not freeze_encoder):
            encoder_out = self.text_encoder(
                input_ids=y["input_ids"],
                attention_mask=y["attention_mask"],
            )["pooler_output"]

        return encoder_out

    def forward(
        self,
        audio: Optional[torch.Tensor] = None,
        audio_lens: Optional[torch.Tensor] = None,
        text: Optional[dict] = None,
        freeze_audio_encoder: bool = False,
        freeze_text_encoder: bool = False,
    ) -> Tuple[Optional[torch.Tensor], Optional[torch.Tensor], torch.Tensor]:
        """Embed audio and/or text into the joint space.

        Args:
          audio (torch.Tensor): Input audio waveforms (N, L).
          audio_lens (torch.Tensor): The length of the audio waveforms (N).
          text: Raw text input handed to the tokenizer (the original
            documentation describes it as a list of N texts).
          freeze_audio_encoder: Disable gradients in the audio encoder.
          freeze_text_encoder: Disable gradients in the text encoder.
        Returns:
          A tuple ``(audio_embedding, text_embedding, logit_scale)``. Each
          embedding is L2-normalized, or ``None`` when the corresponding
          input was not given; ``logit_scale`` is ``exp`` of the learnable
          log-temperature.
        """
        audio_encoder_out = None
        text_encoder_out = None

        if audio is not None:
            assert audio.ndim == 2, audio.shape
            assert audio_lens.ndim == 1, audio_lens.shape
            x, x_lens = self.compute_fbank(audio, audio_lens)
            audio_encoder_out = self.forward_audio_encoder(
                x, x_lens, freeze_encoder=freeze_audio_encoder
            )
            audio_encoder_out = self.audio_projection(audio_encoder_out)
            audio_encoder_out = self.audio_transform(audio_encoder_out)
            audio_encoder_out = F.normalize(audio_encoder_out, dim=-1)

        if text is not None:
            text = self.text_tokenizer(
                text,
                padding=True,
                truncation=True,
                return_tensors="pt",
            )
            # The tokenizer returns CPU tensors; move them to the model device.
            device = next(self.parameters()).device
            text = {k: v.to(device=device) for k, v in text.items()}
            assert text["input_ids"].ndim == 2, text["input_ids"].shape
            text_encoder_out = self.forward_text_encoder(
                text, freeze_encoder=freeze_text_encoder
            )
            text_encoder_out = self.text_projection(text_encoder_out)
            text_encoder_out = self.text_transform(text_encoder_out)
            text_encoder_out = F.normalize(text_encoder_out, dim=-1)

        return audio_encoder_out, text_encoder_out, self.logit_scale.exp()
|
|
|
|
| def _to_int_tuple(s: str): |
| return tuple(map(int, s.split(","))) |
|
|
|
|
def make_pad_mask(
    lengths: torch.Tensor,
    max_len: int = 0,
    pad_left: bool = False,
) -> torch.Tensor:
    """Build a boolean padding mask from a batch of sequence lengths.

    Args:
      lengths:
        A 1-D tensor containing sentence lengths.
      max_len:
        The minimum width of the mask. The actual width is the larger of
        ``max_len`` and the longest entry in ``lengths``.
      pad_left:
        If ``False`` (default), padding is on the right.
        If ``True``, padding is on the left.
    Returns:
      Return a 2-D bool tensor, where masked positions
      are filled with `True` and non-masked positions are
      filled with `False`. An empty ``lengths`` yields a mask of shape
      ``(0, max_len)``.

    >>> lengths = torch.tensor([1, 3, 2, 5])
    >>> make_pad_mask(lengths)
    tensor([[False,  True,  True,  True,  True],
            [False, False, False,  True,  True],
            [False, False,  True,  True,  True],
            [False, False, False, False, False]])
    """
    assert lengths.ndim == 1, lengths.ndim
    # Work with a plain Python int so that the sizes passed to ``arange`` and
    # ``expand`` are never 0-d tensors, and so an empty batch does not crash
    # in ``lengths.max()``.
    longest = int(lengths.max()) if lengths.numel() > 0 else 0
    max_len = max(int(max_len), longest)
    n = lengths.size(0)
    seq_range = torch.arange(0, max_len, device=lengths.device)
    positions = seq_range.unsqueeze(0).expand(n, max_len)

    if pad_left:
        # The first (max_len - length) positions of each row are padding.
        return positions < (max_len - lengths).unsqueeze(1)
    # Positions at or beyond each row's length are padding.
    return positions >= lengths.unsqueeze(-1)
|
|
|
|
def get_encoder_embed(config: CLSPConfig) -> nn.Module:
    """Build the ``Conv2dSubsampling`` front-end described by ``config``.

    The input channel count is ``config.feature_dim``; the output channel
    count is the first entry of the comma-separated ``config.encoder_dim``.
    """
    first_encoder_dim = _to_int_tuple(config.encoder_dim)[0]
    return Conv2dSubsampling(
        in_channels=config.feature_dim,
        out_channels=first_encoder_dim,
    )
|
|
|
|
def get_encoder_model(config: CLSPConfig) -> nn.Module:
    """Build the ``Zipformer2`` audio encoder described by ``config``.

    Config fields listed in ``tuple_fields`` are comma-separated strings and
    are converted to integer tuples; the remaining fields are passed through
    unchanged.
    """
    tuple_fields = (
        "downsampling_factor",
        "num_encoder_layers",
        "encoder_dim",
        "encoder_unmasked_dim",
        "query_head_dim",
        "pos_head_dim",
        "value_head_dim",
        "num_heads",
        "feedforward_dim",
        "cnn_module_kernel",
        "chunk_size",
        "left_context_frames",
    )
    parsed = {name: _to_int_tuple(getattr(config, name)) for name in tuple_fields}
    return Zipformer2(
        output_downsampling_factor=config.output_downsampling_factor,
        pos_dim=config.pos_dim,
        causal=config.causal,
        **parsed,
    )
|
|
|
|
def get_model(config: CLSPConfig) -> nn.Module:
    """Assemble the full ``CLAP`` model described by ``config``.

    The audio projection input size is the largest entry of the
    comma-separated ``config.encoder_dim``.
    """
    # Keyword arguments are evaluated left to right, so the front-end is
    # still constructed before the encoder.
    return CLAP(
        encoder_embed=get_encoder_embed(config),
        encoder=get_encoder_model(config),
        encoder_dim=max(_to_int_tuple(config.encoder_dim)),
        text_encoder_dim=config.text_encoder_dim,
        joint_dim=config.joint_dim,
    )
|
|