import contextlib
import json
import os
import time

import numpy as np

import torch

from . import tensor_engine


class Benchmark:
    def __init__(self, mode, device, dtype):
        self.mode = mode
        self.deterministic = False
        self.device = device
        self.dtype = dtype
        self.output_type = "stdout"
        self.print_ir = False
        self.print_kernel = False
        if mode == "both":
            self.requires_grad = True
        elif mode == "fwd":
            self.requires_grad = False
        else:
            raise ValueError(f"invalid mode: {mode}")
        self.result_grad = None
        self.grad_variables = []
        self.engine = tensor_engine.get_engine()
        self.engine.reset(device)

        # forward all member functions in self.engine to self
        for method in dir(self.engine):
            if not callable(getattr(self.engine, method)):
                continue
            # don't forward if this function is overridden here
            if hasattr(self, method):
                continue
            # don't forward if it is an internal function
            if method.startswith("_"):
                continue
            method_engine = getattr(self.engine, method)
            setattr(self, method, method_engine)
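        # After this loop, public engine helpers (e.g. rand_like, which
        # run_impl later uses through the engine) are also callable
        # directly on the benchmark object.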

    def forward(self):
        """Do one step's worth of computation."""
        raise NotImplementedError("this method should be reimplemented by a subclass")

    def check(self):
        if not self.deterministic:
            return
        np.testing.assert_allclose(
            self.reference(), self.numpy(self.compute()), atol=1e-2
        )

    def config(self):
        """Return a list describing the current benchmark config."""
        raise NotImplementedError("this method should be reimplemented by a subclass")

    def desc(self):
        """Return the description of the current benchmark."""
        config = self.config()
        config_str = "_".join([str(x) for x in config])
        device = self.device
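        # Append the NNC thread count so CPU runs at different thread
        # counts are distinguishable in the reported description.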
        if "NNC_NUM_THREADS" in os.environ:
            num_threads_str = os.environ["NNC_NUM_THREADS"]
            device += num_threads_str
        return f"{self.engine.mode}: {self.module()}_{self.mode}_{device}_{config_str}"

    @staticmethod
    def module():
        raise NotImplementedError("this method should be reimplemented by a subclass")

    def memory_workload(self):
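        """Return the per-iteration memory traffic, in elements, as a dict
        with "sol" and "algorithmic" entries; run_impl multiplies each by
        dtype_to_bytes() to report bandwidth in GB/s."""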
        raise NotImplementedError("this method should be reimplemented by a subclass")

    def compute_workload(self):
        """Return the number of scalar operations it takes to finish the tensor op."""
        return None

    @staticmethod
    def input_iterable():
        """A benchmark child class should return True if it utilizes the input iter arg."""
        return False

    def dtype_to_bytes(self):
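        # element_size() gives the width of one element in bytes,
        # e.g. 4 for torch.float32 and 2 for torch.float16.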
        return torch.tensor(0, dtype=self.dtype).element_size()

    @staticmethod
    def default_configs():
        """Return a list of default configs for this benchmark."""
        raise NotImplementedError("this method should be reimplemented by a subclass")

    def is_supported(self):
        return True

    def rand(self, shape, device=None, dtype=None, requires_grad=False):
        v = self.engine.rand(
            shape, device=device, dtype=dtype, requires_grad=requires_grad
        )
        if requires_grad:
            self.grad_variables.append(v)
        return v

    def nchw_rand(self, shape, device=None, requires_grad=False):
        v = self.engine.nchw_rand(shape, device=device, requires_grad=requires_grad)
        if requires_grad:
            self.grad_variables.append(v)
        return v

    def compute(self):
        if self.bm_jit:
            return self.bm_jit(*self.inputs)
        else:
            return self.forward(*self.inputs)

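    # Select a JIT fuser via args.cuda_fuser: "old" enables the legacy CUDA
    # fuser, "te" the TensorExpr (NNC) fuser, and "nvf" nvFuser; any other
    # value runs the benchmark without fusion.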
    def run(self, args):
        self.print_ir = args.print_ir
        if args.cuda_fuser == "old":
            torch._C._jit_override_can_fuse_on_gpu(True)
            if args.print_kernel:
                os.environ["PYTORCH_FUSION_DEBUG"] = "1"
            return self.run_impl(True)
        elif args.cuda_fuser == "te":
            torch._C._jit_set_texpr_fuser_enabled(True)
            with cuda_pointwise_context(
                args.cuda_pointwise_loop_levels,
                args.cuda_pointwise_block_count,
                args.cuda_pointwise_block_size,
            ):
                return self.run_impl(True)
        elif args.cuda_fuser == "nvf":
            torch._C._jit_set_nvfuser_enabled(True)
            torch._C._jit_set_profiling_executor(True)
            torch._C._jit_set_profiling_mode(True)
            torch._C._jit_override_can_fuse_on_cpu(False)
            torch._C._jit_override_can_fuse_on_gpu(False)
            torch._C._jit_set_bailout_depth(20)
            if args.print_kernel:
                os.environ["PYTORCH_CUDA_FUSER_DEBUG"] = "1"
            return self.run_impl(True)
        else:
            return self.run_impl(False)
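
    # NOTE: the benchmark driver is expected to set self.jit_mode before
    # calling run(); jit_mode == "trace" makes run_impl trace self.forward
    # with torch.jit.trace so the selected fuser can operate on the graph.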
    def run_impl(self, use_fuser):
        warmups = 10
        if self.device == "cuda":
            iters = 1000
        else:
            iters = 10
        engine = tensor_engine.get_engine()

        self.bm_jit = None
        for i in range(warmups + iters):
            if i == warmups:
                if self.device == "cuda":
                    engine.sync_cuda()
                time_start = time.time()

            if i == 0:
                if self.jit_mode == "trace" and use_fuser:
                    self.bm_jit = torch.jit.trace(
                        self.forward, example_inputs=self.inputs, check_trace=False
                    )
                if callable(getattr(self, "reference", None)):
                    self.check()
                else:
                    print("Warning: no reference result for ", self.module())
            elif i == 1:
                # The fusion graph is visible after the first iter is executed
                if self.jit_mode == "trace" and use_fuser and self.print_ir:
                    print(self.bm_jit.graph_for(*self.inputs))
            z = self.compute()
            if self.mode == "both":
                if self.result_grad is None:
                    self.result_grad = engine.rand_like(z)
                engine.backward([z], [self.result_grad], self.grad_variables)

        if self.device == "cuda":
            engine.sync_cuda()

        duration = time.time() - time_start
        iter_time = duration / iters
        memory_workload = self.memory_workload()
        compute_workload = self.compute_workload()

        result_dict = {
            "desc": self.desc(),
            "us": iter_time * 1e6,
            "sol": memory_workload["sol"] * self.dtype_to_bytes() / iter_time / 1e9,
            "algorithmic": memory_workload["algorithmic"]
            * self.dtype_to_bytes()
            / iter_time
            / 1e9,
        }
        if compute_workload:
            result_dict["compute_workload"] = compute_workload / iter_time / 1e9
        self.dump_result(result_dict)

    def dump_result(self, result_dict):
        if self.output_type == "json":
            print(json.dumps(result_dict))
        elif self.output_type == "stdout":
            msg = "{}: {:.2f} us, SOL {:.2f} GB/s, algorithmic {:.2f} GB/s".format(
                result_dict["desc"],
                result_dict["us"],
                result_dict["sol"],
                result_dict["algorithmic"],
            )
            if "compute_workload" in result_dict:
                msg += f", compute {result_dict['compute_workload']:.2f} Gops/s"
            print(msg)
        else:
            raise Exception("Unknown output_type " + self.output_type)  # noqa: TRY002
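
# A minimal, illustrative sketch of a concrete benchmark (hypothetical; the
# real subclasses live in the sibling modules of this package, and the
# driver sets jit_mode and calls run()):
#
#     class AddBench(Benchmark):
#         def __init__(self, mode, device, dtype, N):
#             super().__init__(mode, device, dtype)
#             self.N = N
#             self.inputs = [
#                 self.rand(
#                     [N],
#                     device=device,
#                     dtype=dtype,
#                     requires_grad=self.requires_grad,
#                 )
#             ]
#
#         def forward(self, x):
#             return x + x
#
#         def config(self):
#             return [self.N]
#
#         @staticmethod
#         def module():
#             return "add"
#
#         def memory_workload(self):
#             # one read and one write per element
#             return {"sol": 2 * self.N, "algorithmic": 2 * self.N}
#
#         @staticmethod
#         def default_configs():
#             return [[1 << 20]]
#
#     register_benchmark_class(AddBench)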


@contextlib.contextmanager
def cuda_pointwise_context(loop_levels, block_count, block_size):
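    """Temporarily override the TensorExpr CUDA pointwise codegen knobs,
    restoring the previous values on exit; a falsy argument leaves the
    corresponding knob untouched."""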
    if loop_levels:
        old_loop_levels = torch._C._jit_get_te_cuda_pointwise_loop_levels()
        torch._C._jit_set_te_cuda_pointwise_loop_levels(loop_levels)
    if block_count:
        old_block_count = torch._C._jit_get_te_cuda_pointwise_block_count()
        torch._C._jit_set_te_cuda_pointwise_block_count(block_count)
    if block_size:
        old_block_size = torch._C._jit_get_te_cuda_pointwise_block_size()
        torch._C._jit_set_te_cuda_pointwise_block_size(block_size)

    try:
        yield
    finally:
        if loop_levels:
            torch._C._jit_set_te_cuda_pointwise_loop_levels(old_loop_levels)
        if block_count:
            torch._C._jit_set_te_cuda_pointwise_block_count(old_block_count)
        if block_size:
            torch._C._jit_set_te_cuda_pointwise_block_size(old_block_size)


# Auxiliary class to facilitate dynamic input shapes
class DynamicShape:
    r"""
    An auxiliary mixin class for dynamic-shape benchmarks.

    Pre-computes inputs with random shapes and overrides the compute
    method so that each call presents the fuser with a different input
    tensor shape.
    """

    # Number of random input samples kept per instance
    SAMPLE_SIZE = 100

    def __init__(self, dynamic_range=1.2):
        self._input_samples = []
        self._input_sample_index = 0
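        # Normalize the range to a lower-bound ratio below 1.0: passing
        # 1.2 or 1/1.2 both mean each dimension may shrink to ~83% of
        # its nominal size (see rand_shape below).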
        self._dynamic_range = (
            1.0 / dynamic_range if dynamic_range > 1.0 else dynamic_range
        )
        self._enable_dynamic_shapes = True

    # Returns the input test case that the current index points to
    @property
    def inputs(self):
        return self._input_samples[self._input_sample_index]

    # Assigning to inputs actually appends a test case to the sample buffer
    @inputs.setter
    def inputs(self, val):
        self._input_samples.append(val)

    # Runs the normal compute while incrementing the test case index,
    # returning the result so run_impl can still drive the backward pass
    def compute(self):
        ret = super().compute()
        self._input_sample_index = (self._input_sample_index + 1) % self.SAMPLE_SIZE
        return ret

    # Defined by the benchmark: it needs to specify the input tensor
    # construction in this method, essentially the same way a benchmark
    # creates the inputs list in its initializer
    def instantiate_input(self):
        raise NotImplementedError

    # Instantiate random-shaped inputs and start the benchmark run
    def run(self, args):
        # allow force-disabling dynamic shapes from the command line
        if args.no_dynamic_shape:
            self._enable_dynamic_shapes = False
        self.load_inputs()
        super().run(args)

    # Pre-compute inputs so the creation of random tensors does not add to
    # the measured compute time; one sample is already appended by the
    # subclass initializer (through the inputs setter), hence SAMPLE_SIZE - 1
    def load_inputs(self):
        for _ in range(self.SAMPLE_SIZE - 1):
            self.instantiate_input()

    # Returns a randomized shape: each dimension is scaled by a factor
    # drawn uniformly from [_dynamic_range, 1.0]
    def rand_shape(self, shape):
        if not self._enable_dynamic_shapes:
            return shape
        ratios = np.random.uniform(self._dynamic_range, 1.0, len(shape))
        dyn_shape = list(np.multiply(shape, ratios).astype(int))
        return dyn_shape
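
# Illustrative use of DynamicShape (hypothetical, reusing the AddBench
# sketch above): the mixin is listed first so its compute()/run()
# overrides take precedence in the MRO, e.g.:
#
#     class DynamicAddBench(DynamicShape, AddBench):
#         def __init__(self, mode, device, dtype, N):
#             DynamicShape.__init__(self)
#             AddBench.__init__(self, mode, device, dtype, N)
#
#         def instantiate_input(self):
#             # each pre-computed sample gets a slightly different length
#             self.inputs = [
#                 self.rand(
#                     self.rand_shape([self.N]),
#                     device=self.device,
#                     dtype=self.dtype,
#                     requires_grad=self.requires_grad,
#                 )
#             ]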


benchmark_classes = []

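# Concrete benchmark modules call this at import time so the driver can
# enumerate and instantiate every available benchmark.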
def register_benchmark_class(benchmark_cls):
    benchmark_classes.append(benchmark_cls)