| import torch |
| from torch import nn |
| import torchaudio |
| from transformers import PreTrainedModel, AutoModelForCausalLM, AutoTokenizer, HubertModel, AutoProcessor |
| from .config import SpeechLLMModelConfig |
| from peft import LoraConfig, get_peft_model |
|
|
| class HubertXCNNEnoder(nn.Module): |
| def __init__(self, audio_enc_dim, llm_dim): |
| super().__init__() |
| self.encoder = HubertModel.from_pretrained('facebook/hubert-xlarge-ll60k') |
| for param in self.encoder.parameters(): |
| param.requires_grad = False |
|
|
| self.cnn = nn.Sequential( |
| nn.ReLU(), |
| nn.Conv1d(audio_enc_dim, llm_dim // 2, kernel_size=5, stride=1, padding=0), |
| nn.ReLU(), |
| nn.Conv1d(llm_dim // 2, llm_dim, kernel_size=5, stride=2, padding=0), |
| nn.ReLU(), |
| nn.Conv1d(llm_dim, llm_dim, kernel_size=3, stride=1, padding=0), |
| ) |
|
|
| def forward(self, x): |
| x = self.encoder(x).last_hidden_state |
| x = self.cnn(x.transpose(1, 2)).transpose(1, 2) |
| return x |
|
|
| class SpeechLLMModel(PreTrainedModel): |
| config_class = SpeechLLMModelConfig |
|
|
| def __init__(self, config): |
| super().__init__(config) |
| self.audio_processor = AutoProcessor.from_pretrained("facebook/hubert-large-ls960-ft") |
| self.audio_encoder = HubertXCNNEnoder(config.audio_enc_dim, config.llm_dim) |
| self.llm_model = AutoModelForCausalLM.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0") |
| self.llm_tokenizer = AutoTokenizer.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0") |
|
|
| peft_config = LoraConfig( |
| r=4, |
| lora_alpha=8, |
| target_modules=['q_proj', 'k_proj', 'v_proj', 'o_proj', 'up_proj', 'down_proj', 'gate_proj'], |
| lora_dropout=0.05, |
| task_type="CAUSAL_LM", |
| ) |
| self.llm_model = get_peft_model(self.llm_model, peft_config) |
|
|
| def encode(self, mel, pre_tokenized_ids, post_tokenized_ids, output_tokenized_ids): |
| batch_size = mel.shape[0] |
|
|
| with torch.no_grad(): |
| speech_embeds = self.audio_encoder(mel) |
| embedder = self.llm_model.model.model.embed_tokens |
| pre_prompt_embeds = embedder(pre_tokenized_ids) |
| post_prompt_embeds = embedder(post_tokenized_ids) |
| output_prompt_embeds = embedder(output_tokenized_ids) |
|
|
| combined_embeds = torch.cat([pre_prompt_embeds, speech_embeds, post_prompt_embeds, output_prompt_embeds], dim=1) |
| atts = torch.ones(combined_embeds.size()[:-1], dtype=torch.long).to(combined_embeds.device) |
|
|
| input_token_length = pre_tokenized_ids.shape[1] + speech_embeds.shape[1] + post_tokenized_ids.shape[1] |
| label_ids = torch.cat([ |
| torch.ones([batch_size, input_token_length], device=combined_embeds.device) * -100, |
| output_tokenized_ids |
| ], 1).to(combined_embeds.device).to(torch.int64) |
| return combined_embeds, atts, label_ids |
|
|
| def forward(self, wav_tensor, pre_tokenized_ids, post_tokenized_ids, output_tokenized_ids, attention_mask=None): |
| combined_embeds, atts, label_ids = self.encode(wav_tensor, pre_tokenized_ids, post_tokenized_ids, output_tokenized_ids) |
| outputs = self.llm_model(inputs_embeds=combined_embeds, attention_mask=attention_mask) |
| return outputs |
| |
| def generate_meta(self, audio_path, instruction="Give me the following information about the audio [Transcript]", max_new_tokens=2000): |
| pre_speech_prompt = f'''Instruction: |
| {instruction} |
| |
| Input: |
| <speech>''' |
| post_speech_prompt = f'''</speech> |
| |
| Output:''' |
| output_prompt = '\n<s>' |
|
|
| with torch.no_grad(): |
| wav_tensor, sr = torchaudio.load(audio_path) |
| wav_tensor = self.audio_processor(wav_tensor.squeeze(), return_tensors="pt", sampling_rate=16000).input_values |
|
|
| pre_tokenized_ids = self.llm_tokenizer(pre_speech_prompt, padding="do_not_pad", return_tensors='pt', truncation=False, add_special_tokens=False)["input_ids"] |
| post_tokenized_ids = self.llm_tokenizer(post_speech_prompt, padding="do_not_pad", return_tensors='pt', truncation=False, add_special_tokens=False)["input_ids"] |
| output_tokenized_ids = self.llm_tokenizer(output_prompt, padding="do_not_pad", return_tensors='pt', truncation=False, add_special_tokens=False)["input_ids"] |
|
|
| combined_embeds, atts, label_ids = self.encode(wav_tensor, pre_tokenized_ids, post_tokenized_ids, output_tokenized_ids) |
|
|
| out = self.llm_model.generate( |
| inputs_embeds=combined_embeds, |
| max_new_tokens=max_new_tokens, |
| ).cpu().tolist()[0] |
|
|
| output_text = self.llm_tokenizer.decode(out, skip_special_tokens=False) |
| return output_text |
|
|
|
|