# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import logging
import numbers
import os
import tempfile
from typing import Any, Optional, Sequence, Tuple, Union

import executorch.exir.schema as et_schema
import numpy as np
import torch

from executorch.backends.cadence.runtime import utils
from executorch.backends.cadence.runtime.executor import Executor
from executorch.devtools import Inspector
from executorch.exir import ExecutorchProgramManager
from executorch.exir._serialize._program import deserialize_pte_binary
from executorch.exir.schema import DataLocation
from numpy import ndarray
from torch.utils._pytree import TreeSpec


class CadenceETDump:
    def __init__(self, output_dir: str) -> None:
        self.tensor_dump_dir: str = os.path.join(output_dir, "tensors")
        self.etdump_path: str = os.path.join(output_dir, "etdump.etdp")
        self.etrecord_path: Optional[str] = os.path.join(output_dir, "etrecord.bin")
        self.debug_buffer_path: Optional[str] = os.path.join(
            output_dir, "debug_output.bin"
        )

        if not os.path.exists(self.etdump_path):
            raise RuntimeError(f"{self.etdump_path} does not exist")
        # pyre-ignore[6]: os.path.exists expects str, but got Optional[str]
        if not os.path.exists(self.etrecord_path):
            logging.warning(
                "ETRecord not found, intermediate tensors will not be dumped"
            )
            self.etrecord_path = None
        # pyre-ignore[6]: os.path.exists expects str, but got Optional[str]
        if not os.path.exists(self.debug_buffer_path):
            logging.warning(
                "Debug buffer not found, intermediate tensors will not be dumped"
            )
            self.debug_buffer_path = None

        self.et_inspector: Inspector = Inspector(
            etdump_path=self.etdump_path,
            debug_buffer_path=self.debug_buffer_path,
            etrecord=self.etrecord_path,
        )

    def get_outputs(self, log_to_stdout: bool = False) -> Tuple[torch.Tensor]:
        output = [
            event_block.run_output
            for event_block in self.et_inspector.event_blocks
            if event_block.name == "Execute"
        ]
        logging.debug(f"[ETdump] output: {output}")
        return output[0]

    def print_event_block(self) -> None:
        logging.debug("[ETdump] data tabular:")
        if logging.getLogger().level <= logging.DEBUG:
            self.et_inspector.print_data_tabular()

    def print_event_data(self) -> None:
        logging.debug("[ETdump] event data ")
        for event_block in self.et_inspector.event_blocks:
            for event in event_block.events:
                logging.debug(event)

    def dump_intermediate_tensors(self) -> None:
        if self.etrecord_path is None:
            logging.info("[ETdump] Intermediate tensors not available")
            return

        logging.info(
            f"[ETdump] Dumping intermediate tensors to {self.tensor_dump_dir}"
        )
        os.makedirs(self.tensor_dump_dir, exist_ok=True)
        exec_blocks = [
            eb for eb in self.et_inspector.event_blocks if eb.name == "Execute"
        ]
        if len(exec_blocks) > 1:
            logging.warning(
                f'Found {len(exec_blocks)} "Execute" blocks, using the first one and ignoring the rest.'
            )
        block = exec_blocks[0]

        # OPERATOR_CALL events are duplicates that contain framework tax data.
        # We don't need them.
        op_events = [e for e in block.events if e.name != "OPERATOR_CALL"]
        torch.set_printoptions(profile="full")

        for event in op_events:
            instr_id = event._instruction_id
            if not event.debug_data:
                logging.debug(
                    f"Missing intermediate tensor data for {event.name} ({instr_id=})"
                )
                continue

            with open(f"{self.tensor_dump_dir}/{instr_id}.txt", "w") as f:
                for dd in event.debug_data:
                    f.write(f"{str(dd)}\n\n")
        torch.set_printoptions(profile="default")
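
# Illustrative usage sketch (an assumption added for clarity, not part of the
# original module): once an executor run has produced etdump.etdp (and,
# optionally, etrecord.bin / debug_output.bin) in a working directory,
# CadenceETDump can be used directly. The "/tmp/cadence_run" path below is a
# hypothetical placeholder.
#
#   dump = CadenceETDump(output_dir="/tmp/cadence_run")
#   outputs = dump.get_outputs()       # flattened outputs of the "Execute" block
#   dump.print_event_block()           # tabular event data (when log level is DEBUG)
#   dump.dump_intermediate_tensors()   # writes <instruction_id>.txt files under tensors/
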
def get_op_names(program: et_schema.Program, execution_plan_id: int = 0) -> set[str]:
    """
    Get the set of qualified operator names used by a Program, including
    operators inside Cadence delegate payloads.
    """
    op_names = {
        f"{op.name}.{op.overload}"
        for op in program.execution_plan[execution_plan_id].operators
    }
    for delegate in program.execution_plan[execution_plan_id].delegates:
        logging.debug(f"Delegate: {delegate.id}")
        if delegate.id == "CadenceExecutorchBackend":
            assert delegate.processed.location == DataLocation.INLINE
            op_names |= get_op_names(
                deserialize_pte_binary(
                    program.backend_delegate_data[delegate.processed.index].data
                )
            )
    return op_names


# Run an ExecutorchProgram using the specified inputs and backend
def run(
    executorch_prog: ExecutorchProgramManager,
    inputs: Any,
    ref_outputs: Optional[Sequence[torch.Tensor]] = None,
    working_dir: Optional[str] = None,
) -> Any:
    # Get the Program
    program = executorch_prog.executorch_program
    out_spec = executorch_prog.exported_program().call_spec.out_spec
    # Run the program and return the outputs
    assert isinstance(
        program, et_schema.Program
    ), f"program must be Program. Got {type(program)} instead."

    if working_dir is None:
        working_dir = tempfile.mkdtemp(dir="/tmp")

    # Initialize the end-to-end Executor with the working directory.
    executor = Executor(working_dir)

    # Run the Executor.
    executor()

    etdump = CadenceETDump(output_dir=working_dir)
    outputs = etdump.get_outputs()

    assert isinstance(out_spec, TreeSpec)
    outputs = torch.utils._pytree.tree_unflatten(outputs, out_spec)

    return outputs


def compare(
    # pyre-fixme[2]: Parameter annotation cannot be `Any`.
    outputs: Any,
    # pyre-fixme[2]: Parameter annotation cannot be `Any`.
    ref_outputs: Any,
    name: str = "",
    eps_error: float = 1e-1,
    eps_warn: float = 1e-5,
) -> None:
    if isinstance(ref_outputs, dict):
        for k, v in outputs.items():
            compare(v, ref_outputs[k], f"{name}/{k}", eps_error, eps_warn)
        return

    if isinstance(ref_outputs, (list, tuple)):
        for i, (output, ref_output) in enumerate(zip(outputs, ref_outputs)):
            compare(output, ref_output, f"{name}/{i}", eps_error, eps_warn)
        return

    assert isinstance(
        ref_outputs, torch.Tensor
    ), f"Expected torch.Tensor, got {type(ref_outputs)} instead."

    ref_outputs = to_nd_array(ref_outputs)
    outputs = to_nd_array(outputs)

    # Compare the flattened leaf tensors.
    rms = utils.rms(outputs, ref_outputs)
    norm_rms = utils.normalized_rms(outputs, ref_outputs)
    max_abs_diff = utils.max_abs_diff(outputs, ref_outputs)
    max_rel_diff = utils.max_rel_diff(outputs, ref_outputs)
    stats = (
        f"{rms = }, {norm_rms = }, {max_abs_diff = }, {max_rel_diff = :.2f}%, "
        f"{outputs.shape = }[{outputs.dtype}], {ref_outputs.shape = }[{ref_outputs.dtype}]"
    )

    if np.isnan(rms) or rms > eps_error:
        logging.error(f"\33[31m[Error]\33[0m Output {name} mismatched! {stats}")
        logging.error(f"Expected: {ref_outputs}\n")
        logging.error(f"Got instead: {outputs}\n")
        raise RuntimeError(f"\33[31m[Error]\33[0m Output {name} mismatched! {stats}")
    elif rms > eps_warn:
        logging.warning(f"\33[33m[Warning]\33[0m Output {name} mismatched! {stats}")
    else:
        logging.info(f"\33[32m[Passed]\33[0m Output {name} matched. {stats}")
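
# Illustrative sketch (hypothetical tensors, not part of the original module):
# compare() recurses over dicts, lists, and tuples and checks each leaf
# tensor's RMS error against eps_warn / eps_error, raising RuntimeError on a
# hard mismatch.
#
#   ref = {"logits": torch.ones(2, 3), "aux": [torch.zeros(4)]}
#   out = {"logits": torch.ones(2, 3) + 1e-6, "aux": [torch.zeros(4)]}
#   compare(out, ref, name="model")   # logs [Passed] / [Warning] per leaf
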
def run_and_compare(
    executorch_prog: ExecutorchProgramManager,
    inputs: Any,
    ref_outputs: Optional[Sequence[torch.Tensor]] = None,
    working_dir: Optional[str] = None,
    eps_error: float = 1e-1,
    eps_warn: float = 1e-5,
) -> Any:
    outputs = run(executorch_prog, inputs, ref_outputs, working_dir)
    compare(outputs, ref_outputs, eps_error=eps_error, eps_warn=eps_warn)
    return outputs


# pyre-fixme[24]: Generic type `np.ndarray` expects 2 type parameters.
def to_nd_array(v: Union[bool, numbers.Number, ndarray, torch.Tensor]) -> np.ndarray:
    if isinstance(v, np.ndarray):
        return v

    if isinstance(v, torch.Tensor):
        # If v was quantized, compare its integer representation.
        v = v.int_repr() if v.is_quantized else v
        return v.cpu().detach().numpy()

    if isinstance(v, (numbers.Number, bool)):
        return np.array([v])

    raise RuntimeError(f"Unknown type {type(v)}")
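
# End-to-end usage sketch (assumptions, not part of the original module): how
# run_and_compare() is typically driven from an ExecutorchProgramManager. The
# toy model and the torch.export.export + to_edge().to_executorch() lowering
# below are placeholders; substitute the actual Cadence lowering flow.
#
#   import torch
#   from executorch.exir import to_edge
#
#   model = torch.nn.Linear(4, 4).eval()
#   example_inputs = (torch.randn(1, 4),)
#   executorch_prog = to_edge(
#       torch.export.export(model, example_inputs)
#   ).to_executorch()
#
#   run_and_compare(
#       executorch_prog,
#       inputs=example_inputs,
#       ref_outputs=(model(*example_inputs),),
#       eps_error=1e-1,
#       eps_warn=1e-5,
#   )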