# Copyright 2024 Arm Limited and/or its affiliates. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. import json import logging import os import re import shutil import subprocess import tempfile from pathlib import Path from typing import Dict, List, Optional, Tuple import numpy as np import torch from executorch.backends.arm.test.common import arm_test_options, is_option_enabled from torch.export import ExportedProgram from torch.fx.node import Node logger = logging.getLogger(__name__) logger.setLevel(logging.WARNING) class QuantizationParams: __slots__ = ["node_name", "zp", "scale", "qmin", "qmax", "dtype"] # todo: zps and scales can be per tensors or per channel => a list?? def __init__( self, node_name: str, zp: int, scale: float, qmin: int, qmax: int, dtype: torch.dtype, ): self.node_name = node_name # not need I think, but good for error check self.zp = zp self.scale = scale self.qmin = qmin self.qmax = qmax self.dtype = dtype def _get_input_names(program: ExportedProgram) -> list[str]: """ Get a list[str] with the names of the inputs to this model. Args: program (ExportedProgram): The program to get input names from. Returns: A list of strings with the names of the model input. """ input_names = [] # E.g. bias and weights are 'placeholders' as well. This is used to # get only the use inputs. usr_inputs = program.graph_signature.user_inputs for node in program.graph.nodes: if node.op == "placeholder" and node.name in usr_inputs: input_names.append(node.name) return input_names def _get_input_quantization_params( program: ExportedProgram, ) -> list[QuantizationParams]: """ Get input QuantizationParams in a program, maximum one per input to the program. Args: program (ExportedProgram): The program to get input quantization parameters from. Returns: list[QuantizationParams]: The found quantization parameters. Raises: RuntimeError if no quantization parameters are found. """ quant_params = [] input_names = _get_input_names(program) num_inputs = len(input_names) for node in program.graph.nodes: if ( node.target == torch.ops.quantized_decomposed.quantize_per_tensor.default and node.args[0].name in input_names ): qp = QuantizationParams( node_name=node.args[0].name, scale=node.args[1], zp=node.args[2], qmin=node.args[3], qmax=node.args[4], dtype=node.args[5], ) quant_params.append(qp) if ( len(quant_params) == num_inputs ): # break early if we have all the inputs quantized parameters break if len(quant_params) == 0: raise RuntimeError("No Quantization parameters found in exported model.") return quant_params def _get_output_node(program: ExportedProgram) -> Node: """ Get output node to this model. Args: program (ExportedProgram): The program to get output node from. Returns: The node that is the output of 'program'. """ for node in program.graph.nodes: if node.op == "output": return node raise RuntimeError("No output node found.") def _get_output_quantization_params( program: ExportedProgram, output_node: Node ) -> QuantizationParams: """ Get output QuantizationParams from a program. Args: program (ExportedProgram): The program to get output quantization parameters from. Returns: QuantizationParams: The found quantization parameters. Raises: RuntimeError if no output quantization parameters are found. """ quant_params = None for node in program.graph.nodes: if ( node.target == torch.ops.quantized_decomposed.dequantize_per_tensor.default and node == output_node.args[0][0] ): quant_params = QuantizationParams( node_name=node.args[0].name, scale=node.args[1], zp=node.args[2], qmin=node.args[3], qmax=node.args[4], dtype=node.args[5], ) break # break early, there's only one output node if quant_params is None: raise RuntimeError("No Quantization parameters not found in exported model.") return quant_params """ A class to store parameters needed for running programs, either in tosa or .pte format. """ class RunnerUtil: def __init__( self, intermediate_path: str, tosa_ref_model_path: Optional[str] = None, ): self.intermediate_path = intermediate_path self.tosa_ref_model_path = tosa_ref_model_path or "tosa_reference_model" assert os.path.exists( self.intermediate_path ), f"TOSA artifact path don't exist! Path: {self.intermediate_path}" self.is_quantized: bool = False self.input_names: list[str] = None self.output_name: str = None self.qp_input: list[QuantizationParams] = None self.qp_output: QuantizationParams = None self.timeout = 120 self.target_board: str = None self._has_init_run = False def init_run( self, exported_program: ExportedProgram, edge_program: ExportedProgram, is_quantized: bool, target_board: str, ): if target_board not in ["corstone-300", "corstone-320"]: raise RuntimeError(f"Unknown target board: {target_board}") self.input_names = _get_input_names(edge_program) self.output_node = _get_output_node(exported_program) self.output_name = self.output_node.name self.is_quantized = is_quantized self.target_board = target_board if is_quantized: self.qp_input = _get_input_quantization_params(exported_program) self.qp_output = _get_output_quantization_params( exported_program, self.output_node ) else: self.qp_input = [None] * len(self.input_names) self.qp_output = None self._has_init_run = True def set_timeout(self, timeout: int): self.timeout = timeout def run_corstone( self, inputs: Tuple[torch.Tensor], ) -> list[torch.Tensor]: assert ( self._has_init_run ), "RunnerUtil needs to be initialized using init_run() before running Corstone300." pte_path = os.path.join(self.intermediate_path, "program.pte") assert os.path.exists(pte_path), f"Pte path '{pte_path}' not found." for input_name, quant_param, data in zip( self.input_names, self.qp_input, inputs ): save_bytes(self.intermediate_path, data, False, input_name, quant_param) out_path = os.path.join(self.intermediate_path, "out") out_path_with_suffix = out_path + "-0.bin" input_paths = [] for name in self.input_names: input_paths.append( os.path.join(self.intermediate_path, f"{name}.bin"), ) elf_path = os.path.join( "cmake-out", f"arm_semihosting_executor_runner_{self.target_board}", "arm_executor_runner", ) assert os.path.exists( elf_path ), f"Did not find build arm_executor_runner in path {elf_path}, run setup_testing.sh?" cmd_line = f"executor_runner -m {pte_path} -o {out_path}" for input_path in input_paths: cmd_line += f" -i {input_path}" ethos_u_extra_args = "" if is_option_enabled(arm_test_options.fast_fvp): ethos_u_extra_args = ethos_u_extra_args + "--fast" command_args = { "corstone-300": [ "FVP_Corstone_SSE-300_Ethos-U55", "-C", "ethosu.num_macs=128", "-C", "mps3_board.visualisation.disable-visualisation=1", "-C", "mps3_board.telnetterminal0.start_telnet=0", "-C", "mps3_board.uart0.out_file='-'", "-C", "cpu0.CFGITCMSZ=11", "-C", "cpu0.semihosting-enable=1", "-C", "cpu0.semihosting-stack_base=0", "-C", f"ethosu.extra_args='{ethos_u_extra_args}'", "-C", "cpu0.semihosting-heap_limit=0", "-C", f"cpu0.semihosting-cmd_line='{cmd_line}'", "-a", elf_path, "--timelimit", f"{self.timeout}", ], "corstone-320": [ "FVP_Corstone_SSE-320", "-C", "mps4_board.subsystem.ethosu.num_macs=128", "-C", "mps4_board.visualisation.disable-visualisation=1", "-C", "vis_hdlcd.disable_visualisation=1", "-C", "mps4_board.telnetterminal0.start_telnet=0", "-C", "mps4_board.uart0.out_file='-'", "-C", "mps4_board.uart0.unbuffered_output=1", "-C", "mps4_board.uart0.shutdown_on_eot=1", "-C", "mps4_board.subsystem.cpu0.semihosting-enable=1", "-C", "mps4_board.subsystem.cpu0.semihosting-stack_base=0", "-C", "mps4_board.subsystem.cpu0.semihosting-heap_limit=0", "-C", f"mps4_board.subsystem.ethosu.extra_args='{ethos_u_extra_args}'", "-C", f"mps4_board.subsystem.cpu0.semihosting-cmd_line='{cmd_line}'", "-a", elf_path, "--timelimit", f"{self.timeout}", ], } result = _run_cmd(command_args[self.target_board], check=False) if result.returncode != 0: raise RuntimeError( f"Failed to run {command_args[self.target_board]}\nError: {result.stderr.decode()}" ) result_stdout = result.stdout.decode() error_regex = r"(^[EF][: ].*$)|(^.*Hard fault.*$)|(^.*Assertion.*$)" # Check for errors in the output # regex to check for error or fault messages in stdout from FVP if re.compile(error_regex, re.MULTILINE).search(result_stdout): raise RuntimeError( f"Corstone simulation failed:\ncmd: {command_args[self.target_board]}\n, log: \n {result_stdout}\n{result.stderr.decode()}" ) tosa_ref_output = np.fromfile(out_path_with_suffix, dtype=np.float32) output_shape = self.output_node.args[0][0].meta["val"].shape tosa_ref_output = torch.from_numpy(tosa_ref_output).reshape(output_shape) return [tosa_ref_output] def run_tosa_ref_model( self, inputs: Tuple[torch.Tensor], ) -> list[torch.Tensor]: """ Run TOSA reference model using the tosa_reference_model program. In order to do that we need: 1. desc.json, which points to files needed by tosa_reference_model. 2. output.tosa, which is the TOSA buffer that describes the model we're trying to run. These two files are created by arm_backend.py as part of partition stage All these files are saved on disk in self.intermediate_path. Args: inputs (Tuple[torch.Tensor]): The input data to run the TOSA Returns: torch.Tensor: The output of the TOSA reference model, as a torch tensor. Here's a sample desc.json file: { "tosa_file": "output.tosa", "ifm_name": [ "arg0_1" ], "ifm_file": [ "arg0_1.npy" ], "ofm_name": [ "quantized_decomposed_dequantize_per_tensor_default_1" ], "ofm_file": [ "ref-quantized_decomposed_dequantize_per_tensor_default_1.npy" ], "expected_return_code": 0, "expected_failure": false } Todo: * It would be nice to not rely on files on disk. Should be possible as a next step. See: https://review.mlplatform.org/plugins/gitiles/tosa/reference_model/#executable-usage """ assert ( self._has_init_run ), "RunnerUtil needs to be initialized using init_run() before running tosa reference." all_desc_file_paths = [ str(path) for path in Path(self.intermediate_path).glob("desc*.json") ] assert ( all_desc_file_paths ), f"No TOSA description file found in '{self.intermediate_path}'." if len(all_desc_file_paths) != 1: raise NotImplementedError( "Graphs with more than one partition are currently not supported." ) desc_file_path = all_desc_file_paths[0] assert os.path.exists( desc_file_path ), f"desc_file_path: {desc_file_path} does not exist" # Save the input data to disk as a .npy file, since that's what the TOSA # reference model expects. Name of the file must match the name in # desc.json, which is the tensor name from the graph + .npy for input_name, quant_param, data in zip( self.input_names, self.qp_input, inputs, strict=True ): save_npy( self.intermediate_path, data, self.is_quantized, input_name, quant_param ) # Run the TOSA reference model via command line, this will produce a # .npy file with the result (aka OFM). assert ( shutil.which(self.tosa_ref_model_path) is not None ), f"tosa_reference_model tool not found, did you run examples/arm/setup.sh? Path: {self.tosa_ref_model_path}" loglevel_map = { logging.INFO: "INFO", logging.CRITICAL: "LOW", logging.ERROR: "LOW", logging.WARNING: "MED", logging.DEBUG: "HIGH", logging.NOTSET: "MED", } clamped_logging_level = max(min(logger.level // 10 * 10, 50), 0) cmd_ref_model = [ self.tosa_ref_model_path, "--test_desc", desc_file_path, "-l", loglevel_map[clamped_logging_level], ] _run_cmd(cmd_ref_model) # Load desc.json, just to get the name of the output file above with open(desc_file_path) as f: desc_json = json.load(f) tosa_ref_outputs = [] for ofm_file in desc_json["ofm_file"]: ofm_file_npy = os.path.join(self.intermediate_path, ofm_file) # Load the output file (OFM) and return it as a numpy array tosa_ref_output = np.load(ofm_file_npy) if self.is_quantized: # Need to dequant back to FP32 for comparison with torch output # Convert to int32 prior to dequantize the output if tosa_ref_output.dtype == np.int8: tosa_ref_output = tosa_ref_output.astype(np.int32) quant_param = self.qp_output assert ( quant_param is not None ), "There are no quantization parameters, check output parameters" tosa_ref_output = (tosa_ref_output - quant_param.zp) * quant_param.scale if tosa_ref_output.dtype == np.double: tosa_ref_output = tosa_ref_output.astype("float32") # tosa_output is a numpy array, convert to torch tensor for comparison tosa_ref_outputs.append(torch.from_numpy(tosa_ref_output)) return tosa_ref_outputs def prep_data_for_save( data, is_quantized: bool, input_name: str, quant_param: QuantizationParams ): data_np = np.array(data.detach(), order="C").astype( f"{data.dtype}".replace("torch.", "") ) if is_quantized: assert quant_param.node_name in input_name, ( f"The quantization params name '{quant_param.node_name}' does not " f"match the input tensor name '{input_name}'." ) data_np = ( ((data_np / np.float32(quant_param.scale)) + quant_param.zp) .round() .clip(quant_param.qmin, quant_param.qmax) .astype( f"{quant_param.dtype}".replace("torch.", "") ) # Use string format of dtype to convert to numpy dtype ) return data_np def save_npy( path: str, data, is_quantized: bool, input_name: str, quant_param: QuantizationParams, ) -> str: """Serializes and saves 'data' as a .npy file, possibly quantizing it before. Parameters: path: the directory where to save the data. data: the data to save. is_quantized: whether to quantize the data before saving it. input_name: the name of the file, without file-ending. quant_param: the parameters to use for quantization. Returns: the full file path of the output. """ data_np = prep_data_for_save(data, is_quantized, input_name, quant_param) file_path = os.path.join(path, input_name + ".npy") np.save(file_path, data_np, allow_pickle=False) return file_path def save_bytes( path: str, data, is_quantized: bool, input_name: str, quant_param: QuantizationParams, ) -> str: """Serializes and saves 'data' in byte format, possibly quantizing it before. Parameters: path: the directory where to save the data. data: the data to save. is_quantized: whether to quantize the data before saving it. input_name: the name of the file, without file-ending. quant_param: the parameters to use for quantization. Returns: the full file path of the output. """ data_np = prep_data_for_save(data, is_quantized, input_name, quant_param) file_path = os.path.join(path, input_name + ".bin") with open(file_path, "w+b") as f: data_np_bytes = data_np.tobytes() f.write(data_np_bytes) return file_path def _run_cmd(cmd: List[str], check=True) -> subprocess.CompletedProcess[bytes]: """ Run a command and check for errors. Args: cmd (List[str]): The command to run as a list. """ try: result = subprocess.run(cmd, check=check, capture_output=True) return result except subprocess.CalledProcessError as e: arg_string = " ".join(cmd) raise RuntimeError( f"Failed running command {arg_string}\nStderr: {e.stderr.decode()}\nStdout: {e.stdout.decode()}" ) def dbg_tosa_fb_to_json(tosa_fb: bytes) -> Dict: """ This function is used to dump the TOSA flatbuffer to a human readable format, using flatc. It is used for debugging purposes. """ tmp = tempfile.mkdtemp() tosa_input_file = os.path.join(tmp, "output.tosa") with open(tosa_input_file, "wb") as f: f.write(tosa_fb) arm_backend_path = os.path.realpath(os.path.dirname(__file__) + "/..") tosa_schema_file = os.path.join( arm_backend_path, "third-party/serialization_lib/schema/tosa.fbs" ) assert os.path.exists( tosa_schema_file ), f"tosa_schema_file: {tosa_schema_file} does not exist" assert shutil.which("flatc") is not None cmd_flatc = [ "flatc", "--json", "--strict-json", "-o", tmp, "--raw-binary", "-t", tosa_schema_file, "--", tosa_input_file, ] _run_cmd(cmd_flatc) with open(os.path.join(tmp, "output.json"), "r") as f: json_out = json.load(f) # Cast float tensors to proper dtype. try: for region in json_out["regions"]: for block in region["blocks"]: for tensor in block["tensors"]: if "data" in tensor: if tensor["type"] == "FP32": data = np.array(tensor["data"]) data = data.astype(np.int8) data = np.frombuffer(data, dtype=np.float32) data = data.reshape(tensor["shape"]) tensor["data"] = data except Exception: # This is just nice-to-have if it works, don't care if it fails. pass return json_out