| import torch.nn as nn |
| from transformers import BertPreTrainedModel, BertModel, AutoTokenizer |
| from huggingface_hub import hf_hub_download |
| import torch |
| from tqdm import tqdm |
| from .colbert_configuration import ColBERTConfig |
| from .tokenization_utils import QueryTokenizer, DocTokenizer |
| import os |
|
|
|
|
| class NullContextManager(object): |
| def __init__(self, dummy_resource=None): |
| self.dummy_resource = dummy_resource |
| def __enter__(self): |
| return self.dummy_resource |
| def __exit__(self, *args): |
| pass |
|
|
| class MixedPrecisionManager(): |
| def __init__(self, activated): |
| self.activated = activated |
|
|
| if self.activated: |
| self.scaler = torch.amp.GradScaler("cuda") |
|
|
| def context(self): |
| return torch.amp.autocast("cuda") if self.activated else NullContextManager() |
|
|
| def backward(self, loss): |
| if self.activated: |
| self.scaler.scale(loss).backward() |
| else: |
| loss.backward() |
|
|
| def step(self, colbert, optimizer, scheduler=None): |
| if self.activated: |
| self.scaler.unscale_(optimizer) |
| torch.nn.utils.clip_grad_norm_(colbert.parameters(), 2.0, error_if_nonfinite=False) |
|
|
| self.scaler.step(optimizer) |
| self.scaler.update() |
| else: |
| torch.nn.utils.clip_grad_norm_(colbert.parameters(), 2.0) |
| optimizer.step() |
| |
| if scheduler is not None: |
| scheduler.step() |
|
|
| optimizer.zero_grad() |
|
|
| class ConstBERT(BertPreTrainedModel): |
| """ |
| Shallow wrapper around HuggingFace transformers. All new parameters should be defined at this level. |
| |
| This makes sure `{from,save}_pretrained` and `init_weights` are applied to new parameters correctly. |
| """ |
| _keys_to_ignore_on_load_unexpected = [r"cls"] |
|
|
| def __init__(self, config, colbert_config, verbose:int = 0): |
| super().__init__(config) |
|
|
| self.config = config |
| self.dim = colbert_config.dim |
| self.linear = nn.Linear(config.hidden_size, colbert_config.dim, bias=False) |
| self.doc_project = nn.Linear(colbert_config.doc_maxlen, 32, bias=False) |
| self.query_project = nn.Linear(colbert_config.query_maxlen, 64, bias=False) |
|
|
| |
| if not os.path.exists(os.path.join(colbert_config.name_or_path, "tokenizer.json")): |
| hf_hub_download(repo_id=colbert_config.name_or_path, filename="tokenizer.json") |
| if not os.path.exists(os.path.join(colbert_config.name_or_path, "vocab.txt")): |
| hf_hub_download(repo_id=colbert_config.name_or_path, filename="vocab.txt") |
| if not os.path.exists(os.path.join(colbert_config.name_or_path, "tokenizer_config.json")): |
| hf_hub_download(repo_id=colbert_config.name_or_path, filename="tokenizer_config.json") |
| if not os.path.exists(os.path.join(colbert_config.name_or_path, "special_tokens_map.json")): |
| hf_hub_download(repo_id=colbert_config.name_or_path, filename="special_tokens_map.json") |
|
|
| self.query_tokenizer = QueryTokenizer(colbert_config, verbose=verbose) |
| self.doc_tokenizer = DocTokenizer(colbert_config) |
| self.amp_manager = MixedPrecisionManager(True) |
|
|
| self.raw_tokenizer = AutoTokenizer.from_pretrained(colbert_config.checkpoint) |
| self.pad_token = self.raw_tokenizer.pad_token_id |
| self.use_gpu = colbert_config.total_visible_gpus > 0 |
|
|
| setattr(self,self.base_model_prefix, BertModel(config)) |
|
|
| |
| |
|
|
| self.init_weights() |
|
|
| |
| |
| |
|
|
| @property |
| def LM(self): |
| base_model_prefix = getattr(self, "base_model_prefix") |
| return getattr(self, base_model_prefix) |
|
|
|
|
| @classmethod |
| def from_pretrained(cls, name_or_path, config=None, *args, **kwargs): |
| colbert_config = ColBERTConfig(name_or_path) |
| colbert_config = ColBERTConfig.from_existing(ColBERTConfig.load_from_checkpoint(name_or_path), colbert_config) |
| obj = super().from_pretrained(name_or_path, colbert_config=colbert_config, config=config) |
| obj.base = name_or_path |
|
|
| return obj |
|
|
| @staticmethod |
| def raw_tokenizer_from_pretrained(name_or_path): |
| obj = AutoTokenizer.from_pretrained(name_or_path) |
| obj.base = name_or_path |
|
|
| return obj |
| |
|
|
| def _query(self, input_ids, attention_mask): |
| input_ids, attention_mask = input_ids.to(self.device), attention_mask.to(self.device) |
| Q = self.bert(input_ids, attention_mask=attention_mask)[0] |
| |
| |
| |
| Q = self.linear(Q) |
| |
|
|
| mask = torch.tensor(self.mask(input_ids, skiplist=[]), device=self.device).unsqueeze(2).float() |
| Q = Q * mask |
|
|
| return torch.nn.functional.normalize(Q, p=2, dim=2) |
|
|
| def _doc(self, input_ids, attention_mask, keep_dims=True): |
| assert keep_dims in [True, False, 'return_mask'] |
|
|
| input_ids, attention_mask = input_ids.to(self.device), attention_mask.to(self.device) |
| D = self.bert(input_ids, attention_mask=attention_mask)[0] |
| D = D.permute(0, 2, 1) |
| D = self.doc_project(D) |
| D = D.permute(0, 2, 1) |
| D = self.linear(D) |
| mask = torch.ones(D.shape[0], D.shape[1], device=self.device).unsqueeze(2).float() |
|
|
| |
| D = D * mask |
| D = torch.nn.functional.normalize(D, p=2, dim=2) |
| if self.use_gpu: |
| D = D.half() |
|
|
| if keep_dims is False: |
| D, mask = D.cpu(), mask.bool().cpu().squeeze(-1) |
| D = [d[mask[idx]] for idx, d in enumerate(D)] |
|
|
| elif keep_dims == 'return_mask': |
| return D, mask.bool() |
|
|
| return D |
|
|
| def mask(self, input_ids, skiplist): |
| mask = [[(x not in skiplist) and (x != self.pad_token) for x in d] for d in input_ids.cpu().tolist()] |
| return mask |
|
|
| def query(self, *args, to_cpu=False, **kw_args): |
| with torch.no_grad(): |
| with self.amp_manager.context(): |
| Q = self._query(*args, **kw_args) |
| return Q.cpu() if to_cpu else Q |
|
|
| def doc(self, *args, to_cpu=False, **kw_args): |
| with torch.no_grad(): |
| with self.amp_manager.context(): |
| D = self._doc(*args, **kw_args) |
|
|
| if to_cpu: |
| return (D[0].cpu(), *D[1:]) if isinstance(D, tuple) else D.cpu() |
|
|
| return D |
|
|
| def encode_queries(self, queries, bsize=None, to_cpu=False, context=None, full_length_search=False): |
| if type(queries) == str: |
| queries = [queries] |
| if bsize: |
| batches = self.query_tokenizer.tensorize(queries, context=context, bsize=bsize, full_length_search=full_length_search) |
| batches = [self.query(input_ids, attention_mask, to_cpu=to_cpu) for input_ids, attention_mask in batches] |
| return torch.cat(batches) |
|
|
| input_ids, attention_mask = self.query_tokenizer.tensorize(queries, context=context, full_length_search=full_length_search) |
| return self.query(input_ids, attention_mask) |
|
|
| def encode_documents(self, docs, bsize=None, keep_dims=True, to_cpu=False, showprogress=False, return_tokens=False): |
| if type(docs) == str: |
| docs = [docs] |
| assert keep_dims in [True, False, 'flatten'] |
|
|
| if bsize: |
| text_batches, reverse_indices = self.doc_tokenizer.tensorize(docs, bsize=bsize) |
|
|
| returned_text = [] |
| if return_tokens: |
| returned_text = [text for batch in text_batches for text in batch[0]] |
| returned_text = [returned_text[idx] for idx in reverse_indices.tolist()] |
| returned_text = [returned_text] |
|
|
| keep_dims_ = 'return_mask' if keep_dims == 'flatten' else keep_dims |
| batches = [self.doc(input_ids, attention_mask, keep_dims=keep_dims_, to_cpu=to_cpu) |
| for input_ids, attention_mask in tqdm(text_batches, disable=not showprogress)] |
|
|
| if keep_dims is True: |
| D = _stack_3D_tensors(batches) |
| return (D[reverse_indices], *returned_text) |
|
|
| elif keep_dims == 'flatten': |
| D, mask = [], [] |
|
|
| for D_, mask_ in batches: |
| D.append(D_) |
| mask.append(mask_) |
|
|
| D, mask = torch.cat(D)[reverse_indices], torch.cat(mask)[reverse_indices] |
|
|
| doclens = mask.squeeze(-1).sum(-1).tolist() |
|
|
| D = D.view(-1, self.colbert_config.dim) |
| D = D[mask.bool().flatten()].cpu() |
|
|
| return (D, doclens, *returned_text) |
|
|
| assert keep_dims is False |
|
|
| D = [d for batch in batches for d in batch] |
| return ([D[idx] for idx in reverse_indices.tolist()], *returned_text) |
|
|
| input_ids, attention_mask = self.doc_tokenizer.tensorize(docs) |
| return self.doc(input_ids, attention_mask, keep_dims=keep_dims, to_cpu=to_cpu) |
|
|
| def _stack_3D_tensors(groups): |
| bsize = sum([x.size(0) for x in groups]) |
| maxlen = max([x.size(1) for x in groups]) |
| hdim = groups[0].size(2) |
|
|
| output = torch.zeros(bsize, maxlen, hdim, device=groups[0].device, dtype=groups[0].dtype) |
|
|
| offset = 0 |
| for x in groups: |
| endpos = offset + x.size(0) |
| output[offset:endpos, :x.size(1)] = x |
| offset = endpos |
|
|
| return output |