# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # Copyright 2023-2024 Arm Limited and/or its affiliates. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. # Example script for exporting simple models to flatbuffer import argparse import json import logging import os from pathlib import Path from typing import Any, Dict, Optional, Tuple import torch from executorch.backends.arm.arm_backend import ArmCompileSpecBuilder from executorch.backends.arm.arm_partitioner import ArmPartitioner from executorch.backends.arm.quantizer.arm_quantizer import ( ArmQuantizer, get_symmetric_quantization_config, ) from executorch.backends.arm.util.arm_model_evaluator import ( GenericModelEvaluator, MobileNetV2Evaluator, ) from executorch.devtools.backend_debug import get_delegation_info from executorch.exir import ( EdgeCompileConfig, ExecutorchBackendConfig, to_edge_transform_and_lower, ) from executorch.extension.export_util.utils import save_pte_program from tabulate import tabulate # Quantize model if required using the standard export quantizaion flow. from torch.ao.quantization.quantize_pt2e import convert_pt2e, prepare_pt2e from torch.utils.data import DataLoader from ..models import MODEL_NAME_TO_MODEL from ..models.model_factory import EagerModelFactory FORMAT = "[%(levelname)s %(asctime)s %(filename)s:%(lineno)s] %(message)s" logging.basicConfig(level=logging.WARNING, format=FORMAT) def get_model_and_inputs_from_name(model_name: str) -> Tuple[torch.nn.Module, Any]: """Given the name of an example pytorch model, return it and example inputs. Raises RuntimeError if there is no example model corresponding to the given name. """ # Case 1: Model is defined in this file if model_name in models.keys(): model = models[model_name]() example_inputs = models[model_name].example_input # Case 2: Model is defined in examples/models/ elif model_name in MODEL_NAME_TO_MODEL.keys(): logging.warning( "Using a model from examples/models not all of these are currently supported" ) model, example_inputs, _, _ = EagerModelFactory.create_model( *MODEL_NAME_TO_MODEL[model_name] ) # Case 3: Model is in an external python file loaded as a module. # ModelUnderTest should be a torch.nn.module instance # ModelInputs should be a tuple of inputs to the forward function elif model_name.endswith(".py"): import importlib.util # load model's module and add it spec = importlib.util.spec_from_file_location("tmp_model", model_name) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) model = module.ModelUnderTest example_inputs = module.ModelInputs else: raise RuntimeError( f"Model '{model_name}' is not a valid name. Use --help for a list of available models." ) return model, example_inputs def quantize( model: torch.nn.Module, model_name: str, example_inputs: Tuple[torch.Tensor], evaluator_name: str | None, evaluator_config: Dict[str, Any] | None, ) -> torch.nn.Module: """This is the official recommended flow for quantization in pytorch 2.0 export""" logging.info("Quantizing Model...") logging.debug(f"Original model: {model}") quantizer = ArmQuantizer() # if we set is_per_channel to True, we also need to add out_variant of quantize_per_channel/dequantize_per_channel operator_config = get_symmetric_quantization_config(is_per_channel=False) quantizer.set_global(operator_config) m = prepare_pt2e(model, quantizer) dataset = get_calibration_data( model_name, example_inputs, evaluator_name, evaluator_config ) # The dataset could be a tuple of tensors or a DataLoader # These two cases need to be accounted for if isinstance(dataset, DataLoader): for sample, _ in dataset: m(sample) else: m(*dataset) m = convert_pt2e(m) logging.debug(f"Quantized model: {m}") return m # Simple example models class AddModule(torch.nn.Module): def __init__(self): super().__init__() def forward(self, x): return x + x example_input = (torch.ones(5, dtype=torch.int32),) can_delegate = True class AddModule2(torch.nn.Module): def __init__(self): super().__init__() def forward(self, x, y): return x + y example_input = ( torch.ones(5, dtype=torch.int32), torch.ones(5, dtype=torch.int32), ) can_delegate = True class AddModule3(torch.nn.Module): def __init__(self): super().__init__() def forward(self, x, y): return (x + y, x + x) example_input = ( torch.ones(5, dtype=torch.int32), torch.ones(5, dtype=torch.int32), ) can_delegate = True class SoftmaxModule(torch.nn.Module): def __init__(self): super().__init__() self.softmax = torch.nn.Softmax(dim=0) def forward(self, x): z = self.softmax(x) return z example_input = (torch.ones(2, 2),) can_delegate = False models = { "add": AddModule, "add2": AddModule2, "add3": AddModule3, "softmax": SoftmaxModule, } calibration_data = { "add": (torch.randn(1, 5),), "add2": ( torch.randn(1, 5), torch.randn(1, 5), ), "add3": ( torch.randn(32, 5), torch.randn(32, 5), ), "softmax": (torch.randn(32, 2, 2),), } evaluators = { "generic": GenericModelEvaluator, "mv2": MobileNetV2Evaluator, } targets = [ "ethos-u55-32", "ethos-u55-64", "ethos-u55-128", "ethos-u55-256", "ethos-u85-128", "ethos-u85-256", "ethos-u85-512", "ethos-u85-1024", "ethos-u85-2048", "TOSA", ] def get_calibration_data( model_name: str, example_inputs: Tuple[torch.Tensor], evaluator_name: str | None, evaluator_config: str | None, ): # Firstly, if the model is being evaluated, take the evaluators calibration function if it has one if evaluator_name is not None: evaluator = evaluators[evaluator_name] if hasattr(evaluator, "get_calibrator"): assert evaluator_config is not None config_path = Path(evaluator_config) with config_path.open() as f: config = json.load(f) if evaluator_name == "mv2": return evaluator.get_calibrator( training_dataset_path=config["training_dataset_path"] ) else: raise RuntimeError(f"Unknown evaluator: {evaluator_name}") # If the model is in the calibration_data dictionary, get the data from there # This is used for the simple model examples provided if model_name in calibration_data: return calibration_data[model_name] # As a last resort, fallback to the scripts previous behavior and return the example inputs return example_inputs def get_compile_spec( target: str, intermediates: Optional[str] = None ) -> ArmCompileSpecBuilder: spec_builder = None if target == "TOSA": spec_builder = ( ArmCompileSpecBuilder() .tosa_compile_spec("TOSA-0.80.0+BI") .set_permute_memory_format(True) ) elif "ethos-u55" in target: spec_builder = ( ArmCompileSpecBuilder() .ethosu_compile_spec( target, system_config="Ethos_U55_High_End_Embedded", memory_mode="Shared_Sram", extra_flags="--debug-force-regor --output-format=raw", ) .set_permute_memory_format(True) .set_quantize_io(True) ) elif "ethos-u85" in target: spec_builder = ( ArmCompileSpecBuilder() .ethosu_compile_spec( target, system_config="Ethos_U85_SYS_DRAM_Mid", memory_mode="Shared_Sram", extra_flags="--output-format=raw", ) .set_permute_memory_format(True) .set_quantize_io(True) ) if intermediates is not None: spec_builder.dump_intermediate_artifacts_to(intermediates) return spec_builder.build() def evaluate_model( model_name: str, intermediates: str, model_fp32: torch.nn.Module, model_int8: torch.nn.Module, example_inputs: Tuple[torch.Tensor], evaluator_name: str, evaluator_config: str | None, ) -> None: evaluator = evaluators[evaluator_name] # Get the path of the TOSA flatbuffer that is dumped intermediates_path = Path(intermediates) tosa_paths = list(intermediates_path.glob("*.tosa")) if evaluator.REQUIRES_CONFIG: assert evaluator_config is not None config_path = Path(evaluator_config) with config_path.open() as f: config = json.load(f) if evaluator_name == "mv2": init_evaluator = evaluator( model_name, model_fp32, model_int8, example_inputs, str(tosa_paths[0]), config["batch_size"], config["validation_dataset_path"], ) else: raise RuntimeError(f"Unknown evaluator {evaluator_name}") else: init_evaluator = evaluator( model_name, model_fp32, model_int8, example_inputs, str(tosa_paths[0]) ) quant_metrics = init_evaluator.evaluate() output_json_path = intermediates_path / "quant_metrics.json" with output_json_path.open("w") as json_file: json.dump(quant_metrics, json_file) def dump_delegation_info(edge, intermediate_files_folder: Optional[str] = None): graph_module = edge.exported_program().graph_module delegation_info = get_delegation_info(graph_module) df = delegation_info.get_operator_delegation_dataframe() table = tabulate(df, headers="keys", tablefmt="fancy_grid") delegation_info_string = f"Delegation info:\n{delegation_info.get_summary()}\nDelegation table:\n{table}\n" logging.info(delegation_info_string) if intermediate_files_folder is not None: delegation_file_path = os.path.join( intermediate_files_folder, "delegation_info.txt" ) with open(delegation_file_path, "w") as file: file.write(delegation_info_string) def get_args(): parser = argparse.ArgumentParser() parser.add_argument( "-m", "--model_name", required=True, help=f"Provide model name. Valid ones: {set(list(models.keys())+list(MODEL_NAME_TO_MODEL.keys()))}", ) parser.add_argument( "-d", "--delegate", action="store_true", required=False, default=False, help="Flag for producing ArmBackend delegated model", ) parser.add_argument( "-t", "--target", action="store", required=False, default="ethos-u55-128", choices=targets, help=f"For ArmBackend delegated models, pick the target, and therefore the instruction set generated. valid targets are {targets}", ) parser.add_argument( "-e", "--evaluate", required=False, nargs="?", const="generic", choices=["generic", "mv2"], help="Flag for running evaluation of the model.", ) parser.add_argument( "-c", "--evaluate_config", required=False, default=None, help="Provide path to evaluator config, if it is required.", ) parser.add_argument( "-q", "--quantize", action="store_true", required=False, default=False, help="Produce a quantized model", ) parser.add_argument( "-s", "--so_library", required=False, default=None, help="Provide path to so library. E.g., cmake-out/examples/portable/custom_ops/libcustom_ops_aot_lib.so", ) parser.add_argument( "--debug", action="store_true", help="Set the logging level to debug." ) parser.add_argument( "-i", "--intermediates", action="store", required=False, help="Store intermediate output (like TOSA artefacts) somewhere.", ) parser.add_argument( "-o", "--output", action="store", required=False, help="Location for outputs, if not the default of cwd.", ) args = parser.parse_args() if args.evaluate and ( args.quantize is None or args.intermediates is None or (not args.delegate) ): raise RuntimeError( "--evaluate requires --quantize, --intermediates and --delegate to be enabled." ) if args.debug: logging.basicConfig(level=logging.DEBUG, format=FORMAT, force=True) if args.quantize and not args.so_library: logging.warning( "Quantization enabled without supplying path to libcustom_ops_aot_lib using -s flag." + "This is required for running quantized models with unquantized input." ) # if we have custom ops, register them before processing the model if args.so_library is not None: logging.info(f"Loading custom ops from {args.so_library}") torch.ops.load_library(args.so_library) if ( args.model_name in models.keys() and args.delegate is True and models[args.model_name].can_delegate is False ): raise RuntimeError(f"Model {args.model_name} cannot be delegated.") return args if __name__ == "__main__": args = get_args() # Pick model from one of the supported lists model, example_inputs = get_model_and_inputs_from_name(args.model_name) model = model.eval() # export_for_training under the assumption we quantize, the exported form also works # in to_edge if we don't quantize exported_program = torch.export.export_for_training(model, example_inputs) model = exported_program.module() model_fp32 = model # Quantize if required model_int8 = None if args.quantize: model = quantize( model, args.model_name, example_inputs, args.evaluate, args.evaluate_config ) model_int8 = model # Wrap quantized model back into an exported_program exported_program = torch.export.export_for_training(model, example_inputs) if args.intermediates: os.makedirs(args.intermediates, exist_ok=True) if args.delegate: # As we can target multiple output encodings from ArmBackend, one must # be specified. compile_spec = get_compile_spec(args.target, args.intermediates) edge = to_edge_transform_and_lower( exported_program, partitioner=[ArmPartitioner(compile_spec)], compile_config=EdgeCompileConfig( _check_ir_validity=False, _skip_dim_order=True, ), ) else: edge = to_edge_transform_and_lower( exported_program, compile_config=EdgeCompileConfig( _check_ir_validity=False, _skip_dim_order=True, ), ) dump_delegation_info(edge, args.intermediates) try: exec_prog = edge.to_executorch( config=ExecutorchBackendConfig(extract_delegate_segments=False) ) except RuntimeError as e: if "Missing out variants" in str(e.args[0]): raise RuntimeError( e.args[0] + ".\nThis likely due to an external so library not being loaded. Supply a path to it with the -s flag." ).with_traceback(e.__traceback__) from None else: raise e model_name = os.path.basename(os.path.splitext(args.model_name)[0]) output_name = f"{model_name}" + ( f"_arm_delegate_{args.target}" if args.delegate is True else f"_arm_{args.target}" ) if args.output is not None: output_name = os.path.join(args.output, output_name) save_pte_program(exec_prog, output_name) if args.evaluate: evaluate_model( args.model_name, args.intermediates, model_fp32, model_int8, example_inputs, args.evaluate, args.evaluate_config, )