# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Benchmarks to compare effect of different parameter values on the performance."""
import itertools
import time

import numpy as np

from tensorflow.python.data.benchmarks import benchmark_base
from tensorflow.python.data.experimental.ops import testing
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import script_ops

# The maximum sleeping time for each output step and the input sleeping time in
# milliseconds.
max_output_sleep_ms = 0.5
input_sleep_ms = 1.5


def sleep_function(x):
  """Sleeps for a random time below `max_output_sleep_ms`, then returns `x`.

  Args:
    x: Any value; it is passed through untouched.

  Returns:
    The input `x`, unchanged.
  """
  # Both bounds are spelled out on purpose: `np.random.uniform(v)` binds `v` to
  # `low` and keeps the default `high=1.0`, which would draw from [0.5, 1.0) ms
  # and make every delay at least as long as the documented maximum.
  delay_ms = np.random.uniform(0.0, max_output_sleep_ms)
  time.sleep(delay_ms / 1000)  # `time.sleep` takes seconds.
  return x


def map_function(x):
  """Wraps `sleep_function` in a `py_func` op so it can be used in `map`.

  Args:
    x: The dataset element to pass through `sleep_function`.

  Returns:
    The result of the `py_func` op, declared with the dtype of `x`.
  """
  return script_ops.py_func(sleep_function, [x], x.dtype)


