1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <deque>
15
16 #include <benchmark/benchmark.h>
17
18 #include <grpc/event_engine/event_engine.h>
19 #include <grpc/support/log.h>
20 #include <grpc/support/port_platform.h>
21
22 #include "src/core/lib/event_engine/common_closures.h"
23 #include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
24 #include "src/core/lib/gprpp/sync.h"
25 #include "test/core/util/test_config.h"
26
27 namespace {
28
29 using ::grpc_event_engine::experimental::AnyInvocableClosure;
30 using ::grpc_event_engine::experimental::BasicWorkQueue;
31 using ::grpc_event_engine::experimental::EventEngine;
32
33 grpc_core::Mutex globalMu;
34 BasicWorkQueue globalWorkQueue;
35 std::deque<EventEngine::Closure*> globalDeque;
36
37 // --- Multithreaded Tests ---------------------------------------------------
38
MultithreadedTestArguments(benchmark::internal::Benchmark * b)39 void MultithreadedTestArguments(benchmark::internal::Benchmark* b) {
40 b->Range(1, 512)
41 ->UseRealTime()
42 ->MeasureProcessCPUTime()
43 ->Threads(1)
44 ->Threads(4)
45 ->ThreadPerCpu();
46 }
47
BM_MultithreadedWorkQueuePopOldest(benchmark::State & state)48 void BM_MultithreadedWorkQueuePopOldest(benchmark::State& state) {
49 AnyInvocableClosure closure([] {});
50 int element_count = state.range(0);
51 double pop_attempts = 0;
52 for (auto _ : state) {
53 for (int i = 0; i < element_count; i++) globalWorkQueue.Add(&closure);
54 int cnt = 0;
55 do {
56 if (++pop_attempts && globalWorkQueue.PopOldest() != nullptr) ++cnt;
57 } while (cnt < element_count);
58 }
59 state.counters["added"] = element_count * state.iterations();
60 state.counters["pop_rate"] = benchmark::Counter(
61 element_count * state.iterations(), benchmark::Counter::kIsRate);
62 state.counters["pop_attempts"] = pop_attempts;
63 // Rough measurement of queue contention.
64 // WorkQueue::Pop* may return nullptr when the queue is non-empty, usually
65 // when under thread contention. hit_rate is the ratio of pop attempts to
66 // closure executions.
67 state.counters["hit_rate"] =
68 benchmark::Counter(element_count * state.iterations() / pop_attempts,
69 benchmark::Counter::kAvgThreads);
70 if (state.thread_index() == 0) {
71 GPR_ASSERT(globalWorkQueue.Empty());
72 }
73 }
74 BENCHMARK(BM_MultithreadedWorkQueuePopOldest)
75 ->Apply(MultithreadedTestArguments);
76
BM_MultithreadedWorkQueuePopMostRecent(benchmark::State & state)77 void BM_MultithreadedWorkQueuePopMostRecent(benchmark::State& state) {
78 AnyInvocableClosure closure([] {});
79 int element_count = state.range(0);
80 double pop_attempts = 0;
81 for (auto _ : state) {
82 for (int i = 0; i < element_count; i++) globalWorkQueue.Add(&closure);
83 int cnt = 0;
84 do {
85 if (++pop_attempts && globalWorkQueue.PopMostRecent() != nullptr) ++cnt;
86 } while (cnt < element_count);
87 }
88 state.counters["added"] = element_count * state.iterations();
89 state.counters["pop_rate"] = benchmark::Counter(
90 element_count * state.iterations(), benchmark::Counter::kIsRate);
91 state.counters["pop_attempts"] = pop_attempts;
92 state.counters["hit_rate"] =
93 benchmark::Counter(element_count * state.iterations() / pop_attempts,
94 benchmark::Counter::kAvgThreads);
95 if (state.thread_index() == 0) {
96 GPR_ASSERT(globalWorkQueue.Empty());
97 }
98 }
99 BENCHMARK(BM_MultithreadedWorkQueuePopMostRecent)
100 ->Apply(MultithreadedTestArguments);
101
BM_MultithreadedStdDequeLIFO(benchmark::State & state)102 void BM_MultithreadedStdDequeLIFO(benchmark::State& state) {
103 int element_count = state.range(0);
104 AnyInvocableClosure closure([] {});
105 for (auto _ : state) {
106 for (int i = 0; i < element_count; i++) {
107 grpc_core::MutexLock lock(&globalMu);
108 globalDeque.push_back(&closure);
109 }
110 for (int i = 0; i < element_count; i++) {
111 grpc_core::MutexLock lock(&globalMu);
112 EventEngine::Closure* popped = globalDeque.back();
113 globalDeque.pop_back();
114 GPR_ASSERT(popped != nullptr);
115 }
116 }
117 state.counters["added"] = element_count * state.iterations();
118 state.counters["pop_attempts"] = state.counters["added"];
119 state.counters["pop_rate"] = benchmark::Counter(
120 element_count * state.iterations(), benchmark::Counter::kIsRate);
121 state.counters["hit_rate"] =
122 benchmark::Counter(1, benchmark::Counter::kAvgThreads);
123 }
124 BENCHMARK(BM_MultithreadedStdDequeLIFO)->Apply(MultithreadedTestArguments);
125
126 // --- Basic Functionality Tests ---------------------------------------------
127
BM_WorkQueueIntptrPopMostRecent(benchmark::State & state)128 void BM_WorkQueueIntptrPopMostRecent(benchmark::State& state) {
129 BasicWorkQueue queue;
130 grpc_event_engine::experimental::AnyInvocableClosure closure([] {});
131 int element_count = state.range(0);
132 for (auto _ : state) {
133 int cnt = 0;
134 for (int i = 0; i < element_count; i++) queue.Add(&closure);
135 do {
136 if (queue.PopMostRecent() != nullptr) ++cnt;
137 } while (cnt < element_count);
138 }
139 state.counters["Added"] = element_count * state.iterations();
140 state.counters["Popped"] = state.counters["Added"];
141 state.counters["Pop Rate"] =
142 benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
143 }
144 BENCHMARK(BM_WorkQueueIntptrPopMostRecent)
145 ->Range(1, 512)
146 ->UseRealTime()
147 ->MeasureProcessCPUTime();
148
BM_WorkQueueClosureExecution(benchmark::State & state)149 void BM_WorkQueueClosureExecution(benchmark::State& state) {
150 BasicWorkQueue queue;
151 int element_count = state.range(0);
152 int run_count = 0;
153 grpc_event_engine::experimental::AnyInvocableClosure closure(
154 [&run_count] { ++run_count; });
155 for (auto _ : state) {
156 for (int i = 0; i < element_count; i++) queue.Add(&closure);
157 do {
158 queue.PopMostRecent()->Run();
159 } while (run_count < element_count);
160 run_count = 0;
161 }
162 state.counters["Added"] = element_count * state.iterations();
163 state.counters["Popped"] = state.counters["Added"];
164 state.counters["Pop Rate"] =
165 benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
166 }
167 BENCHMARK(BM_WorkQueueClosureExecution)
168 ->Range(8, 128)
169 ->UseRealTime()
170 ->MeasureProcessCPUTime();
171
BM_WorkQueueAnyInvocableExecution(benchmark::State & state)172 void BM_WorkQueueAnyInvocableExecution(benchmark::State& state) {
173 BasicWorkQueue queue;
174 int element_count = state.range(0);
175 int run_count = 0;
176 for (auto _ : state) {
177 for (int i = 0; i < element_count; i++) {
178 queue.Add([&run_count] { ++run_count; });
179 }
180 do {
181 queue.PopMostRecent()->Run();
182 } while (run_count < element_count);
183 run_count = 0;
184 }
185 state.counters["Added"] = element_count * state.iterations();
186 state.counters["Popped"] = state.counters["Added"];
187 state.counters["Pop Rate"] =
188 benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
189 }
190 BENCHMARK(BM_WorkQueueAnyInvocableExecution)
191 ->Range(8, 128)
192 ->UseRealTime()
193 ->MeasureProcessCPUTime();
194
195 } // namespace
196
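// Some distributions of Google Benchmark declare RunSpecifiedBenchmarks inside
// the `benchmark` namespace and others declare it globally. Calling it
// unqualified from a wrapper defined inside namespace `benchmark` compiles in
// both cases.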
199 namespace benchmark {
RunTheBenchmarksNamespaced()200 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
201 } // namespace benchmark
202
main(int argc,char ** argv)203 int main(int argc, char** argv) {
204 grpc::testing::TestEnvironment env(&argc, argv);
205 ::benchmark::Initialize(&argc, argv);
206 benchmark::RunTheBenchmarksNamespaced();
207 return 0;
208 }
209