| |
| |
|
|
| import os |
| import re |
| from typing import Optional, Set |
| from rknn.api import RKNN |
| from math import exp |
| from sys import exit |
| import argparse |
| import onnxscript |
| from onnxscript.rewriter import pattern |
| import onnx.numpy_helper as onh |
| import numpy as np |
| import onnx |
| import onnxruntime as ort |
| from rknn.utils import onnx_edit |
|
|
# Run from the script's own directory so relative paths ("dataset.txt",
# "extract_model.onnx") resolve regardless of the caller's CWD.
os.chdir(os.path.dirname(os.path.abspath(__file__)))


# Fixed number of speech frames the converted graph is specialized to
# (used both for the "speech" input shape and the baked-in pad mask below).
speech_length = 171
|
|
| def _remove_file(path: str, *, keep: Optional[Set[str]] = None) -> None: |
| if not path: |
| return |
| keep_paths: Set[str] = {os.path.abspath(item) for item in keep} if keep else set() |
| normalized = os.path.abspath(path) |
| if keep_paths and normalized in keep_paths: |
| return |
| if not os.path.exists(normalized): |
| return |
| try: |
| os.remove(normalized) |
| print(f'cleaned temp model: {normalized}') |
| except OSError as err: |
| print(f'warning: failed to remove {normalized}: {err}') |
|
|
| def _with_suffix(path: str, suffix: str) -> str: |
| stem, ext = os.path.splitext(path) |
| return f"{stem}{suffix}{ext}" |
|
|
| def _sanitize_name(name: str) -> str: |
| return re.sub(r'[^0-9A-Za-z_]', '_', name) |
|
|
def _insert_div_node(model: onnx.ModelProto, tensor_name: str, divisor: float = 16.0) -> bool:
    """Rescale *tensor_name* by splicing a ``Div`` node after its producer.

    The producing node's output is renamed to ``<tensor_name>_pre_div`` and a
    Constant + Div pair re-emits the original name divided by *divisor*, so
    every downstream consumer transparently receives the scaled value.

    Returns True when the nodes were inserted, False when some Div already
    writes this tensor (idempotence guard). Raises RuntimeError if no node
    in the graph produces *tensor_name*.
    """
    nodes = model.graph.node

    # Idempotence: a Div emitting this tensor means the patch is in place.
    if any(n.op_type == 'Div' and tensor_name in n.output for n in nodes):
        return False

    # Find the node (and which of its output slots) that produces the tensor.
    position = None
    slot = None
    for i, candidate in enumerate(nodes):
        for j, out_name in enumerate(candidate.output):
            if out_name == tensor_name:
                position, slot = i, j
                break
        if position is not None:
            break

    if position is None:
        raise RuntimeError(f"Producer node for tensor {tensor_name} not found.")

    # Redirect the producer to an intermediate name; the Div re-creates the
    # original name so consumers need no rewiring.
    intermediate = f"{tensor_name}_pre_div"
    nodes[position].output[slot] = intermediate

    safe = _sanitize_name(tensor_name)

    scalar = onnx.helper.make_tensor(
        name=f"{safe}_DivConst_value",
        data_type=onnx.TensorProto.FLOAT,
        dims=[],
        vals=[divisor],
    )
    constant = onnx.helper.make_node(
        'Constant',
        inputs=[],
        outputs=[f"{safe}_div_const"],
        value=scalar,
        name=f"{safe}_DivConst",
    )
    divide = onnx.helper.make_node(
        'Div',
        inputs=[intermediate, f"{safe}_div_const"],
        outputs=[tensor_name],
        name=f"{safe}_Div",
    )

    # Insert Constant then Div directly after the producer to keep the node
    # list topologically sorted.
    nodes.insert(position + 1, constant)
    nodes.insert(position + 2, divide)
    return True
|
|
def _scale_initializer(model: onnx.ModelProto, initializer_name: str, divisor: float = 16.0) -> bool:
    """Divide the named graph initializer by *divisor*, in place.

    The tensor is materialized as float32, scaled, and written back under the
    same name. Returns True when the initializer was found and rewritten,
    False when no initializer matches *initializer_name*.
    """
    for position, tensor in enumerate(model.graph.initializer):
        if tensor.name != initializer_name:
            continue
        values = onh.to_array(tensor).astype(np.float32, copy=False)
        replacement = onh.from_array(values / divisor, name=initializer_name)
        model.graph.initializer[position].CopyFrom(replacement)
        return True
    return False
