# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. # pyre-strict import logging import os import selectors import subprocess import sys from dataclasses import dataclass from typing import Dict, List, Optional, Sequence, Tuple, Union import torch from executorch.devtools.bundled_program.config import MethodTestCase, MethodTestSuite from executorch.devtools.bundled_program.core import BundledProgram from executorch.devtools.bundled_program.serialize import ( serialize_from_bundled_program_to_flatbuffer, ) from executorch.exir import ExecutorchProgram, ExecutorchProgramManager # If quiet is true, suppress the printing of stdout and stderr output. quiet = False def _execute_subprocess(cmd: List[str], cwd: Optional[str] = None) -> Tuple[str, str]: """ `subprocess.run(cmd, capture_output=True)` captures stdout/stderr and only returns it at the end. This functions not only does that, but also prints out stdout/stderr non-blockingly when running the command. """ logging.debug(f"cmd = \33[33m{cmd}\33[0m, cwd = {cwd}") stdout = "" stderr = "" PIPE = subprocess.PIPE with subprocess.Popen(cmd, stdout=PIPE, stderr=PIPE, cwd=cwd) as p: sel = selectors.DefaultSelector() # pyre-fixme[6]: For 1st argument expected `Union[HasFileno, int]` but got # `Optional[IO[bytes]]`. sel.register(p.stdout, selectors.EVENT_READ) # pyre-fixme[6]: For 1st argument expected `Union[HasFileno, int]` but got # `Optional[IO[bytes]]`. sel.register(p.stderr, selectors.EVENT_READ) done = False while not done: for key, _ in sel.select(): # pyre-fixme[16]: Item `HasFileno` of `Union[HasFileno, int]` has no # attribute `read1`. data = key.fileobj.read1().decode() if not data: done = True break if key.fileobj is p.stdout: if not quiet: print(data, end="") stdout += data else: if not quiet: print(data, end="", file=sys.stderr) stderr += data # flush stdout and stderr in case there's no newline character at the end # from the subprocess sys.stdout.flush() sys.stderr.flush() if p.returncode != 0: raise subprocess.CalledProcessError(p.returncode, p.args, stdout, stderr) return stdout, stderr def execute(args: List[str]) -> Tuple[str, str]: """ Either a local execution (through subprocess.run) or a remote execution (in Hargow). Run the command described by args (the same way subprocess.run does). Ex: if you want to run "ls -al", you need to pass args = ["ls", "-al"] """ # `import torch` will mess up PYTHONPATH. delete the messed up PYTHONPATH if "PYTHONPATH" in os.environ: del os.environ["PYTHONPATH"] try: return _execute_subprocess(args) except subprocess.CalledProcessError as e: fdb_cmd = f"fdb {' '.join(e.cmd)}" raise RuntimeError( f"Failed to execute. Use the following to debug:\n{fdb_cmd}" ) from e class Executor: # pyre-fixme[3]: Return type must be annotated. def __init__( self, working_dir: str = "", ): self.working_dir = working_dir self.executor_builder = "./backends/cadence/build_cadence_runner.sh" self.execute_runner = "./cmake-out/backends/cadence/cadence_runner" self.bundled_program_path: str = "CadenceDemoModel.bpte" def __call__(self) -> None: # build executor args = self.get_bash_command(self.executor_builder) logging.info(f"\33[33m{' '.join(args)}\33[0m") execute(args) # run executor cmd_args = { "bundled_program_path": os.path.join( self.working_dir, self.bundled_program_path ), "etdump_path": os.path.join(self.working_dir, "etdump.etdp"), "debug_output_path": os.path.join(self.working_dir, "debug_output.bin"), "dump_outputs": "true", } args = self.get_bash_command(self.execute_runner, cmd_args) logging.info(f"\33[33m{' '.join(args)}\33[0m") execute(args) @staticmethod def get_bash_command( executable: str, cmd_args: Optional[Dict[str, str]] = None, ) -> List[str]: # go through buck config and turn the dict into a list of "{key}=={value}" if cmd_args is None: cmd_args = {} cmd_args_strs = [] for key, value in cmd_args.items(): cmd_args_strs.extend([f"--{key}={value}"]) return [executable] + cmd_args_strs @dataclass class BundledProgramTestData: method: str inputs: Sequence[Union[bool, float, int, torch.Tensor]] expected_outputs: Sequence[torch.Tensor] testset_idx: int = 0 # There is only one testset in the bundled program class BundledProgramManager: """ Stateful bundled program object Takes a BundledProgramTestData and generates a bundled program """ def __init__(self, bundled_program_test_data: List[BundledProgramTestData]) -> None: self.bundled_program_test_data: List[BundledProgramTestData] = ( bundled_program_test_data ) @staticmethod # pyre-fixme[2]: Parameter `**args` has no type specified. def bundled_program_test_data_gen(**args) -> BundledProgramTestData: return BundledProgramTestData(**args) def get_method_test_suites(self) -> List[MethodTestSuite]: return [ self._gen_method_test_suite(bptd) for bptd in self.bundled_program_test_data ] def _gen_method_test_suite(self, bptd: BundledProgramTestData) -> MethodTestSuite: method_test_case = MethodTestCase( inputs=bptd.inputs, expected_outputs=bptd.expected_outputs, ) return MethodTestSuite( method_name=bptd.method, test_cases=[method_test_case], ) def _serialize( self, executorch_program: Union[ ExecutorchProgram, ExecutorchProgramManager, ], method_test_suites: Sequence[MethodTestSuite], bptd: BundledProgramTestData, ) -> bytes: bundled_program = BundledProgram( executorch_program=executorch_program, method_test_suites=method_test_suites ) bundled_program_buffer = serialize_from_bundled_program_to_flatbuffer( bundled_program ) return bundled_program_buffer