1# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
"""Benchmarks to compare the effect of different parameter values on the performance."""
16import time
17import numpy as np
18
19from tensorflow.python.data.benchmarks import benchmark_base
20from tensorflow.python.data.experimental.ops import testing
21from tensorflow.python.data.ops import dataset_ops
22from tensorflow.python.ops import math_ops
23from tensorflow.python.ops import script_ops
24
# Sleep-time parameters, both expressed in milliseconds.
# Upper bound for the random per-element sleep injected by `sleep_function`
# on the output (producer) side of the pipeline.
max_output_sleep_ms = 0.5
# Fixed per-element sleep applied at the end of each pipeline via
# `testing.sleep` (which takes microseconds, hence the `* 1000` at call sites).
input_sleep_ms = 1.5
29
30
def sleep_function(x, max_sleep_ms=None):
  """Sleeps for a random duration in [0, max_sleep_ms) ms, then returns `x`.

  Args:
    x: value to pass through unchanged.
    max_sleep_ms: upper bound of the random sleep in milliseconds. Defaults to
      the module-level `max_output_sleep_ms`.

  Returns:
    `x`, unchanged.
  """
  if max_sleep_ms is None:
    max_sleep_ms = max_output_sleep_ms
  # Bug fix: the original called `np.random.uniform(max_output_sleep_ms)`,
  # which binds 0.5 to *low* and keeps the default high=1.0, sleeping in
  # [0.5, 1.0) ms rather than the intended [0, max) ms.
  time.sleep(np.random.uniform(0, max_sleep_ms) / 1000)
  return x
34
35
def map_function(x):
  """Wraps `sleep_function` in a py_func so the sleep runs per element.

  The output tensor keeps the dtype of `x`.
  """
  func, inputs, output_type = sleep_function, [x], x.dtype
  return script_ops.py_func(func, inputs, output_type)
