| import math |
| from typing import List, Optional |
|
|
| import timm |
| import torch |
| import torchvision |
| from timm.data import IMAGENET_INCEPTION_MEAN, IMAGENET_INCEPTION_STD |
| from torchvision import transforms |
| from transformers import LlamaTokenizer |
|
|
| from .configuration_minicpm import MiniCPMVConfig |
| from .modeling_minicpm import MiniCPMPreTrainedModel, MiniCPMForCausalLM |
| from .resampler import Resampler |
|
|
|
|
| class MiniCPMVPreTrainedModel(MiniCPMPreTrainedModel): |
| config_class = MiniCPMVConfig |
|
|
|
|
| class MiniCPMV(MiniCPMVPreTrainedModel): |
| def __init__(self, config): |
| super().__init__(config) |
|
|
| self.llm = MiniCPMForCausalLM(config) |
| self.vpm = self.init_vision_module() |
| self.vision_dim = self.vpm.embed_dim |
| self.embed_dim = self.llm.config.hidden_size |
| self.resampler = self.init_resampler(self.embed_dim ,self.vision_dim) |
| self.transform = self.init_transform() |
|
|
|
|
| def init_vision_module(self): |
| model = timm.create_model( |
| self.config.vision_encoder, |
| pretrained=False, |
| num_classes=0, |
| dynamic_img_size=True, |
| dynamic_img_pad=True |
| ) |
|
|
| if isinstance(model, timm.models.VisionTransformer): |
| if model.attn_pool is not None: |
| model.attn_pool = torch.nn.Identity() |
|
|
| if self.config.drop_vision_last_layer: |
| model.blocks = model.blocks[:-1] |
|
|
| return model |
|
|
| def init_resampler(self, embed_dim, vision_dim): |
| return Resampler( |
| grid_size=int(math.sqrt(self.config.query_num)), |
| embed_dim=embed_dim, |
| num_heads=embed_dim // 128, |
| kv_dim=vision_dim, |
| ) |
|
|
| def init_transform(self): |
| return transforms.Compose([ |
| transforms.Resize( |
| (self.config.image_size, self.config.image_size), |
| interpolation=torchvision.transforms.InterpolationMode.BICUBIC |
| ), |
| transforms.ToTensor(), |
| transforms.Normalize(mean=IMAGENET_INCEPTION_MEAN, std=IMAGENET_INCEPTION_STD) |
| ]) |
|
|
|
|
|
|
| def get_vision_embedding(self, pixel_values): |
| res = [] |
| dtype = self.vpm.pos_embed.data.dtype |
| for pixel_value in pixel_values: |
| vision_embedding = self.vpm.forward_features(pixel_value.unsqueeze(0).type(dtype)) |
| if hasattr(self.vpm, 'num_prefix_tokens') and self.vpm.num_prefix_tokens > 0: |
| vision_embedding = vision_embedding[:, self.vpm.num_prefix_tokens:] |
| res.append(self.resampler(vision_embedding)) |
| return torch.vstack(res) |
|
|
| def get_vllm_embedding(self, data): |
| if 'vision_hidden_states' not in data: |
| pixel_values_list = data['pixel_values'] |
| vision_hidden_states = [] |
| for pixel_values in pixel_values_list: |
| if len(pixel_values) > 0: |
| vision_hidden_states.append(self.get_vision_embedding(pixel_values)) |
| elif self.training: |
| dtype = self.vpm.pos_embed.data.dtype |
| device = self.vpm.pos_embed.data.device |
| dummy_image = torch.zeros( |
| (1, 3, 224, 224), |
| device=device, dtype=dtype |
| ) |
| vision_hidden_states.append(self.get_vision_embedding(dummy_image)) |
| else: |
| vision_hidden_states.append([]) |
|
|
| else: |
| vision_hidden_states = data['vision_hidden_states'] |
|
|
| vllm_embedding = self.llm.model.embed_tokens(data['input_ids']) * self.llm.config.scale_emb |
| vision_hidden_states = [i.type(vllm_embedding.dtype) if isinstance( |
| i, torch.Tensor) else i for i in vision_hidden_states] |
|
|
| bs = len(data['input_ids']) |
| for i in range(bs): |
| cur_vs_hs = vision_hidden_states[i] |
| if len(cur_vs_hs) > 0: |
| cur_vllm_emb = vllm_embedding[i] |
| cur_image_bound = data['image_bound'][i] |
| if len(cur_image_bound) > 0: |
| image_indices = torch.stack( |
| [torch.arange(r[0], r[1], dtype=torch.long) for r in cur_image_bound] |
| ).to(vllm_embedding.device) |
|
|
| cur_vllm_emb.scatter_(0, image_indices.view(-1, 1).repeat(1, cur_vllm_emb.shape[-1]), |
| cur_vs_hs.view(-1, cur_vs_hs.shape[-1])) |
| elif self.training: |
| cur_vllm_emb += cur_vs_hs[0].mean() * 0 |
|
|
| return vllm_embedding, vision_hidden_states |
|
|
| def forward(self, data, **kwargs): |
| vllm_embedding, vision_hidden_states = self.get_vllm_embedding(data) |
| position_ids = data["position_ids"] |
| if position_ids.dtype != torch.int64: |
| position_ids = position_ids.long() |
|
|
| return self.llm( |
| input_ids=None, |
| position_ids=position_ids, |
| inputs_embeds=vllm_embedding, |
| **kwargs |
| ) |
|
|
|
|
| def _convert_to_tensors(self, tokenizer, input_str, max_inp_length: Optional[int] = None): |
| if tokenizer.add_bos_token: |
| input_ids = tokenizer.encode(input_str) |
| else: |
| input_ids = [tokenizer.bos_id] + tokenizer.encode(input_str) |
| if max_inp_length is not None: |
| input_ids = input_ids[: max_inp_length] |
| input_ids = torch.tensor(input_ids, dtype=torch.int32) |
|
|
| image_start_tokens = torch.where(input_ids == tokenizer.im_start_id)[0] |
| |
| image_start_tokens += 1 |
| image_end_tokens = torch.where(input_ids == tokenizer.im_end_id)[0] |
| valid_image_nums = max(len(image_start_tokens), len(image_end_tokens)) |
| image_bound = torch.hstack( |
| [image_start_tokens[: valid_image_nums].unsqueeze(-1), |
| image_end_tokens[:valid_image_nums].unsqueeze(-1)] |
| ) |
|
|
| model_input = {} |
| model_input["input_ids"] = input_ids.unsqueeze(0).to(self.device) |
| model_input["image_bound"] = image_bound |
|
|
| return model_input |
|
|
|
|
| def _process_list(self, tokenizer, data_list: List[str], max_inp_length: Optional[int] = None): |
| pad_keys = ['input_ids'] |
| input_tensors = [] |
| for data in data_list: |
| input_tensors.append(self._convert_to_tensors(tokenizer, data, max_inp_length)) |
| padded = {} |
| for key in pad_keys: |
| padded[key] = pad(input_tensors, key, padding_side="left").to(self.device) |
| padded['image_bound'] = [i['image_bound'] for i in input_tensors] |
| return padded |
|
|
| def _decode(self, inputs_embeds, tokenizer, **kwargs): |
| output = self.llm.generate( |
| inputs_embeds=inputs_embeds, |
| pad_token_id=0, |
| eos_token_id=tokenizer.eos_token_id, |
| **kwargs |
| ) |
| return self._decode_text(output, tokenizer) |
|
|
| def _decode_text(self, result_ids, tokenizer): |
| result_text = [] |
| for result in result_ids: |
| result = result[result != 0] |
| if result[0] == tokenizer.bos_id: |
| result = result[1:] |
| if result[-1] == tokenizer.eos_id: |
| result = result[:-1] |
| result_text.append(tokenizer.decode(result).strip()) |
| return result_text |
|
|
| def generate( |
| self, |
| data_list=None, |
| img_list=None, |
| tokenizer=None, |
| max_inp_length: Optional[int] = None, |
| vision_hidden_states=None, |
| return_vision_hidden_states=False, |
| **kwargs |
| ): |
|
|
| assert data_list is not None |
| bs = len(data_list) |
| if img_list == None: |
| img_list = [[] for i in range(bs)] |
| assert bs == len(img_list) |
|
|
| model_inputs = self._process_list(tokenizer, data_list, max_inp_length) |
|
|
| if vision_hidden_states is None: |
| pixel_values = [] |
| for i in range(bs): |
| img_inps = [] |
| for img in img_list[i]: |
| img_inps.append(self.transform(img)) |
| if img_inps: |
| pixel_values.append(torch.stack(img_inps).to(self.device)) |
| else: |
| pixel_values.append([]) |
| model_inputs['pixel_values'] = pixel_values |
| else: |
| model_inputs['vision_hidden_states'] = vision_hidden_states |
|
|
| with torch.inference_mode(): |
| model_inputs['inputs_embeds'], vision_hidden_states = self.get_vllm_embedding(model_inputs) |
|
|
| result = self._decode(model_inputs['inputs_embeds'], tokenizer, **kwargs) |
|
|
| if return_vision_hidden_states: |
| return result, vision_hidden_states |
|
|
| return result |
|
|
|
|
| def chat(self, image, msgs, context, tokenizer, vision_hidden_states=None, max_new_tokens=2048, sampling=False, **kwargs): |
| if isinstance(msgs, str): |
| msgs = json.loads(msgs) |
| |
| prompt = '' |
| for i, msg in enumerate(msgs): |
| role = msg['role'] |
| content = msg['content'] |
| assert role in ['user', 'assistant'] |
| if i == 0: |
| assert role == 'user', 'The role of first msg should be user' |
| content = tokenizer.im_start + tokenizer.unk_token * self.config.query_num + tokenizer.im_end + '\n' + content |
| prompt += '<用户>' if role=='user' else '<AI>' |
| prompt += content |
| prompt += '<AI>' |
| final_input = prompt |
|
|
| if sampling: |
| generation_config = { |
| 'top_p': 0.8, |
| 'top_k': 100, |
| 'temperature':0.6, |
| 'do_sample': True |
| } |
| else: |
| generation_config = { |
| 'num_beams': 3, |
| 'repetition_penalty': 1.2, |
| } |
|
|
| generation_config.update((k, kwargs[k]) for k in generation_config.keys() & kwargs.keys()) |
|
|
| with torch.inference_mode(): |
| res, vision_hidden_states = self.generate( |
| data_list=[final_input], |
| max_inp_length=2048, |
| img_list=[[image]], |
| tokenizer=tokenizer, |
| max_new_tokens=max_new_tokens, |
| vision_hidden_states=vision_hidden_states, |
| return_vision_hidden_states=True, |
| **generation_config |
| ) |
| answer = res[0] |
| context = msgs |
| context.append({'role':'assistant', 'content': answer}) |
|
|
| return answer, context, generation_config |
|
|
|
|
| class LlamaTokenizerWrapper(LlamaTokenizer): |
| def __init__(self, **kwargs): |
| super().__init__(**kwargs) |
| self.im_start = "<image>" |
| self.im_end = "</image>" |
| self.ref_start = "<ref>" |
| self.ref_end = "</ref>" |
| self.box_start = "<box>" |
| self.box_end = "</box>" |
| self.quad_start = "<quad>" |
| self.quad_end = "</quad>" |
|
|
| @property |
| def eos_id(self): |
| return self.sp_model.eos_id() |
|
|
| @property |
| def bos_id(self): |
| return self.sp_model.bos_id() |
|
|
| @property |
| def unk_id(self): |
| return self.sp_model.unk_id() |
|
|
| @property |
| def im_start_id(self): |
| return self._convert_token_to_id(self.im_start) |
|
|
| @property |
| def im_end_id(self): |
| return self._convert_token_to_id(self.im_end) |
|
|
|
|
| def pad(orig_items, key, max_length=None, padding_value=0, padding_side="left"): |
| items = [] |
| if isinstance(orig_items[0][key], list): |
| assert isinstance(orig_items[0][key][0], torch.Tensor) |
| for it in orig_items: |
| for tr in it[key]: |
| items.append({key: tr}) |
| else: |
| assert isinstance(orig_items[0][key], torch.Tensor) |
| items = orig_items |
|
|
| batch_size = len(items) |
| shape = items[0][key].shape |
| dim = len(shape) |
| assert dim <= 3 |
| if max_length is None: |
| max_length = 0 |
| max_length = max(max_length, max(item[key].shape[-1] for item in items)) |
| min_length = min(item[key].shape[-1] for item in items) |
| dtype = items[0][key].dtype |
|
|
| if dim == 1: |
| return torch.cat([item[key] for item in items], dim=0) |
| elif dim == 2: |
| if max_length == min_length: |
| return torch.cat([item[key] for item in items], dim=0) |
| tensor = torch.zeros((batch_size, max_length), dtype=dtype) + padding_value |
| else: |
| tensor = torch.zeros((batch_size, max_length, shape[-1]), dtype=dtype) + padding_value |
|
|
| for i, item in enumerate(items): |
| if dim == 2: |
| if padding_side == "left": |
| tensor[i, -len(item[key][0]):] = item[key][0].clone() |
| else: |
| tensor[i, : len(item[key][0])] = item[key][0].clone() |
| elif dim == 3: |
| if padding_side == "left": |
| tensor[i, -len(item[key][0]):, :] = item[key][0].clone() |
| else: |
| tensor[i, : len(item[key][0]), :] = item[key][0].clone() |
|
|
| return tensor |
|
|