xref: /aosp_15_r20/external/grpc-grpc/test/cpp/microbenchmarks/bm_thread_pool.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <atomic>
16 #include <cmath>
17 #include <memory>
18 #include <vector>
19 
20 #include <benchmark/benchmark.h>
21 
22 #include "absl/strings/str_format.h"
23 
24 #include <grpc/support/cpu.h>
25 #include <grpcpp/impl/grpc_library.h>
26 
27 #include "src/core/lib/event_engine/common_closures.h"
28 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
29 #include "src/core/lib/gpr/useful.h"
30 #include "src/core/lib/gprpp/crash.h"
31 #include "src/core/lib/gprpp/notification.h"
32 #include "test/core/util/test_config.h"
33 #include "test/cpp/microbenchmarks/helpers.h"
34 #include "test/cpp/util/test_config.h"
35 
36 namespace {
37 
38 using ::grpc_event_engine::experimental::AnyInvocableClosure;
39 using ::grpc_event_engine::experimental::EventEngine;
40 using ::grpc_event_engine::experimental::ThreadPool;
41 
// Describes the shape of one fan-out benchmark run.
//
// Every field defaults to zero so that a default-constructed instance never
// carries indeterminate values.
struct FanoutParameters {
  // Number of callback layers scheduled below the initial callback.
  int depth = 0;
  // Number of child callbacks scheduled by each callback that is not on the
  // last layer.
  int fanout = 0;
  // Total number of callbacks expected to run; the callback that raises the
  // shared counter to this value signals completion.
  int limit = 0;
};
47 
BM_ThreadPool_RunSmallLambda(benchmark::State & state)48 void BM_ThreadPool_RunSmallLambda(benchmark::State& state) {
49   auto pool = grpc_event_engine::experimental::MakeThreadPool(
50       grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u));
51   const int cb_count = state.range(0);
52   std::atomic_int runcount{0};
53   for (auto _ : state) {
54     state.PauseTiming();
55     runcount.store(0);
56     grpc_core::Notification signal;
57     auto cb = [&signal, &runcount, cb_count]() {
58       if (runcount.fetch_add(1, std::memory_order_relaxed) + 1 == cb_count) {
59         signal.Notify();
60       }
61     };
62     state.ResumeTiming();
63     for (int i = 0; i < cb_count; i++) {
64       pool->Run(cb);
65     }
66     signal.WaitForNotification();
67   }
68   state.SetItemsProcessed(cb_count * state.iterations());
69   pool->Quiesce();
70 }
71 BENCHMARK(BM_ThreadPool_RunSmallLambda)
72     ->Range(100, 4096)
73     ->MeasureProcessCPUTime()
74     ->UseRealTime();
75 
BM_ThreadPool_RunClosure(benchmark::State & state)76 void BM_ThreadPool_RunClosure(benchmark::State& state) {
77   int cb_count = state.range(0);
78   grpc_core::Notification* signal = new grpc_core::Notification();
79   std::atomic_int count{0};
80   AnyInvocableClosure* closure =
81       new AnyInvocableClosure([signal_holder = &signal, cb_count, &count]() {
82         if (++count == cb_count) {
83           (*signal_holder)->Notify();
84         }
85       });
86   auto pool = grpc_event_engine::experimental::MakeThreadPool(
87       grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u));
88   for (auto _ : state) {
89     for (int i = 0; i < cb_count; i++) {
90       pool->Run(closure);
91     }
92     signal->WaitForNotification();
93     state.PauseTiming();
94     delete signal;
95     signal = new grpc_core::Notification();
96     count.store(0);
97     state.ResumeTiming();
98   }
99   delete signal;
100   state.SetItemsProcessed(cb_count * state.iterations());
101   pool->Quiesce();
102   delete closure;
103 }
104 BENCHMARK(BM_ThreadPool_RunClosure)
105     ->Range(100, 4096)
106     ->MeasureProcessCPUTime()
107     ->UseRealTime();
108 
FanoutTestArguments(benchmark::internal::Benchmark * b)109 void FanoutTestArguments(benchmark::internal::Benchmark* b) {
110   // TODO(hork): enable when the engines are fast enough to run these:
111   // ->Args({10000, 1})  // chain of callbacks scheduling callbacks
112   // ->Args({1, 10000})  // flat scheduling of callbacks
113   // ->Args({5, 6})      // depth 5, fans out to 9,330 callbacks
114   //  ->Args({2, 100})   // depth 2, fans out 10,101 callbacks
115   //  ->Args({4, 10})    // depth 4, fans out to 11,110 callbacks
116   b->Args({1000, 1})     // chain of callbacks scheduling callbacks
117       ->Args({100, 1})   // chain of callbacks scheduling callbacks
118       ->Args({1, 1000})  // flat scheduling of callbacks
119       ->Args({1, 100})   // flat scheduling of callbacks
120       ->Args({2, 70})    // depth 2, fans out 4971
121       ->Args({4, 8})     // depth 4, fans out 4681
122       ->UseRealTime()
123       ->MeasureProcessCPUTime();
124 }
125 
GetFanoutParameters(benchmark::State & state)126 FanoutParameters GetFanoutParameters(benchmark::State& state) {
127   FanoutParameters params;
128   params.depth = state.range(0);
129   params.fanout = state.range(1);
130   if (params.depth == 1 || params.fanout == 1) {
131     params.limit = std::max(params.depth, params.fanout) + 1;
132   } else {
133     // sum of geometric series
134     params.limit =
135         (1 - std::pow(params.fanout, params.depth + 1)) / (1 - params.fanout);
136   }
137   // sanity checking
138   GPR_ASSERT(params.limit >= params.fanout * params.depth);
139   return params;
140 }
141 
142 // Callback for Lambda FanOut tests
143 //
144 // Note that params are copied each time for 2 reasons: 1) callbacks will
145 // inevitably continue to shut down after the end of the test, so a reference
146 // parameter will become invalid and crash some callbacks, and 2) in my RBE
147 // tests, copies are slightly faster than a shared_ptr<FanoutParams>
148 // alternative.
FanOutCallback(std::shared_ptr<ThreadPool> pool,const FanoutParameters params,grpc_core::Notification & signal,std::atomic_int & count,int processing_layer)149 void FanOutCallback(std::shared_ptr<ThreadPool> pool,
150                     const FanoutParameters params,
151                     grpc_core::Notification& signal, std::atomic_int& count,
152                     int processing_layer) {
153   int local_cnt = count.fetch_add(1, std::memory_order_acq_rel) + 1;
154   if (local_cnt == params.limit) {
155     signal.Notify();
156     return;
157   }
158   GPR_DEBUG_ASSERT(local_cnt < params.limit);
159   if (params.depth == processing_layer) return;
160   for (int i = 0; i < params.fanout; i++) {
161     pool->Run([pool, params, processing_layer, &count, &signal]() {
162       FanOutCallback(pool, params, signal, count, processing_layer + 1);
163     });
164   }
165 }
166 
BM_ThreadPool_Lambda_FanOut(benchmark::State & state)167 void BM_ThreadPool_Lambda_FanOut(benchmark::State& state) {
168   auto params = GetFanoutParameters(state);
169   auto pool = grpc_event_engine::experimental::MakeThreadPool(
170       grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u));
171   for (auto _ : state) {
172     std::atomic_int count{0};
173     grpc_core::Notification signal;
174     FanOutCallback(pool, params, signal, count, /*processing_layer=*/0);
175     do {
176       signal.WaitForNotification();
177     } while (count.load() != params.limit);
178   }
179   state.SetItemsProcessed(params.limit * state.iterations());
180   pool->Quiesce();
181 }
182 BENCHMARK(BM_ThreadPool_Lambda_FanOut)->Apply(FanoutTestArguments);
183 
ClosureFanOutCallback(EventEngine::Closure * child_closure,std::shared_ptr<ThreadPool> pool,grpc_core::Notification ** signal_holder,std::atomic_int & count,const FanoutParameters params)184 void ClosureFanOutCallback(EventEngine::Closure* child_closure,
185                            std::shared_ptr<ThreadPool> pool,
186                            grpc_core::Notification** signal_holder,
187                            std::atomic_int& count,
188                            const FanoutParameters params) {
189   int local_cnt = count.fetch_add(1, std::memory_order_acq_rel) + 1;
190   if (local_cnt == params.limit) {
191     (*signal_holder)->Notify();
192     return;
193   }
194   if (local_cnt > params.limit) {
195     grpc_core::Crash(absl::StrFormat("Ran too many closures: %d/%d", local_cnt,
196                                      params.limit));
197   }
198   if (child_closure == nullptr) return;
199   for (int i = 0; i < params.fanout; i++) {
200     pool->Run(child_closure);
201   }
202 }
203 
BM_ThreadPool_Closure_FanOut(benchmark::State & state)204 void BM_ThreadPool_Closure_FanOut(benchmark::State& state) {
205   auto params = GetFanoutParameters(state);
206   auto pool = grpc_event_engine::experimental::MakeThreadPool(
207       grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u));
208   std::vector<EventEngine::Closure*> closures;
209   closures.reserve(params.depth + 2);
210   closures.push_back(nullptr);
211   grpc_core::Notification* signal = new grpc_core::Notification();
212   std::atomic_int count{0};
213   // prepare a unique closure for each depth
214   for (int i = 0; i <= params.depth; i++) {
215     // call the previous closure (e.g., closures[2] calls closures[1] during
216     // fanout)
217     closures.push_back(new AnyInvocableClosure(
218         [i, pool, &closures, params, signal_holder = &signal, &count]() {
219           ClosureFanOutCallback(closures[i], pool, signal_holder, count,
220                                 params);
221         }));
222   }
223   for (auto _ : state) {
224     GPR_DEBUG_ASSERT(count.load(std::memory_order_relaxed) == 0);
225     pool->Run(closures[params.depth + 1]);
226     do {
227       signal->WaitForNotification();
228     } while (count.load() != params.limit);
229     // cleanup
230     state.PauseTiming();
231     delete signal;
232     signal = new grpc_core::Notification();
233     count.store(0);
234     state.ResumeTiming();
235   }
236   delete signal;
237   state.SetItemsProcessed(params.limit * state.iterations());
238   for (auto i : closures) delete i;
239   pool->Quiesce();
240 }
241 BENCHMARK(BM_ThreadPool_Closure_FanOut)->Apply(FanoutTestArguments);
242 
243 }  // namespace
244 
245 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
246 // and others do not. This allows us to support both modes.
247 namespace benchmark {
RunTheBenchmarksNamespaced()248 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
249 }  // namespace benchmark
250 
main(int argc,char ** argv)251 int main(int argc, char** argv) {
252   grpc::testing::TestEnvironment env(&argc, argv);
253   LibraryInitializer libInit;
254   benchmark::Initialize(&argc, argv);
255   grpc::testing::InitTest(&argc, &argv, false);
256 
257   benchmark::RunTheBenchmarksNamespaced();
258   return 0;
259 }
260