| from .huggingface_utils import get_auth_token |
| from .onnx_models_structure import ( |
| T5Encoder, |
| DecoderWithLMhead, |
| DecoderWithLMheadInitial, |
| ) |
| from transformers import ( |
| AutoConfig, |
| T5ForConditionalGeneration, |
| MT5ForConditionalGeneration, |
| ) |
| import torch |
| import functools |
| import operator |
| from progress.bar import Bar |
| from pathlib import Path |
| import os |
|
|
| _folder = Path.cwd() |
| saved_models_path = _folder.joinpath("models") |
|
|
| Bar.check_tty = False |
|
|
|
|
| def create_t5_encoder_decoder(pretrained_version="t5-base"): |
| """Generates an encoder and a decoder model with a language model head from a pretrained huggingface model |
| |
| Args: |
| pretrained_version (str): Name of a pretrained model, or path to a pretrained / finetuned version of T5 |
| |
| Returns: |
| simplified_encoder: pytorch t5 encoder with a wrapper to output only the hidden states |
| decoder_with_lm_head: pytorch t5 decoder with a language modeling head |
| """ |
|
|
| if 'mt5' in pretrained_version: |
| model = MT5ForConditionalGeneration.from_pretrained(pretrained_version, use_auth_token=get_auth_token()) |
| else: |
| model = T5ForConditionalGeneration.from_pretrained(pretrained_version, use_auth_token=get_auth_token()) |
|
|
| return turn_model_into_encoder_decoder(model) |
|
|
|
|
| def turn_model_into_encoder_decoder(model): |
| encoder = model.encoder |
| decoder = model.decoder |
| lm_head = model.lm_head |
|
|
| decoder_with_lm_head = DecoderWithLMhead(decoder, lm_head, model.config) |
| simplified_encoder = T5Encoder(encoder) |
| decoder_with_lm_head_init = DecoderWithLMheadInitial(decoder, lm_head, model.config) |
|
|
| return simplified_encoder, decoder_with_lm_head, decoder_with_lm_head_init |
|
|
|
|
| def generate_onnx_representation( |
| pretrained_version=None, |
| model=None, |
| output_path=None, |
| input_sequence_length=256, |
| onnx_opset_version=12, |
| ): |
| """Exports a given huggingface pretrained model, or a given model and tokenizer, to onnx |
| |
| Args: |
| pretrained_version (str): Name of a pretrained model, or path to a pretrained / finetuned version of T5 |
| output_path (Optional[str]): if missing then use ./models |
| input_sequence_length (Optional[int]): typical input sequence length, for use by the ORT for possible optimization |
| onnx_opset_version (Optional[int]): ONNX Operator Set Version, default 12 is the only tested version |
| """ |
| if (pretrained_version is None) and model is None: |
| print( |
| "You need to specify pretrained_version (the pretrained model you wish to export). Alternatively you can export a model you have in memory." |
| ) |
| return |
|
|
| if model is not None: |
| ( |
| simplified_encoder, |
| decoder_with_lm_head, |
| decoder_with_lm_head_init, |
| ) = turn_model_into_encoder_decoder(model) |
| else: |
| ( |
| simplified_encoder, |
| decoder_with_lm_head, |
| decoder_with_lm_head_init, |
| ) = create_t5_encoder_decoder(pretrained_version) |
|
|
| |
| output_path = saved_models_path if output_path is None else Path(output_path) |
| encoder_path, decoder_path, init_decoder_path = get_model_paths( |
| pretrained_version, output_path, quantized=False |
| ) |
|
|
| model_config = AutoConfig.from_pretrained(pretrained_version, use_auth_token=get_auth_token()) |
|
|
| |
| |
| batch_size = 1 |
| enc_seq_length = input_sequence_length |
| dec_seq_length = 1 |
| input_ids = torch.ones(batch_size, enc_seq_length, dtype=torch.int64) |
| attention_mask = torch.ones(batch_size, enc_seq_length, dtype=torch.int64) |
|
|
| n_heads = model_config.num_heads |
| d_kv = model_config.d_kv |
|
|
| input_ids_dec = torch.ones(batch_size, dec_seq_length, dtype=torch.int64) |
| attention_mask_dec = torch.ones(batch_size, dec_seq_length, dtype=torch.int64) |
| enc_out = torch.ones( |
| (batch_size, enc_seq_length, model_config.d_model), dtype=torch.float32 |
| ) |
|
|
| |
| |
| |
| |
|
|
| sa = torch.ones( |
| (batch_size, n_heads, dec_seq_length, d_kv), dtype=torch.float32 |
| ) |
| ca = torch.ones( |
| (batch_size, n_heads, enc_seq_length, d_kv), dtype=torch.float32 |
| ) |
| t5_block = (sa, sa, ca, ca) |
| past_key_values = (t5_block,) * model_config.num_decoder_layers |
|
|
| flat_past_key_values = functools.reduce(operator.iconcat, past_key_values, []) |
|
|
| decoder_all_inputs = tuple( |
| [input_ids_dec, attention_mask_dec, enc_out] + flat_past_key_values |
| ) |
|
|
| |
| bar = Bar("Exporting to onnx...", max=3) |
|
|
| import warnings |
|
|
| |
| warnings.filterwarnings("ignore") |
|
|
| |
| with torch.no_grad(): |
|
|
| decoder_inputs = [ |
| "input_ids", |
| "encoder_attention_mask", |
| "encoder_hidden_states", |
| ] |
|
|
| pkv_input_names = ["pkv_{}".format(i) for i in range(len(flat_past_key_values))] |
|
|
| decoder_input_names = decoder_inputs + pkv_input_names |
|
|
| decoder_output_names = ["logits", "output_past_key_values"] |
|
|
| dyn_axis_general = {0: "batch", 1: "sequence"} |
| dyn_axis_pkv = {0: "batch", 2: "seq_length"} |
|
|
| dyn_axis = { |
| "input_ids": dyn_axis_general, |
| "encoder_attention_mask": dyn_axis_general, |
| "encoder_hidden_states": dyn_axis_general, |
| "logits": dyn_axis_general, |
| "output_past_key_values": dyn_axis_general, |
| } |
|
|
| dyn_pkv = { |
| "pkv_{}".format(i): dyn_axis_pkv |
| for i in range(len(flat_past_key_values)) |
| } |
|
|
| dyn_axis_params = {**dyn_axis, **dyn_pkv} |
|
|
| |
| torch.onnx.export( |
| decoder_with_lm_head, |
| decoder_all_inputs, |
| decoder_path.as_posix(), |
| export_params=True, |
| do_constant_folding=True, |
| opset_version=onnx_opset_version, |
| input_names=decoder_input_names, |
| output_names=decoder_output_names, |
| dynamic_axes=dyn_axis_params, |
| ) |
| bar.next() |
|
|
| torch.onnx.export( |
| simplified_encoder, |
| args=(input_ids, attention_mask), |
| f=encoder_path.as_posix(), |
| export_params=True, |
| opset_version=onnx_opset_version, |
| do_constant_folding=True, |
| input_names=["input_ids", "attention_mask"], |
| output_names=["hidden_states"], |
| dynamic_axes={ |
| "input_ids": dyn_axis_general, |
| "attention_mask": dyn_axis_general, |
| "hidden_states": dyn_axis_general, |
| }, |
| ) |
| bar.next() |
| |
| torch.onnx.export( |
| decoder_with_lm_head_init, |
| (input_ids_dec, attention_mask_dec, enc_out), |
| init_decoder_path.as_posix(), |
| export_params=True, |
| opset_version=onnx_opset_version, |
| input_names=[ |
| "input_ids", |
| "encoder_attention_mask", |
| "encoder_hidden_states", |
| ], |
| output_names=["logits", "past_key_values"], |
| dynamic_axes={ |
| |
| "input_ids": dyn_axis_general, |
| "encoder_attention_mask": dyn_axis_general, |
| "encoder_hidden_states": dyn_axis_general, |
| "logits": dyn_axis_general, |
| "past_key_values": dyn_axis_general, |
| }, |
| ) |
| bar.next() |
| bar.finish() |
|
|
| return encoder_path, decoder_path, init_decoder_path |
|
|
|
|
| def get_model_paths(pretrained_model, model_path, quantized): |
|
|
| model_path.mkdir(parents=True, exist_ok=True) |
|
|
| |
| pretrained_model_name = Path(pretrained_model).stem |
|
|
| if not quantized: |
| encoder_path = model_path.joinpath(f"{pretrained_model_name}-encoder.onnx") |
| decoder_path = model_path.joinpath(f"{pretrained_model_name}-decoder.onnx") |
| init_decoder_path = model_path.joinpath( |
| f"{pretrained_model_name}-init-decoder.onnx" |
| ) |
| else: |
| encoder_path = model_path.joinpath( |
| f"{pretrained_model_name}-encoder-quantized.onnx" |
| ) |
| decoder_path = model_path.joinpath( |
| f"{pretrained_model_name}-decoder-quantized.onnx" |
| ) |
| init_decoder_path = model_path.joinpath( |
| f"{pretrained_model_name}-init-decoder-quantized.onnx" |
| ) |
|
|
| return encoder_path, decoder_path, init_decoder_path |
|
|
|
|
| def quantize(models_name_or_path): |
| """ |
| Quantize the weights of the model from float32 to in8 to allow very efficient inference on modern CPU |
| |
| Uses unsigned ints for activation values, signed ints for weights, per |
| https://onnxruntime.ai/docs/performance/quantization.html#data-type-selection |
| it is faster on most CPU architectures |
| Args: |
| onnx_model_path: Path to location the exported ONNX model is stored |
| Returns: The Path generated for the quantized |
| """ |
| from onnxruntime.quantization import quantize_dynamic, QuantType |
|
|
| bar = Bar("Quantizing...", max=3) |
|
|
| quant_model_paths = [] |
| for model in models_name_or_path: |
| model_name = model.as_posix() |
| output_model_name = f"{model_name[:-5]}-quantized.onnx" |
| quantize_dynamic( |
| model_input=model_name, |
| model_output=output_model_name, |
| per_channel=True, |
| reduce_range=True, |
| activation_type=QuantType.QUInt8, |
| weight_type=QuantType.QInt8, |
| optimize_model=False, |
| ) |
| quant_model_paths.append(output_model_name) |
| bar.next() |
|
|
| bar.finish() |
|
|
| return tuple(quant_model_paths) |
|
|