class ParameterValueBenchmark(benchmark_base.DatasetBenchmarkBase):
  """Benchmarks to compare effect of different parameter values on the performance."""

  def _matmul_input_dataset(self):
    """Returns an endlessly repeated pair of (1, 4M) and (4M, 1) matrices."""
    k = 1024 * 1024
    return dataset_ops.Dataset.from_tensors(
        (np.random.rand(1, 4 * k), np.random.rand(4 * k, 1))).repeat()

  def _prefetch_and_report(self, dataset, name_prefix, num_elements,
                           num_parallel_calls, buffer_size):
    """Appends the prefetch and input-sleep stages and runs the benchmark.

    Args:
      dataset: The pipeline built so far.
      name_prefix: Leading component of the reported benchmark name.
      num_elements: Number of elements passed to `run_and_report_benchmark`.
      num_parallel_calls: Parallelism used by the pipeline; only used here to
        build the benchmark name.
      buffer_size: Buffer size for the `prefetch` stage.

    Returns:
      Whatever `run_and_report_benchmark` returns; callers treat it as the
      measured wall time.
    """
    dataset = dataset.prefetch(buffer_size=buffer_size)
    # `input_sleep_ms` is in milliseconds; the factor of 1000 presumably
    # converts to the microseconds that `testing.sleep` takes.
    dataset = dataset.apply(testing.sleep(int(input_sleep_ms * 1000)))

    name_str = ("%s_max_output_sleep_ms_%.2f_input_sleep_ms_%.2f"
                "_num_parallel_calls_%d_buffer_size_%d")
    return self.run_and_report_benchmark(
        dataset=dataset,
        num_elements=num_elements,
        name=name_str % (name_prefix, max_output_sleep_ms, input_sleep_ms,
                         num_parallel_calls, buffer_size))

  def _run_parameter_sweep(self, benchmark_fn, nums_parallel_calls,
                           buffer_sizes):
    """Runs `benchmark_fn` over a parameter grid and prints sorted results.

    Args:
      benchmark_fn: Callable taking `(num_parallel_calls, buffer_size)` and
        returning the measured wall time.
      nums_parallel_calls: Iterable of `num_parallel_calls` values to try.
      buffer_sizes: Iterable of prefetch buffer sizes to try.
    """
    wall_time_map = {}
    # `product` iterates `buffer_sizes` fastest, matching a nested loop with
    # `nums_parallel_calls` on the outside.
    for parameters in itertools.product(nums_parallel_calls, buffer_sizes):
      wall_time_map[parameters] = benchmark_fn(*parameters)

    # Fastest configuration first; `sorted` is stable, so ties keep run order.
    for parameters in sorted(wall_time_map, key=wall_time_map.get):
      print("num_parallel_calls_%d_buffer_size_%d_wall_time:" % parameters,
            wall_time_map[parameters])

  def _benchmark_map(self, num_parallel_calls, buffer_size):
    """Benchmarks a parallel matmul `map` followed by a sleeping `map`.

    Args:
      num_parallel_calls: Parallelism of the matmul `map` stage.
      buffer_size: Buffer size of the trailing `prefetch` stage.

    Returns:
      The value returned by `run_and_report_benchmark`.
    """
    dataset = self._matmul_input_dataset()
    dataset = dataset.map(
        math_ops.matmul, num_parallel_calls=num_parallel_calls)
    dataset = dataset.map(map_function)
    return self._prefetch_and_report(
        dataset,
        name_prefix="map",
        num_elements=10000,
        num_parallel_calls=num_parallel_calls,
        buffer_size=buffer_size)

  def benchmark_map(self):
    """Sweeps `num_parallel_calls` and `buffer_size` for the map pipeline."""
    self._run_parameter_sweep(
        self._benchmark_map,
        nums_parallel_calls=[4, 8, 12],
        buffer_sizes=[10, 50, 100, 150, 200, 250, 300])

  def _benchmark_map_and_batch(self, num_parallel_calls, buffer_size):
    """Benchmarks a parallel matmul `map`, a `batch`, then a sleeping `map`.

    Args:
      num_parallel_calls: Parallelism of the matmul `map` stage.
      buffer_size: Buffer size of the trailing `prefetch` stage.

    Returns:
      The value returned by `run_and_report_benchmark`.
    """
    batch_size = 16

    dataset = self._matmul_input_dataset()
    dataset = dataset.map(
        math_ops.matmul, num_parallel_calls=num_parallel_calls)
    dataset = dataset.batch(batch_size=batch_size)
    dataset = dataset.map(map_function)
    return self._prefetch_and_report(
        dataset,
        name_prefix="map_and_batch",
        num_elements=1000,
        num_parallel_calls=num_parallel_calls,
        buffer_size=buffer_size)

  def benchmark_map_and_batch(self):
    """Sweeps `num_parallel_calls` and `buffer_size` for map-and-batch."""
    self._run_parameter_sweep(
        self._benchmark_map_and_batch,
        nums_parallel_calls=[4, 8, 12],
        buffer_sizes=[10, 50, 100, 150, 200, 250, 300])

  def _benchmark_interleave(self, num_parallel_calls, buffer_size):
    """Benchmarks a parallel `interleave` over the matmul + sleep pipeline.

    Args:
      num_parallel_calls: Parallelism of the `interleave` stage.
      buffer_size: Buffer size of the trailing `prefetch` stage.

    Returns:
      The value returned by `run_and_report_benchmark`.
    """
    # Kept under its own name so the lambda below does not capture a variable
    # that the same statement rebinds.
    inner_dataset = self._matmul_input_dataset()
    inner_dataset = inner_dataset.map(math_ops.matmul)
    inner_dataset = inner_dataset.map(map_function)

    dataset = dataset_ops.Dataset.range(1).repeat().interleave(
        lambda _: inner_dataset,
        cycle_length=10,
        num_parallel_calls=num_parallel_calls)
    return self._prefetch_and_report(
        dataset,
        name_prefix="interleave",
        num_elements=10000,
        num_parallel_calls=num_parallel_calls,
        buffer_size=buffer_size)

  def benchmark_interleave(self):
    """Sweeps `num_parallel_calls` and `buffer_size` for interleave."""
    self._run_parameter_sweep(
        self._benchmark_interleave,
        nums_parallel_calls=[4, 8, 10],
        buffer_sizes=[10, 50, 100, 150, 200, 250, 300])


if __name__ == "__main__":
  benchmark_base.test.main()