xref: /aosp_15_r20/external/grpc-grpc/test/cpp/microbenchmarks/bm_basic_work_queue.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <deque>
15 
16 #include <benchmark/benchmark.h>
17 
18 #include <grpc/event_engine/event_engine.h>
19 #include <grpc/support/log.h>
20 #include <grpc/support/port_platform.h>
21 
22 #include "src/core/lib/event_engine/common_closures.h"
23 #include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
24 #include "src/core/lib/gprpp/sync.h"
25 #include "test/core/util/test_config.h"
26 
27 namespace {
28 
29 using ::grpc_event_engine::experimental::AnyInvocableClosure;
30 using ::grpc_event_engine::experimental::BasicWorkQueue;
31 using ::grpc_event_engine::experimental::EventEngine;
32 
// Shared state for the multithreaded benchmarks below.
// globalMu is held around every globalDeque access in
// BM_MultithreadedStdDequeLIFO.
33 grpc_core::Mutex globalMu;
// Work queue shared by all threads of the multithreaded PopOldest and
// PopMostRecent benchmarks.
34 BasicWorkQueue globalWorkQueue;
// Mutex-guarded std::deque exercised by BM_MultithreadedStdDequeLIFO, which
// reports the same counters as the work queue benchmarks.
35 std::deque<EventEngine::Closure*> globalDeque;
36 
37 // --- Multithreaded Tests ---------------------------------------------------
38 
MultithreadedTestArguments(benchmark::internal::Benchmark * b)39 void MultithreadedTestArguments(benchmark::internal::Benchmark* b) {
40   b->Range(1, 512)
41       ->UseRealTime()
42       ->MeasureProcessCPUTime()
43       ->Threads(1)
44       ->Threads(4)
45       ->ThreadPerCpu();
46 }
47 
BM_MultithreadedWorkQueuePopOldest(benchmark::State & state)48 void BM_MultithreadedWorkQueuePopOldest(benchmark::State& state) {
49   AnyInvocableClosure closure([] {});
50   int element_count = state.range(0);
51   double pop_attempts = 0;
52   for (auto _ : state) {
53     for (int i = 0; i < element_count; i++) globalWorkQueue.Add(&closure);
54     int cnt = 0;
55     do {
56       if (++pop_attempts && globalWorkQueue.PopOldest() != nullptr) ++cnt;
57     } while (cnt < element_count);
58   }
59   state.counters["added"] = element_count * state.iterations();
60   state.counters["pop_rate"] = benchmark::Counter(
61       element_count * state.iterations(), benchmark::Counter::kIsRate);
62   state.counters["pop_attempts"] = pop_attempts;
63   // Rough measurement of queue contention.
64   // WorkQueue::Pop* may return nullptr when the queue is non-empty, usually
65   // when under thread contention. hit_rate is the ratio of pop attempts to
66   // closure executions.
67   state.counters["hit_rate"] =
68       benchmark::Counter(element_count * state.iterations() / pop_attempts,
69                          benchmark::Counter::kAvgThreads);
70   if (state.thread_index() == 0) {
71     GPR_ASSERT(globalWorkQueue.Empty());
72   }
73 }
74 BENCHMARK(BM_MultithreadedWorkQueuePopOldest)
75     ->Apply(MultithreadedTestArguments);
76 
BM_MultithreadedWorkQueuePopMostRecent(benchmark::State & state)77 void BM_MultithreadedWorkQueuePopMostRecent(benchmark::State& state) {
78   AnyInvocableClosure closure([] {});
79   int element_count = state.range(0);
80   double pop_attempts = 0;
81   for (auto _ : state) {
82     for (int i = 0; i < element_count; i++) globalWorkQueue.Add(&closure);
83     int cnt = 0;
84     do {
85       if (++pop_attempts && globalWorkQueue.PopMostRecent() != nullptr) ++cnt;
86     } while (cnt < element_count);
87   }
88   state.counters["added"] = element_count * state.iterations();
89   state.counters["pop_rate"] = benchmark::Counter(
90       element_count * state.iterations(), benchmark::Counter::kIsRate);
91   state.counters["pop_attempts"] = pop_attempts;
92   state.counters["hit_rate"] =
93       benchmark::Counter(element_count * state.iterations() / pop_attempts,
94                          benchmark::Counter::kAvgThreads);
95   if (state.thread_index() == 0) {
96     GPR_ASSERT(globalWorkQueue.Empty());
97   }
98 }
99 BENCHMARK(BM_MultithreadedWorkQueuePopMostRecent)
100     ->Apply(MultithreadedTestArguments);
101 
BM_MultithreadedStdDequeLIFO(benchmark::State & state)102 void BM_MultithreadedStdDequeLIFO(benchmark::State& state) {
103   int element_count = state.range(0);
104   AnyInvocableClosure closure([] {});
105   for (auto _ : state) {
106     for (int i = 0; i < element_count; i++) {
107       grpc_core::MutexLock lock(&globalMu);
108       globalDeque.push_back(&closure);
109     }
110     for (int i = 0; i < element_count; i++) {
111       grpc_core::MutexLock lock(&globalMu);
112       EventEngine::Closure* popped = globalDeque.back();
113       globalDeque.pop_back();
114       GPR_ASSERT(popped != nullptr);
115     }
116   }
117   state.counters["added"] = element_count * state.iterations();
118   state.counters["pop_attempts"] = state.counters["added"];
119   state.counters["pop_rate"] = benchmark::Counter(
120       element_count * state.iterations(), benchmark::Counter::kIsRate);
121   state.counters["hit_rate"] =
122       benchmark::Counter(1, benchmark::Counter::kAvgThreads);
123 }
124 BENCHMARK(BM_MultithreadedStdDequeLIFO)->Apply(MultithreadedTestArguments);
125 
126 // --- Basic Functionality Tests ---------------------------------------------
127 
BM_WorkQueueIntptrPopMostRecent(benchmark::State & state)128 void BM_WorkQueueIntptrPopMostRecent(benchmark::State& state) {
129   BasicWorkQueue queue;
130   grpc_event_engine::experimental::AnyInvocableClosure closure([] {});
131   int element_count = state.range(0);
132   for (auto _ : state) {
133     int cnt = 0;
134     for (int i = 0; i < element_count; i++) queue.Add(&closure);
135     do {
136       if (queue.PopMostRecent() != nullptr) ++cnt;
137     } while (cnt < element_count);
138   }
139   state.counters["Added"] = element_count * state.iterations();
140   state.counters["Popped"] = state.counters["Added"];
141   state.counters["Pop Rate"] =
142       benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
143 }
144 BENCHMARK(BM_WorkQueueIntptrPopMostRecent)
145     ->Range(1, 512)
146     ->UseRealTime()
147     ->MeasureProcessCPUTime();
148 
BM_WorkQueueClosureExecution(benchmark::State & state)149 void BM_WorkQueueClosureExecution(benchmark::State& state) {
150   BasicWorkQueue queue;
151   int element_count = state.range(0);
152   int run_count = 0;
153   grpc_event_engine::experimental::AnyInvocableClosure closure(
154       [&run_count] { ++run_count; });
155   for (auto _ : state) {
156     for (int i = 0; i < element_count; i++) queue.Add(&closure);
157     do {
158       queue.PopMostRecent()->Run();
159     } while (run_count < element_count);
160     run_count = 0;
161   }
162   state.counters["Added"] = element_count * state.iterations();
163   state.counters["Popped"] = state.counters["Added"];
164   state.counters["Pop Rate"] =
165       benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
166 }
167 BENCHMARK(BM_WorkQueueClosureExecution)
168     ->Range(8, 128)
169     ->UseRealTime()
170     ->MeasureProcessCPUTime();
171 
BM_WorkQueueAnyInvocableExecution(benchmark::State & state)172 void BM_WorkQueueAnyInvocableExecution(benchmark::State& state) {
173   BasicWorkQueue queue;
174   int element_count = state.range(0);
175   int run_count = 0;
176   for (auto _ : state) {
177     for (int i = 0; i < element_count; i++) {
178       queue.Add([&run_count] { ++run_count; });
179     }
180     do {
181       queue.PopMostRecent()->Run();
182     } while (run_count < element_count);
183     run_count = 0;
184   }
185   state.counters["Added"] = element_count * state.iterations();
186   state.counters["Popped"] = state.counters["Added"];
187   state.counters["Pop Rate"] =
188       benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
189 }
190 BENCHMARK(BM_WorkQueueAnyInvocableExecution)
191     ->Range(8, 128)
192     ->UseRealTime()
193     ->MeasureProcessCPUTime();
194 
195 }  // namespace
196 
197 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
198 // and others do not. This allows us to support both modes.
199 namespace benchmark {
RunTheBenchmarksNamespaced()200 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
201 }  // namespace benchmark
202 
main(int argc,char ** argv)203 int main(int argc, char** argv) {
204   grpc::testing::TestEnvironment env(&argc, argv);
205   ::benchmark::Initialize(&argc, argv);
206   benchmark::RunTheBenchmarksNamespaced();
207   return 0;
208 }
209