# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import copy
import warnings
from collections import namedtuple
from contextlib import contextmanager
from types import MethodType
from typing import Any, Callable, cast, List, Optional, Tuple

import torch

from executorch.exir.capture._config import CaptureConfig
from executorch.exir.error import ExportError, ExportErrorType, InternalError
from executorch.exir.program import ExirExportedProgram
from executorch.exir.program._program import _transform, HackedUpExportedProgramDONOTUSE
from executorch.exir.tracer import (
    _default_decomposition_table,
    dispatch_trace,
    dynamo_trace,
    flatten_output,
    Value,
)
from executorch.exir.verification.verifier import EXIRATenDialectVerifierBase
from torch import _guards
from torch._dispatch.python import enable_python_dispatcher
from torch._export.passes import ReplaceViewOpsWithViewCopyOpsPass
from torch._subclasses.fake_tensor import FakeTensor, FakeTensorMode
from torch.export import export
from torch.export.exported_program import (
    ExportedProgram,
    ExportGraphSignature,
    InputKind,
    InputSpec,
    ModuleCallEntry,
    ModuleCallSignature,
    OutputKind,
    OutputSpec,
    TensorArgument,
)
from torch.func import functionalize
from torch.fx._compatibility import compatibility
from torch.fx.experimental.proxy_tensor import make_fx
from torch.fx.experimental.symbolic_shapes import ShapeEnv
from torch.utils import _pytree as pytree


Val = Any

CompileSpec = namedtuple(
    "CompileSpec", ["method_name", "callable", "args", "dynamic_shapes"]
)

CallSpec = namedtuple("CallSpec", ["in_spec", "out_spec"])


@compatibility(is_backward_compatible=False)
def _capture_legacy_do_not_use(f, args) -> ExirExportedProgram:
    """
    This is a legacy API that should be avoided. Prefer to use capture() instead.
    """
    warnings.warn(
        "This function is now deprecated, please use `torch.export` and `exir.to_edge` instead. "
        "See https://github.com/pytorch/functorch for more details.",
        DeprecationWarning,
        stacklevel=1,
    )

    graph_module = dispatch_trace(f, args)
    flat_args = tuple(pytree.tree_flatten(args)[0])
    in_spec, out_spec = graph_module.in_spec, graph_module.out_spec

    _instantiate_missing_placeholder_val_with_real_inputs(graph_module, flat_args)
    graph_module._apply(torch.Tensor.contiguous)

    user_inputs = [
        node.name for node in graph_module.graph.nodes if node.op == "placeholder"
    ]
    output_node = list(graph_module.graph.nodes)[-1]
    assert output_node.op == "output"
    user_outputs = [arg.name for arg in output_node.args[0]]

    for n in graph_module.graph.nodes:
        if n.op == "call_function" and "val" not in n.meta:
            try:
                args, kwargs = pytree.tree_map_only(
                    torch.fx.Node, lambda x: x.meta["val"], (n.args, n.kwargs)
                )
                n.meta["val"] = n.target(*args, **kwargs)
            except Exception:
                n.meta["val"] = None

    ep = HackedUpExportedProgramDONOTUSE(
        root=graph_module,
        graph=graph_module.graph,
        graph_signature=ExportGraphSignature(
            input_specs=[
                InputSpec(
                    kind=InputKind.USER_INPUT, arg=TensorArgument(name=i), target=None
                )
                for i in user_inputs
            ],
            output_specs=[
                OutputSpec(
                    kind=OutputKind.USER_OUTPUT, arg=TensorArgument(name=o), target=None
                )
                for o in user_outputs
            ],
        ),
        call_spec=CallSpec(in_spec, out_spec),
        state_dict={},
        range_constraints={},
        module_call_graph=[
            ModuleCallEntry(
                fqn="",
                signature=ModuleCallSignature(
                    inputs=[],
                    outputs=[],
                    # pyre-fixme[6]: For 3rd argument expected `TreeSpec` but got
                    #  `Union[Tensor, Module]`.
                    in_spec=in_spec,
                    # pyre-fixme[6]: For 4th argument expected `TreeSpec` but got
                    #  `Union[Tensor, Module]`.
                    out_spec=out_spec,
                ),
            )
        ],
        example_inputs=None,
        verifier=EXIRATenDialectVerifierBase,
    )
    return ExirExportedProgram(ep, False)


@contextmanager
def patch_forward(obj: torch.nn.Module, new_method):
    """Helper context manager that makes it easier to cleanly torch.export() a
    method other than `forward` on a module.

    TODO(suo): upstream this to torch.export.wrapper.
    """
    # Save the original method
    original_method = obj.forward

    # Patch the method
    obj.forward = new_method.__get__(obj, obj.__class__)

    try:
        yield
    finally:
        # Restore the original method
        obj.forward = original_method


class WrapperModule(torch.nn.Module):
    def __init__(self, f):
        super().__init__()
        self.forward = f
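# Illustrative usage sketch (not part of this module's API): `capture()` below relies on
# `patch_forward` and `WrapperModule` to hand `torch.export.export` an nn.Module even when
# the caller passes a bound method or a bare callable. The module `M`, its `infer` method,
# and the example inputs here are hypothetical.
#
#     class M(torch.nn.Module):
#         def infer(self, x):
#             return x + 1
#
#     m = M()
#     # Export a non-forward method by temporarily patching it in as `forward`.
#     with patch_forward(m, m.infer):
#         ep = export(m, (torch.randn(2),))
#
#     # Export a bare callable by wrapping it in a throwaway module.
#     ep = export(WrapperModule(lambda x: x * 2), (torch.randn(2),))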
@compatibility(is_backward_compatible=False)
def capture(  # noqa: C901
    f: Callable[..., Any],
    args: Tuple[Value, ...],
    config: Optional[CaptureConfig] = None,
    dynamic_shapes: Optional[List[Any]] = None,
) -> ExirExportedProgram:
    warnings.warn(
        "This function is now deprecated, please use `torch.export` and `exir.to_edge` instead.",
        DeprecationWarning,
        stacklevel=1,
    )
    if not isinstance(args, tuple):
        raise ExportError(
            ExportErrorType.INVALID_INPUT_TYPE,
            f"Expect `args` to be a tuple, got type: {type(args)}.",
        )
    config = config or CaptureConfig()
    out_spec = None
    # TODO (zhxchen17) Always functionalize in a second pass no matter which path is taken.
    flat_args = tuple(pytree.tree_flatten(args)[0])

    if not config.enable_aot:
        if config._unlift:
            raise ExportError(
                ExportErrorType.NOT_SUPPORTED,
                "The _unlift config doesn't do anything unless enable_aot is enabled. Please do not set it.",
            )

    if config.pt2_mode:
        if config.enable_aot:
            if config.enable_dynamic_shape:
                raise ExportError(
                    ExportErrorType.NOT_SUPPORTED,
                    "Under enable_aot, the enable_dynamic_shape flag doesn't do anything. Please do not set it.",
                )
            if not config.enable_functionalization:
                raise ExportError(
                    ExportErrorType.NOT_SUPPORTED,
                    "Functionalization is required for enable_aot.",
                )
            # If trying to capture a method and the bound class instance is a
            # Module, then export the module while patching in that method.
            if isinstance(f, MethodType) and isinstance(f.__self__, torch.nn.Module):
                with patch_forward(f.__self__, f):
                    ep = export(
                        cast(torch.nn.Module, f.__self__),
                        args,
                        dynamic_shapes=dynamic_shapes,
                    )
            else:
                mod = f if isinstance(f, torch.nn.Module) else WrapperModule(f)
                ep = export(mod, args, dynamic_shapes=dynamic_shapes)

            ep = ep.run_decompositions(_default_decomposition_table())
            ep = _transform(ep, ReplaceViewOpsWithViewCopyOpsPass())
            if not config._unlift:
                return ExirExportedProgram(ep, False)
            graph_module = cast(torch.fx.GraphModule, ep.module())
        elif config.enable_dynamic_shape:
            graph_module, _ = dynamo_trace(
                f,
                args,
                aten_graph=True,
                tracing_mode="symbolic",
                dynamo_config=config._dynamo_config,
                dynamic_shapes=dynamic_shapes,
                _use_old_decomp_table=config._use_old_decomp_table,
            )
        else:
            graph_module, _ = dynamo_trace(
                f,
                args,
                aten_graph=True,
                tracing_mode="fake",
                dynamo_config=config._dynamo_config,
                dynamic_shapes=None,
                _use_old_decomp_table=config._use_old_decomp_table,
            )

        if out_spec is None:
            if isinstance(graph_module.graph._codegen, torch.fx.graph._PyTreeCodeGen):
                out_spec = graph_module.graph._codegen.pytree_info.out_spec
            elif hasattr(graph_module, "_out_spec"):
                out_spec = graph_module._out_spec
            else:
                out_spec = pytree.tree_flatten(f(*args))[1]

        # NOTE (tmanlaibaatar)
        # torchdynamo.export adds an extra kwarg into the graph module, which is
        # then lost while we are calling make_fx. This is because make_fx doesn't
        # handle kwargs. Originally we used the torchdynamo input spec, but due to
        # some limitations in the pytree implementation, it doesn't recognize the
        # make_fx graph with the torchdynamo input spec. We work around it by
        # getting the input spec directly from the user arguments.
        in_spec = pytree.tree_flatten((args, {}))[1]

        if config.enable_functionalization and not config.enable_aot:
            args = copy.deepcopy(args)

            def graph_with_interpreter(*args):
                with torch.fx.traceback.preserve_node_meta():
                    return torch.fx.Interpreter(graph_module).run(*args)

            functionalized_callable = functionalize(
                graph_with_interpreter,
                remove="mutations_and_views",
            )
            assert isinstance(functionalized_callable, Callable)

            if config.enable_dynamic_shape:
                fake_tensor_mode = FakeTensorMode(
                    allow_fallback_kernels=False,
                    allow_non_fake_inputs=True,
                    shape_env=ShapeEnv(),
                )
                inps: List[torch.Tensor] = []
                for node in graph_module.graph.nodes:
                    if node.op == "placeholder" and "val" in node.meta:
                        example_fake_tensor = node.meta["val"]
                        assert isinstance(example_fake_tensor, FakeTensor)
                        inps.append(example_fake_tensor)

                if detected_fake_mode := _guards.detect_fake_mode(inps):
                    fake_tensor_mode = detected_fake_mode

                count = 0

                def convert_to_fake(x):
                    nonlocal count
                    val = inps[count]
                    count += 1
                    return val

                fake_args = pytree.tree_map_only(torch.Tensor, convert_to_fake, args)

                with enable_python_dispatcher(), fake_tensor_mode:
                    graph_module = make_fx(
                        functionalized_callable,
                        tracing_mode="real",
                        _allow_non_fake_inputs=True,
                    )(*fake_args)
            else:
                # To avoid breaking folks, use the deprecated "real" tracing
                # mode if we're not using pt2.
                tracing_mode = "fake" if config.pt2_mode else "real"
                graph_module = make_fx(
                    functionalized_callable,
                    tracing_mode=tracing_mode,
                    _allow_non_fake_inputs=True,
                )(*args)
            flatten_output(graph_module)
    else:
        raise InternalError("pt2=False path is officially deprecated")

    _instantiate_missing_placeholder_val_with_real_inputs(graph_module, flat_args)
    graph_module._apply(torch.Tensor.contiguous)

    user_inputs = [
        InputSpec(
            kind=InputKind.USER_INPUT, arg=TensorArgument(name=node.name), target=None
        )
        for node in graph_module.graph.nodes
        if node.op == "placeholder"
    ]
    output_node = list(graph_module.graph.nodes)[-1]
    assert output_node.op == "output"
    user_outputs = [
        OutputSpec(
            kind=OutputKind.USER_OUTPUT, arg=TensorArgument(name=arg.name), target=None
        )
        for arg in output_node.args[0]
    ]

    graph_module.graph.eliminate_dead_code()

    ep = ExportedProgram(
        root=graph_module,
        graph=graph_module.graph,
        graph_signature=ExportGraphSignature(user_inputs, user_outputs),
        state_dict={},
        range_constraints={},
        module_call_graph=[
            ModuleCallEntry(
                fqn="",
                signature=ModuleCallSignature(
                    inputs=[],
                    outputs=[],
                    in_spec=in_spec,
                    # pyre-fixme[6]: For 4th argument expected `TreeSpec` but got
                    #  `Union[None, TreeSpec, Tensor, Module]`.
                    out_spec=out_spec,
                ),
            )
        ],
        example_inputs=None,
        verifiers=[EXIRATenDialectVerifierBase],
    )
    return ExirExportedProgram(ep, False)
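# Illustrative sketch of the (deprecated) `capture()` flow. The module `M`, the example
# input, and the chained `to_edge()` call are assumptions about the surrounding EXIR API,
# not guarantees made by this file:
#
#     class M(torch.nn.Module):
#         def forward(self, x):
#             return x.sin()
#
#     prog = capture(M(), (torch.randn(4),), CaptureConfig(enable_aot=True))
#     edge = prog.to_edge()  # continue through the usual EXIR lowering pipeline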
# This bootstraps a missing meta["val"] when
#   1. the placeholder comes from a scalar input, or
#   2. meta["val"] was not properly set in dispatch_trace.
def _instantiate_missing_placeholder_val_with_real_inputs(gm, args):
    phs = [node for node in gm.graph.nodes if node.op == "placeholder"]
    if len(phs) != len(args):
        raise ExportError(
            ExportErrorType.NOT_SUPPORTED,
            "Expect the number of placeholders to match the number of user inputs.",
        )
    for node, arg in zip(phs, args):
        if "val" not in node.meta or node.meta["val"] is None:
            node.meta["val"] = arg
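# For example (hypothetical trace): if a graph has placeholders (x, n) where `n` was
# captured from a Python scalar and therefore has no meta["val"], calling
# _instantiate_missing_placeholder_val_with_real_inputs(gm, (torch.randn(2), 3))
# fills n.meta["val"] with the real value 3 so downstream passes can rely on it.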