38
39
class ParameterValueBenchmark(benchmark_base.DatasetBenchmarkBase):
  """Benchmarks to compare the effect of different parameter values on the performance."""

  def _sweep_and_report(self, benchmark_fn, nums_parallel_calls, buffer_sizes):
    """Runs `benchmark_fn` over a parameter grid and prints sorted results.

    Deduplicates the sweep/sort/print logic that was previously copy-pasted in
    each `benchmark_*` method.

    Args:
      benchmark_fn: callable taking `(num_parallel_calls, buffer_size)` and
        returning the measured wall time.
      nums_parallel_calls: values to try for `num_parallel_calls`.
      buffer_sizes: values to try for the prefetch `buffer_size`.
    """
    wall_time_map = {}
    for num_parallel_calls in nums_parallel_calls:
      for buffer_size in buffer_sizes:
        parameters = (num_parallel_calls, buffer_size)
        wall_time_map[parameters] = benchmark_fn(num_parallel_calls,
                                                 buffer_size)

    # Report fastest configurations first (stable sort keeps grid order for
    # ties, matching the original list-based implementation).
    for parameters in sorted(wall_time_map, key=wall_time_map.get):
      print("num_parallel_calls_%d_buffer_size_%d_wall_time:" % parameters,
            wall_time_map[parameters])

  def _benchmark_map(self, num_parallel_calls, buffer_size):
    """Benchmarks a parallel-map pipeline for one parameter combination."""
    k = 1024 * 1024
    # A large matmul per element keeps the map stage compute-heavy.
    dataset = dataset_ops.Dataset.from_tensors(
        (np.random.rand(1, 4 * k), np.random.rand(4 * k, 1))).repeat()
    dataset = dataset.map(
        math_ops.matmul, num_parallel_calls=num_parallel_calls)
    dataset = dataset.map(map_function)
    dataset = dataset.prefetch(buffer_size=buffer_size)
    # testing.sleep takes microseconds; input_sleep_ms is in milliseconds.
    dataset = dataset.apply(testing.sleep(int(input_sleep_ms * 1000)))

    name_str = ("map_max_output_sleep_ms_%.2f_input_sleep_ms_%.2f_"
                "num_parallel_calls_%d_buffer_size_%d")
    return self.run_and_report_benchmark(
        dataset=dataset,
        num_elements=10000,
        name=name_str %
        (max_output_sleep_ms, input_sleep_ms, num_parallel_calls, buffer_size))

  def benchmark_map(self):
    """Sweeps `num_parallel_calls` x `buffer_size` for the map pipeline."""
    self._sweep_and_report(self._benchmark_map, [4, 8, 12],
                           [10, 50, 100, 150, 200, 250, 300])

  def _benchmark_map_and_batch(self, num_parallel_calls, buffer_size):
    """Benchmarks a parallel map+batch pipeline for one parameter combination."""
    batch_size = 16
    k = 1024 * 1024

    dataset = dataset_ops.Dataset.from_tensors(
        (np.random.rand(1, 4 * k), np.random.rand(4 * k, 1))).repeat()
    dataset = dataset.map(
        math_ops.matmul, num_parallel_calls=num_parallel_calls)
    dataset = dataset.batch(batch_size=batch_size)
    dataset = dataset.map(map_function)
    dataset = dataset.prefetch(buffer_size=buffer_size)
    # testing.sleep takes microseconds; input_sleep_ms is in milliseconds.
    dataset = dataset.apply(testing.sleep(int(input_sleep_ms * 1000)))

    name_str = ("map_and_batch_max_output_sleep_ms_%.2f_input_sleep_ms_%.2f"
                "_num_parallel_calls_%d_buffer_size_%d")
    return self.run_and_report_benchmark(
        dataset=dataset,
        num_elements=1000,
        name=name_str %
        (max_output_sleep_ms, input_sleep_ms, num_parallel_calls, buffer_size))

  def benchmark_map_and_batch(self):
    """Sweeps `num_parallel_calls` x `buffer_size` for the map+batch pipeline."""
    self._sweep_and_report(self._benchmark_map_and_batch, [4, 8, 12],
                           [10, 50, 100, 150, 200, 250, 300])

  def _benchmark_interleave(self, num_parallel_calls, buffer_size):
    """Benchmarks a parallel-interleave pipeline for one parameter combination."""
    k = 1024 * 1024
    dataset = dataset_ops.Dataset.from_tensors(
        (np.random.rand(1, 4 * k), np.random.rand(4 * k, 1))).repeat()
    dataset = dataset.map(math_ops.matmul)
    dataset = dataset.map(map_function)
    # The lambda captures the inner dataset built above; it is traced before
    # `dataset` is rebound to the interleave result, so there is no cycle.
    dataset = dataset_ops.Dataset.range(1).repeat().interleave(
        lambda _: dataset,  # pylint: disable=cell-var-from-loop
        cycle_length=10,
        num_parallel_calls=num_parallel_calls)
    dataset = dataset.prefetch(buffer_size=buffer_size)
    # testing.sleep takes microseconds; input_sleep_ms is in milliseconds.
    dataset = dataset.apply(testing.sleep(int(input_sleep_ms * 1000)))

    name_str = ("interleave_max_output_sleep_ms_%.2f_input_sleep_ms_%.2f"
                "_num_parallel_calls_%d_buffer_size_%d")
    return self.run_and_report_benchmark(
        dataset=dataset,
        num_elements=10000,
        name=name_str %
        (max_output_sleep_ms, input_sleep_ms, num_parallel_calls, buffer_size))

  def benchmark_interleave(self):
    """Sweeps `num_parallel_calls` x `buffer_size` for the interleave pipeline."""
    self._sweep_and_report(self._benchmark_interleave, [4, 8, 10],
                           [10, 50, 100, 150, 200, 250, 300])
160
161
if __name__ == "__main__":
  # Delegates to the test/benchmark runner exposed by the benchmark base module.
  benchmark_base.test.main()
164