|
|
def convert_encoder(model_path: str):
    """Convert an ONNX speech encoder into an RK3588 RKNN model.

    Stages:
      1. Patch encoder layer 48 with x/2 Div nodes (plus a matching bias
         rescale) to work around an overflow in the feed-forward path.
      2. Evaluate the ``speech_lengths -> pad mask`` subgraph once on CPU and
         bake the resulting mask in as a constant second input.
      3. Transpose the ``encoder_out`` output layout via rknn.utils.onnx_edit.
      4. Configure, load, build, and export with the RKNN toolkit.

    Each intermediate .onnx file is deleted once superseded; the caller's
    original model file is always preserved. Exits the process with a
    non-zero status on any validation or toolkit failure.
    """
    rknn = RKNN(verbose=True)

    # Validate the input path before doing any work.
    ONNX_MODEL = os.path.abspath(model_path)
    if not os.path.isfile(ONNX_MODEL):
        print(f'Model file not found: {model_path}')
        exit(1)
    if not ONNX_MODEL.lower().endswith('.onnx'):
        print(f'Model file must be an ONNX file: {model_path}')
        exit(1)

    RKNN_MODEL = os.path.splitext(ONNX_MODEL)[0] + ".rknn"
    DATASET = "dataset.txt"      # only consulted when QUANTIZE is True
    QUANTIZE = False
    original_model = ONNX_MODEL  # the user's file: never removed
    preserve_files: Set[str] = {original_model}

    # --- Stage 1: overflow patch ------------------------------------------
    # Insert Div-by-2 nodes after two tensors in layer 48 and halve the
    # matching w_2 bias. NOTE(review): halving the bias compensates the
    # halved activation feeding w_2 (W(x/2) + b/2 == (Wx + b)/2); the overall
    # output scale of the patched graph should be confirmed against the
    # original model.
    print('--> Patching model to avoid overflow issue')
    base_model = onnx.load(ONNX_MODEL)
    modified = False
    for layer_idx in range(48, 49):
        for target in [
            f'/encoders.{layer_idx}/feed_forward/activation/Relu_output_0',
            f'/encoders.{layer_idx}/norm2/Cast_output_0',
        ]:
            modified |= _insert_div_node(base_model, target, divisor=2.0)
    bias_scaled = False
    if modified:
        for layer_idx in range(48, 49):
            bias_scaled |= _scale_initializer(base_model, f'model.encoders.{layer_idx}.feed_forward.w_2.bias', divisor=2.0)
    div_model_path = _with_suffix(ONNX_MODEL, "_div")
    onnx.save(base_model, div_model_path)
    if os.path.exists(div_model_path):
        # Continue the pipeline on the patched copy; drop the previous
        # intermediate (the original is protected by preserve_files).
        previous_model = ONNX_MODEL
        ONNX_MODEL = div_model_path
        _remove_file(previous_model, keep=preserve_files)
    if modified:
        if bias_scaled:
            print('done (created div-adjusted model and scaled bias)')
        else:
            print('done (created div-adjusted model; bias initializer not found)')
    else:
        print('done (div nodes already present)')

    # --- Stage 2: bake the padding mask -----------------------------------
    # Extract the speech_lengths -> mask subgraph, run it once on CPU for the
    # fixed speech_length, and keep the resulting tensor to feed RKNN as a
    # constant initial value for the mask input.
    extract_model_path = os.path.join(os.getcwd(), "extract_model.onnx")
    onnx.utils.extract_model(ONNX_MODEL, extract_model_path, ['speech_lengths'], ['/make_pad_mask/Cast_2_output_0'])
    sess = ort.InferenceSession(extract_model_path, providers=['CPUExecutionProvider'])
    extract_result = sess.run(None, {"speech_lengths": np.array([speech_length], dtype=np.int64)})[0]
    _remove_file(extract_model_path)

    # --- Stage 3: output layout edit --------------------------------------
    # Transform encoder_out from (a, b, c) to (a, c, 1, b).
    # NOTE(review): onnx_edit's return value is assigned but never checked;
    # the os.path.exists() test below is the only success signal.
    edited_model_path = _with_suffix(ONNX_MODEL, "_edited")
    ret = onnx_edit(model = ONNX_MODEL,
                    export_path = edited_model_path,

                    outputs_transform = {'encoder_out': 'a,b,c->a,c,1,b'},

                    )
    if os.path.exists(edited_model_path):
        previous_model = ONNX_MODEL
        ONNX_MODEL = edited_model_path
        _remove_file(previous_model, keep=preserve_files)

    # --- Stage 4: RKNN config/load/build/export ---------------------------
    print('--> Config model')
    rknn.config(quantized_algorithm='normal', quantized_method='channel', target_platform='rk3588', optimization_level=3)
    print('done')

    print("--> Loading model")
    current_model_path = ONNX_MODEL
    ret = rknn.load_onnx(
        model=current_model_path,
        inputs=["speech", "/make_pad_mask/Cast_2_output_0"],
        input_size_list=[[1, speech_length, 560], [extract_result.shape[0], extract_result.shape[1]]],
        # Mask input is pinned to the precomputed constant from Stage 2.
        input_initial_val=[None, extract_result],

    )

    if ret != 0:
        print('Load model failed!')
        exit(ret)
    print('done')
    # The ONNX intermediate is no longer needed once RKNN has ingested it.
    _remove_file(current_model_path, keep=preserve_files)

    print('--> Building model')
    ret = rknn.build(do_quantization=QUANTIZE, dataset=DATASET, rknn_batch_size=None)
    if ret != 0:
        print('Build model failed!')
        exit(ret)
    print('done')

    print('--> Export RKNN model')
    ret = rknn.export_rknn(RKNN_MODEL)
    if ret != 0:
        print('Export RKNN model failed!')
        exit(ret)
    print('done')
    # NOTE(review): rknn.release() is never called; resources are freed only
    # at process exit — confirm this is acceptable for this one-shot script.
| |
| |
|
|
| |
|
|
if __name__ == "__main__":
    # CLI entry point: one positional argument naming the source ONNX model.
    cli = argparse.ArgumentParser()
    cli.add_argument("model_path", type=str, help="path to source ONNX model")
    parsed = cli.parse_args()
    convert_encoder(parsed.model_path)
|
|