| import torch |
| from transformers import AutoModelForQuestionAnswering |
| from transformers import AutoTokenizer, BertConfig |
| import onnx |
| from onnxruntime.quantization import quantize_dynamic, QuantType |
| from onnxruntime.quantization import shape_inference |
| import os |
| import logging |
| from typing import Optional, Dict, Any |
| import subprocess |
|
|
| class ONNXModelConverter: |
| def __init__(self, model_name: str, output_dir: str): |
| self.model_name = model_name |
| self.output_dir = output_dir |
| self.setup_logging() |
|
|
| os.makedirs(output_dir, exist_ok=True) |
|
|
| self.logger.info(f"Loading tokenizer {model_name}...") |
| self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True) |
|
|
| self.logger.info(f"Loading model {model_name}...") |
| self.model = AutoModelForQuestionAnswering.from_pretrained( |
| model_name, |
| trust_remote_code=True, |
| torch_dtype=torch.float32 |
| ) |
| self.model.eval() |
|
|
| def setup_logging(self): |
| self.logger = logging.getLogger(__name__) |
| self.logger.setLevel(logging.INFO) |
| handler = logging.StreamHandler() |
| formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') |
| handler.setFormatter(formatter) |
| self.logger.addHandler(handler) |
|
|
| def prepare_dummy_inputs(self): |
| dummy_input = self.tokenizer( |
| "Hello, how are you?", |
| return_tensors="pt", |
| padding=True, |
| truncation=True, |
| max_length=128 |
| ) |
| return { |
| 'input_ids': dummy_input['input_ids'], |
| 'attention_mask': dummy_input['attention_mask'], |
| 'token_type_ids': dummy_input['token_type_ids'] |
| } |
|
|
| def export_to_onnx(self): |
| output_path = os.path.join(self.output_dir, "model.onnx") |
| inputs = self.prepare_dummy_inputs() |
|
|
| dynamic_axes = { |
| 'input_ids': {0: 'batch_size', 1: 'sequence_length'}, |
| 'attention_mask': {0: 'batch_size', 1: 'sequence_length'}, |
| 'token_type_ids': {0: 'batch_size', 1: 'sequence_length'}, |
| 'start_logits': {0: 'batch_size', 1: 'sequence_length'}, |
| 'end_logits': {0: 'batch_size', 1: 'sequence_length'}, |
| } |
|
|
| class ModelWrapper(torch.nn.Module): |
| def __init__(self, model): |
| super().__init__() |
| self.model = model |
|
|
| def forward(self, input_ids, attention_mask, token_type_ids): |
| outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids) |
| return outputs.start_logits, outputs.end_logits |
|
|
| wrapped_model = ModelWrapper(self.model) |
|
|
| try: |
| torch.onnx.export( |
| wrapped_model, |
| (inputs['input_ids'], inputs['attention_mask'], inputs['token_type_ids']), |
| output_path, |
| export_params=True, |
| opset_version=14, |
| do_constant_folding=True, |
| input_names=['input_ids', 'attention_mask', 'token_type_ids'], |
| output_names=['start_logits', 'end_logits'], |
| dynamic_axes=dynamic_axes, |
| verbose=False |
| ) |
| self.logger.info(f"Model exported to {output_path}") |
| return output_path |
| except Exception as e: |
| self.logger.error(f"ONNX export failed: {str(e)}") |
| raise |
|
|
| def verify_model(self, model_path: str): |
| try: |
| onnx_model = onnx.load(model_path) |
| onnx.checker.check_model(onnx_model) |
| self.logger.info("ONNX model verification successful") |
| return True |
| except Exception as e: |
| self.logger.error(f"Model verification failed: {str(e)}") |
| return False |
|
|
| def preprocess_model(self, model_path: str) -> str: |
| preprocessed_path = os.path.join(self.output_dir, "model-infer.onnx") |
| try: |
| command = [ |
| "python", "-m", "onnxruntime.quantization.preprocess", |
| "--input", model_path, |
| "--output", preprocessed_path |
| ] |
| result = subprocess.run(command, check=True, capture_output=True, text=True) |
| if result.returncode == 0: |
| self.logger.info(f"Model preprocessing successful. Output saved to {preprocessed_path}") |
| return preprocessed_path |
| else: |
| raise subprocess.CalledProcessError(result.returncode, command, result.stdout, result.stderr) |
| except subprocess.CalledProcessError as e: |
| self.logger.error(f"Preprocessing failed: {e.stderr}") |
| raise |
| except Exception as e: |
| self.logger.error(f"Preprocessing failed: {str(e)}") |
| raise |
|
|
| def quantize_model(self, model_path: str): |
| weight_types = {'int4':QuantType.QInt4, 'int8':QuantType.QInt8, 'uint4':QuantType.QUInt4, 'uint8':QuantType.QUInt8, 'uint16':QuantType.QUInt16, 'int16':QuantType.QInt16} |
| all_quantized_paths = [] |
| for weight_type in weight_types.keys(): |
| quantized_path = os.path.join(self.output_dir, "model_" + weight_type + ".onnx") |
|
|
| try: |
| quantize_dynamic( |
| model_path, |
| quantized_path, |
| weight_type=weight_types[weight_type] |
| ) |
| self.logger.info(f"Model quantized ({weight_type}) and saved to {quantized_path}") |
| all_quantized_paths.append(quantized_path) |
| except Exception as e: |
| self.logger.error(f"Quantization ({weight_type}) failed: {str(e)}") |
| raise |
|
|
| return all_quantized_paths |
|
|
|
|
| def convert(self): |
| try: |
| onnx_path = self.export_to_onnx() |
|
|
| if self.verify_model(onnx_path): |
| |
| |
|
|
| |
| quantized_paths = self.quantize_model(onnx_path) |
|
|
| tokenizer_path = os.path.join(self.output_dir, "tokenizer") |
| self.tokenizer.save_pretrained(tokenizer_path) |
| self.logger.info(f"Tokenizer saved to {tokenizer_path}") |
|
|
| return { |
| 'onnx_model': onnx_path, |
| 'quantized_models': quantized_paths, |
| 'tokenizer': tokenizer_path |
| } |
| else: |
| raise Exception("Model verification failed") |
|
|
| except Exception as e: |
| self.logger.error(f"Conversion process failed: {str(e)}") |
| raise |
|
|
| if __name__ == "__main__": |
| MODEL_NAME = "Intel/dynamic_tinybert" |
| OUTPUT_DIR = "onnx" |
|
|
| try: |
| converter = ONNXModelConverter(MODEL_NAME, OUTPUT_DIR) |
| results = converter.convert() |
|
|
| print("\nConversion completed successfully!") |
| print(f"ONNX model path: {results['onnx_model']}") |
| print(f"Quantized model paths: {results['quantized_models']}") |
| print(f"Tokenizer path: {results['tokenizer']}") |
|
|
| except Exception as e: |
| print(f"Conversion failed: {str(e)}") |
|
|