| from transformers import PreTrainedModel |
| from .MoEConfig import MoEConfig |
| from transformers import AutoModelForCausalLM |
| import torch |
| import numpy as np |
|
|
|
|
class MoeModel(PreTrainedModel):
    """Wrap several causal language models and route generation to one of them.

    Every checkpoint named in the configuration's ``model_list`` is loaded at
    construction time.  On each ``generate`` call the wrapper scores the
    prompt with every loaded model and delegates to the one with the lowest
    perplexity, unless a model has been pinned with ``fix_model``.
    """

    config_class = MoEConfig
    # When True, ``generate`` prints the selected model index and the
    # perplexity array used for the selection.
    verbose = True
    # When True, ``generate`` skips the perplexity-based selection and uses
    # whichever model is currently active.
    fix_mode = False

    def __init__(self, config):
        """Load every model listed in the configuration and activate the first.

        Args:
            config: Configuration object.  Its ``model_list`` attribute is
                used when present and non-empty; otherwise the class-level
                ``model_list`` of ``config_class`` is used.
        """
        super().__init__(config)
        self.model_list = []
        # Prefer the values carried by the config *instance* so that a
        # customised configuration is honoured; the class attribute is kept
        # as a fallback for configs that only define it at class level.
        model_names = (
            getattr(config, "model_list", None) or self.config_class.model_list
        )
        for model_name in model_names:
            self.append_model(model_name)

        self.set_model_id(0)

    def append_model(self, model_name):
        """Load ``model_name`` as a half-precision causal LM and register it.

        Args:
            model_name: Identifier passed to
                ``AutoModelForCausalLM.from_pretrained``.
        """
        print("loading ", model_name)
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            device_map="auto",
            torch_dtype=torch.float16,
        )
        self.model_list.append(model)

    def set_model_id(self, model_id):
        """Make the model at index ``model_id`` of ``model_list`` the active one.

        Raises:
            IndexError: If ``model_id`` is out of range for ``model_list``.
        """
        self.model = self.model_list[model_id]

    def calc_perplexity(self, tokenized_input):
        """Score ``tokenized_input`` with every loaded model.

        Returns:
            A NumPy array with one perplexity per entry of ``model_list``,
            in the same order.
        """
        return np.array(
            [perplexity(model, tokenized_input) for model in self.model_list]
        )

    def fix_model(self, model_id):
        """Pin generation to the model at ``model_id`` and disable routing."""
        self.set_model_id(model_id)
        self.fix_mode = True

    def set_flexible_mode(self):
        """Re-enable perplexity-based model selection in ``generate``."""
        self.fix_mode = False

    def generate(self, input_ids, attention_mask, **generate_kwargs):
        """Generate with the best-scoring (or pinned) model.

        Args:
            input_ids: Prompt token ids, forwarded to the selected model and
                also used to compute the per-model perplexities.
            attention_mask: Attention mask forwarded to the selected model.
            **generate_kwargs: Extra keyword arguments forwarded unchanged to
                the selected model's ``generate``.

        Returns:
            Whatever the selected model's ``generate`` returns.

        Raises:
            ValueError: If routing is enabled and no finite-or-infinite
                perplexity is available (empty model list or all NaN).
        """
        if not self.fix_mode:
            ppl_array = self.calc_perplexity(input_ids)
            # nanargmin returns the first index of the smallest value while
            # ignoring NaN scores, so one model producing NaN cannot break
            # or hijack the selection.
            best_model_id = int(np.nanargmin(ppl_array))
            self.set_model_id(best_model_id)

            # These names only exist on the routing path, so the report is
            # kept inside this branch.
            if self.verbose:
                print(f"model {best_model_id} will be used")
                print("ppl array: ", ppl_array)

        return self.model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            **generate_kwargs,
        )
|
|
|
|
# Module-level default device: CUDA when torch reports it available, CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
|
|
|
|
def perplexity(model, tokenized_input) -> float:
    """Return the perplexity that ``model`` assigns to ``tokenized_input``.

    The same token ids are used as both the model input and the labels, and
    the result is ``exp`` of the loss reported by the model.

    Args:
        model: Callable invoked as ``model(input_ids, labels=input_ids)``
            that returns an object exposing a scalar ``loss`` tensor.
        tokenized_input: Tensor of token ids
            (presumably shaped ``(batch, seq)`` -- confirm against callers).

    Returns:
        The perplexity as a plain Python float.
    """
    # Use the device the model itself reports, so the inputs land where the
    # model's weights are; fall back to the module-level default otherwise.
    target_device = getattr(model, "device", None)
    if target_device is None:
        target_device = device

    # Input ids and labels must live on the same device; reuse one tensor.
    input_ids = tokenized_input.to(target_device)
    with torch.inference_mode():
        output = model(input_ids, labels=input_ids)
        return torch.exp(output.loss).item()
|
|