# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. # pyre-unsafe import operator from typing import Callable, Dict, Optional, Set, Union import executorch.backends.vulkan.custom_ops_lib # noqa import torch from executorch.backends.vulkan.serialization.vulkan_graph_schema import ( VkMemoryLayout, VkStorageType, ) from executorch.backends.vulkan.utils import ( all_memory_layouts, all_packed_dims, PackedDim, ) from executorch.exir.dialects._ops import ops as exir_ops from executorch.exir.dialects.edge._ops import EdgeOpOverload from torch._subclasses.fake_tensor import FakeTensor ###################### ## OpFeatures class ## ###################### def allow_node(node: torch.fx.Node) -> bool: return True class TextureImplFeatures: __slots__ = [ "valid_packed_dims", "uses_axis_map", ] def __init__( self, uses_axis_map: bool = False, valid_packed_dims: Optional[Set[PackedDim]] = None, ): self.uses_axis_map: bool = uses_axis_map self.valid_packed_dims = set() if valid_packed_dims is not None: self.valid_packed_dims = valid_packed_dims def valid_memory_layouts(self) -> Set[VkMemoryLayout]: """ Derive the set of memory layouts supported by the texture implementation based on the valid packed dimensions. """ layouts = set() if PackedDim.WIDTH in self.valid_packed_dims: layouts.add(VkMemoryLayout.TENSOR_WIDTH_PACKED) if PackedDim.HEIGHT in self.valid_packed_dims: layouts.add(VkMemoryLayout.TENSOR_HEIGHT_PACKED) if PackedDim.CHANNELS in self.valid_packed_dims: layouts.add(VkMemoryLayout.TENSOR_CHANNELS_PACKED) return layouts class OpFeatures: __slots__ = [ # None or TextureImplFeatures to specify implementation details of the texture # based operator implementation. "texture_impl", # bool indicating if the operator has a buffer based implementation. "buffer_impl", # bool indicating if the operator has a resize function, which allows it to # support dynamic shape tensors. "resize_fn", # Optimal "optimal_storage", "optimal_layout", # bool indicating if the operator handles its own prepacking. If this is True, # then the insert_prepack_nodes pass will not insert prepack nodes for the args # of the op. "handles_own_prepacking", # Optional dictionary to specify a custom function to calculate the required # image extents for a particular argument index. "skip_limits_check", # Optional check function used during partitioning to determine if a node's # inputs are supported by the operator implementation. "check_node_fn", ] def __init__( self, texture_impl: Optional[TextureImplFeatures] = None, buffer_impl: bool = False, resize_fn: bool = False, optimal_storage: Optional[VkStorageType] = None, optimal_layout: Optional[VkMemoryLayout] = None, handles_own_prepacking: bool = False, skip_limits_check: Optional[Set[int]] = None, check_node_fn: Optional[Callable] = None, ): self.texture_impl: Optional[TextureImplFeatures] = texture_impl self.buffer_impl: bool = buffer_impl self.resize_fn: bool = resize_fn self.optimal_storage: Optional[VkStorageType] = optimal_storage self.optimal_layout: Optional[VkMemoryLayout] = optimal_layout self.handles_own_prepacking: bool = handles_own_prepacking self.skip_limits_check: Set[int] = set() if skip_limits_check is not None: self.skip_limits_check = skip_limits_check self.check_node_fn: Callable = allow_node if check_node_fn is not None: self.check_node_fn = check_node_fn def propose_storage_type(self) -> Optional[VkStorageType]: """ Propose a storage type that should be used for this operator. A proposal can be made if one of the following is true: 1. The operator specifies an optimal storage type 2. Only one storage type is supported. If both storage types are supported and no optimal storage type is specified, then None is returned to indicate that there is no preference in storage type. """ if self.optimal_storage is not None: return self.optimal_storage if self.texture_impl is not None and not self.buffer_impl: return VkStorageType.TEXTURE_3D elif self.buffer_impl and self.texture_impl is None: return VkStorageType.BUFFER return None def supported_storage_types(self) -> Set[VkStorageType]: """ Return the set of storage types supported by this operator. """ storage_types = set() if self.texture_impl is not None: storage_types.add(VkStorageType.TEXTURE_3D) if self.buffer_impl: storage_types.add(VkStorageType.BUFFER) return storage_types def propose_memory_layout(self, storage: VkStorageType) -> Optional[VkMemoryLayout]: """ Given a storage type as a precondition, propose a memory layout that should be used for this operator. A proposal can be made if one of the following is true: 1. The operator specifies an optimal memory layout 2. Only one memory layout is supported. If multiple memory layouts are supported and no optimal memory layout is specified then return None to indicate that the "best" memory layout for the operator is ambiguous. """ if self.optimal_layout is not None: return self.optimal_layout if storage == VkStorageType.TEXTURE_3D: assert self.texture_impl is not None possible_layouts = self.texture_impl.valid_memory_layouts() if len(possible_layouts) == 1: return next(iter(possible_layouts)) return None def supported_memory_layouts(self, storage: VkStorageType) -> Set[VkMemoryLayout]: """ Return the set of memory layouts supported by this operator for a given storage type. """ if storage == VkStorageType.TEXTURE_3D: assert self.texture_impl is not None return self.texture_impl.valid_memory_layouts() else: return all_memory_layouts ####################### ## Operator Registry ## ####################### OpKey = Union[str, torch._ops.OpOverload, EdgeOpOverload] vulkan_supported_ops: Dict[OpKey, OpFeatures] = {} def update_features(aten_op): def features_decorator(fn: Callable): def update_features_impl(op: OpKey): if op in vulkan_supported_ops: raise RuntimeError(f"[Vulkan delegate] duplicate registration of {op}!") vulkan_supported_ops[op] = OpFeatures() vulkan_supported_ops[op] = fn(vulkan_supported_ops[op]) if isinstance(aten_op, list): for op in aten_op: update_features_impl(op) else: update_features_impl(aten_op) return fn return features_decorator @update_features( [ operator.getitem, # Quantization related ops will be fused via graph passes exir_ops.edge.quantized_decomposed.quantize_per_channel.default, exir_ops.edge.quantized_decomposed.quantize_per_tensor.default, exir_ops.edge.quantized_decomposed.quantize_per_tensor.tensor, exir_ops.edge.quantized_decomposed.dequantize_per_tensor.default, exir_ops.edge.quantized_decomposed.dequantize_per_tensor.tensor, exir_ops.edge.quantized_decomposed.dequantize_per_channel.default, ] ) def register_ephemeral_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( uses_axis_map=True, valid_packed_dims=all_packed_dims, ) features.buffer_impl = True features.resize_fn = True return features @update_features( [ exir_ops.edge.aten.add.Tensor, exir_ops.edge.aten.sub.Tensor, exir_ops.edge.aten.minimum.default, exir_ops.edge.aten.mul.Tensor, exir_ops.edge.aten.div.Tensor, exir_ops.edge.aten.div.Tensor_mode, exir_ops.edge.aten.pow.Tensor_Tensor, ] ) def register_binary_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( uses_axis_map=True, valid_packed_dims=all_packed_dims, ) features.resize_fn = True return features @update_features( [ exir_ops.edge.aten.abs.default, exir_ops.edge.aten.clamp.default, exir_ops.edge.aten.cos.default, exir_ops.edge.aten.exp.default, exir_ops.edge.aten.gelu.default, exir_ops.edge.aten.hardshrink.default, exir_ops.edge.aten.hardtanh.default, exir_ops.edge.aten.neg.default, exir_ops.edge.aten.relu.default, exir_ops.edge.aten.sigmoid.default, exir_ops.edge.aten.sin.default, exir_ops.edge.aten.sqrt.default, exir_ops.edge.aten.rsqrt.default, exir_ops.edge.aten.tanh.default, ] ) def register_unary_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( uses_axis_map=True, valid_packed_dims=all_packed_dims, ) features.buffer_impl = True features.resize_fn = True return features @update_features(exir_ops.edge.aten._to_copy.default) def register_to_copy_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( uses_axis_map=True, valid_packed_dims=all_packed_dims, ) features.resize_fn = True def check_to_copy_node(node: torch.fx.Node) -> bool: float_dtypes = [torch.float16, torch.float32] if len(node.args) != 1: return False in_arg = node.args[0] if not isinstance(in_arg, torch.fx.Node): return False in_tensor = in_arg.meta.get("val", None) out_tensor = node.meta.get("val", None) if isinstance(in_tensor, FakeTensor) and isinstance(out_tensor, FakeTensor): if out_tensor.dtype in float_dtypes and in_tensor.dtype in float_dtypes: return True return False features.check_node_fn = check_to_copy_node return features @update_features( [ exir_ops.edge.aten.bmm.default, exir_ops.edge.aten.mm.default, exir_ops.edge.aten.addmm.default, exir_ops.edge.aten.linear.default, ] ) def register_mm_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( uses_axis_map=True, valid_packed_dims={ PackedDim.WIDTH, PackedDim.CHANNELS, }, ) features.buffer_impl = True features.resize_fn = True features.optimal_storage = VkStorageType.TEXTURE_3D features.optimal_layout = VkMemoryLayout.TENSOR_WIDTH_PACKED features.handles_own_prepacking = True return features @update_features(exir_ops.edge.aten._weight_int8pack_mm.default) def register_int8_mm_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( uses_axis_map=False, valid_packed_dims={PackedDim.WIDTH}, ) features.buffer_impl = True features.resize_fn = True features.optimal_storage = VkStorageType.TEXTURE_3D features.optimal_layout = VkMemoryLayout.TENSOR_WIDTH_PACKED features.handles_own_prepacking = True return features @update_features(exir_ops.edge.et_vk.linear_weight_int4.default) def register_int4_mm_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( uses_axis_map=False, valid_packed_dims={PackedDim.WIDTH}, ) features.resize_fn = True features.optimal_storage = VkStorageType.TEXTURE_3D features.optimal_layout = VkMemoryLayout.TENSOR_WIDTH_PACKED features.handles_own_prepacking = True return features @update_features( [ exir_ops.edge.aten._log_softmax.default, exir_ops.edge.aten._softmax.default, ] ) def register_softmax_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( valid_packed_dims=all_packed_dims, ) features.resize_fn = True return features @update_features( [ exir_ops.edge.aten.mean.dim, exir_ops.edge.aten.sum.dim_IntList, exir_ops.edge.aten.amax.default, exir_ops.edge.aten.amin.default, ] ) def register_reduce_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( valid_packed_dims=all_packed_dims, ) features.resize_fn = True def check_reduce_node(node: torch.fx.Node) -> bool: dim_list = node.args[1] if isinstance(dim_list, list) and len(dim_list) != 1: return False keepdim = node.args[2] if isinstance(keepdim, bool) and not keepdim: return False return True features.check_node_fn = check_reduce_node return features @update_features( [ exir_ops.edge.aten.avg_pool2d.default, exir_ops.edge.aten.max_pool2d_with_indices.default, ] ) def register_2d_pool_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( valid_packed_dims={PackedDim.CHANNELS}, ) features.resize_fn = True return features @update_features( [ exir_ops.edge.aten.convolution.default, exir_ops.edge.et_vk.conv_with_clamp.default, ] ) def register_convolution_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( valid_packed_dims={PackedDim.CHANNELS}, ) features.resize_fn = True features.optimal_storage = VkStorageType.TEXTURE_3D features.optimal_layout = VkMemoryLayout.TENSOR_CHANNELS_PACKED features.handles_own_prepacking = True features.skip_limits_check = {1, 2} return features @update_features("llama::sdpa_with_kv_cache") def register_sdpa_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( valid_packed_dims={PackedDim.WIDTH}, ) features.resize_fn = True features.optimal_storage = VkStorageType.TEXTURE_3D features.optimal_layout = VkMemoryLayout.TENSOR_WIDTH_PACKED features.handles_own_prepacking = True return features @update_features(exir_ops.edge.et_vk.apply_rotary_emb.default) def register_rotary_emb_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( valid_packed_dims={PackedDim.WIDTH}, ) features.resize_fn = True return features @update_features(exir_ops.edge.aten.view_copy.default) def register_view_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( valid_packed_dims=all_packed_dims, ) features.resize_fn = True return features # Ops ported from PyTorch Vulkan backend. These ops commonly support channels # packed tensors only and do not have a resize function. @update_features( [ # Shape Manipulation exir_ops.edge.aten.squeeze_copy.dims, exir_ops.edge.aten.unsqueeze_copy.default, exir_ops.edge.aten.permute_copy.default, exir_ops.edge.aten.t_copy.default, # Indexing and lookup exir_ops.edge.aten.flip.default, exir_ops.edge.aten.index_select.default, exir_ops.edge.aten.select_copy.int, exir_ops.edge.aten.slice_copy.Tensor, # Tensor combination exir_ops.edge.aten.cat.default, exir_ops.edge.aten.split_with_sizes_copy.default, exir_ops.edge.aten.split.Tensor, exir_ops.edge.aten.repeat.default, # Tensor creation exir_ops.edge.aten.arange.start_step, exir_ops.edge.aten.clone.default, exir_ops.edge.aten.constant_pad_nd.default, exir_ops.edge.aten.full.default, exir_ops.edge.aten.full_like.default, exir_ops.edge.aten.ones.default, exir_ops.edge.aten.ones_like.default, exir_ops.edge.aten.upsample_nearest2d.vec, exir_ops.edge.aten.zeros.default, exir_ops.edge.aten.zeros_like.default, exir_ops.edge.et_vk.grid_priors.default, ] ) def register_ported_op(features: OpFeatures): features.texture_impl = TextureImplFeatures( valid_packed_dims={PackedDim.CHANNELS}, ) return features # Ported ops that support their own prepacking. @update_features( [ exir_ops.edge.aten.embedding.default, exir_ops.edge.aten._native_batch_norm_legit_no_training.default, exir_ops.edge.aten.native_layer_norm.default, ] ) def register_ported_ops_with_prepacking(features: OpFeatures): features.texture_impl = TextureImplFeatures( valid_packed_dims={PackedDim.CHANNELS}, ) features.handles_own_prepacking = True return features ####################### ## Utility functions ## ####################### def has_impl(target: OpKey) -> bool: if not isinstance(target, str): if target not in vulkan_supported_ops: return target.name() in vulkan_supported_ops return target in vulkan_supported_ops else: return target in vulkan_supported_ops def get_op_features(target: OpKey) -> OpFeatures: if not isinstance(target, str): if target not in vulkan_supported_ops: # Try the op's name return vulkan_supported_ops[target.name()] return vulkan_supported_ops[target] else: return vulkan_supported_ops[target] def handles_own_prepacking(target: OpKey) -> bool: return get_op_features(target).handles_own_prepacking