xref: /aosp_15_r20/external/grpc-grpc/test/cpp/microbenchmarks/bm_event_engine_run.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <atomic>
16 #include <cmath>
17 #include <memory>
18 #include <vector>
19 
20 #include <benchmark/benchmark.h>
21 
22 #include "absl/debugging/leak_check.h"
23 #include "absl/functional/any_invocable.h"
24 #include "absl/strings/str_format.h"
25 
26 #include <grpc/event_engine/event_engine.h>
27 #include <grpcpp/impl/grpc_library.h>
28 
29 #include "src/core/lib/event_engine/common_closures.h"
30 #include "src/core/lib/event_engine/default_event_engine.h"
31 #include "src/core/lib/gprpp/crash.h"
32 #include "src/core/lib/gprpp/notification.h"
33 #include "test/core/util/test_config.h"
34 #include "test/cpp/microbenchmarks/helpers.h"
35 #include "test/cpp/util/test_config.h"
36 
37 namespace {
38 
39 using ::grpc_event_engine::experimental::AnyInvocableClosure;
40 using ::grpc_event_engine::experimental::EventEngine;
41 using ::grpc_event_engine::experimental::GetDefaultEventEngine;
42 
// Shape of one fan-out benchmark run. Members are zero-initialized so that a
// default-constructed instance never holds indeterminate values.
struct FanoutParameters {
  // Layer at which callbacks stop scheduling children; the root callback runs
  // at layer 0.
  int depth = 0;
  // Number of children each callback on a layer before `depth` schedules.
  int fanout = 0;
  // Total number of callbacks expected per run; the callback that brings the
  // shared counter to this value signals completion.
  int limit = 0;
};
48 
BM_EventEngine_RunSmallLambda(benchmark::State & state)49 void BM_EventEngine_RunSmallLambda(benchmark::State& state) {
50   auto engine = GetDefaultEventEngine();
51   const int cb_count = state.range(0);
52   std::atomic_int count{0};
53   for (auto _ : state) {
54     state.PauseTiming();
55     grpc_core::Notification signal;
56     auto cb = [&signal, &count, cb_count]() {
57       if (++count == cb_count) signal.Notify();
58     };
59     state.ResumeTiming();
60     for (int i = 0; i < cb_count; i++) {
61       engine->Run(cb);
62     }
63     signal.WaitForNotification();
64     count.store(0);
65   }
66   state.SetItemsProcessed(cb_count * state.iterations());
67 }
68 BENCHMARK(BM_EventEngine_RunSmallLambda)
69     ->Range(100, 4096)
70     ->MeasureProcessCPUTime()
71     ->UseRealTime();
72 
BM_EventEngine_RunLargeLambda(benchmark::State & state)73 void BM_EventEngine_RunLargeLambda(benchmark::State& state) {
74   int cb_count = state.range(0);
75   // large lambdas require an extra allocation
76   std::string extra = "12345678";
77   auto engine = GetDefaultEventEngine();
78   std::atomic_int count{0};
79   for (auto _ : state) {
80     state.PauseTiming();
81     grpc_core::Notification signal;
82     auto cb = [&signal, &count, cb_count, extra]() {
83       (void)extra;
84       if (++count == cb_count) signal.Notify();
85     };
86     state.ResumeTiming();
87     for (int i = 0; i < cb_count; i++) {
88       engine->Run(cb);
89     }
90     signal.WaitForNotification();
91     count.store(0);
92   }
93   state.SetItemsProcessed(cb_count * state.iterations());
94 }
95 BENCHMARK(BM_EventEngine_RunLargeLambda)
96     ->Range(100, 4096)
97     ->MeasureProcessCPUTime()
98     ->UseRealTime();
99 
BM_EventEngine_RunClosure(benchmark::State & state)100 void BM_EventEngine_RunClosure(benchmark::State& state) {
101   int cb_count = state.range(0);
102   grpc_core::Notification* signal = new grpc_core::Notification();
103   std::atomic_int count{0};
104   // Ignore leaks from this closure. For simplicty, this closure is not deleted
105   // because the closure may still be executing after the EventEngine is
106   // destroyed. This is because the default posix EventEngine's thread pool may
107   // get destroyed separately from the EventEngine.
108   AnyInvocableClosure* closure = absl::IgnoreLeak(
109       new AnyInvocableClosure([signal_holder = &signal, cb_count, &count]() {
110         if (++count == cb_count) {
111           (*signal_holder)->Notify();
112         }
113       }));
114   auto engine = GetDefaultEventEngine();
115   for (auto _ : state) {
116     for (int i = 0; i < cb_count; i++) {
117       engine->Run(closure);
118     }
119     signal->WaitForNotification();
120     state.PauseTiming();
121     delete signal;
122     signal = new grpc_core::Notification();
123     count.store(0);
124     state.ResumeTiming();
125   }
126   delete signal;
127   state.SetItemsProcessed(cb_count * state.iterations());
128 }
129 BENCHMARK(BM_EventEngine_RunClosure)
130     ->Range(100, 4096)
131     ->MeasureProcessCPUTime()
132     ->UseRealTime();
133 
FanoutTestArguments(benchmark::internal::Benchmark * b)134 void FanoutTestArguments(benchmark::internal::Benchmark* b) {
135   // TODO(hork): enable when the engines are fast enough to run these:
136   // ->Args({10000, 1})  // chain of callbacks scheduling callbacks
137   // ->Args({1, 10000})  // flat scheduling of callbacks
138   // ->Args({5, 6})      // depth 5, fans out to 9,330 callbacks
139   //  ->Args({2, 100})   // depth 2, fans out 10,101 callbacks
140   //  ->Args({4, 10})    // depth 4, fans out to 11,110 callbacks
141   b->Args({1000, 1})     // chain of callbacks scheduling callbacks
142       ->Args({100, 1})   // chain of callbacks scheduling callbacks
143       ->Args({1, 1000})  // flat scheduling of callbacks
144       ->Args({1, 100})   // flat scheduling of callbacks
145       ->Args({2, 70})    // depth 2, fans out 4971
146       ->Args({4, 8})     // depth 4, fans out 4681
147       ->UseRealTime()
148       ->MeasureProcessCPUTime();
149 }
150 
GetFanoutParameters(benchmark::State & state)151 FanoutParameters GetFanoutParameters(benchmark::State& state) {
152   FanoutParameters params;
153   params.depth = state.range(0);
154   params.fanout = state.range(1);
155   if (params.depth == 1 || params.fanout == 1) {
156     params.limit = std::max(params.depth, params.fanout) + 1;
157   } else {
158     // sum of geometric series
159     params.limit =
160         (1 - std::pow(params.fanout, params.depth + 1)) / (1 - params.fanout);
161   }
162   // sanity checking
163   GPR_ASSERT(params.limit >= params.fanout * params.depth);
164   return params;
165 }
166 
167 // EventEngine callback for Lambda FanOut tests
168 //
169 // Note that params are copied each time for 2 reasons: 1) callbacks will
170 // inevitably continue to shut down after the end of the test, so a reference
171 // parameter will become invalid and crash some callbacks, and 2) in my RBE
172 // tests, copies are slightly faster than a shared_ptr<FanoutParams>
173 // alternative.
FanOutCallback(std::shared_ptr<EventEngine> engine,const FanoutParameters params,grpc_core::Notification & signal,std::atomic_int & count,int processing_layer)174 void FanOutCallback(std::shared_ptr<EventEngine> engine,
175                     const FanoutParameters params,
176                     grpc_core::Notification& signal, std::atomic_int& count,
177                     int processing_layer) {
178   int local_cnt = count.fetch_add(1, std::memory_order_acq_rel) + 1;
179   if (local_cnt == params.limit) {
180     signal.Notify();
181     return;
182   }
183   GPR_DEBUG_ASSERT(local_cnt < params.limit);
184   if (params.depth == processing_layer) return;
185   for (int i = 0; i < params.fanout; i++) {
186     engine->Run([engine, params, processing_layer, &count, &signal]() {
187       FanOutCallback(engine, params, signal, count, processing_layer + 1);
188     });
189   }
190 }
191 
BM_EventEngine_Lambda_FanOut(benchmark::State & state)192 void BM_EventEngine_Lambda_FanOut(benchmark::State& state) {
193   auto params = GetFanoutParameters(state);
194   auto engine = GetDefaultEventEngine();
195   for (auto _ : state) {
196     std::atomic_int count{0};
197     grpc_core::Notification signal;
198     FanOutCallback(engine, params, signal, count, /*processing_layer=*/0);
199     do {
200       signal.WaitForNotification();
201     } while (count.load() != params.limit);
202   }
203   state.SetItemsProcessed(params.limit * state.iterations());
204 }
205 BENCHMARK(BM_EventEngine_Lambda_FanOut)->Apply(FanoutTestArguments);
206 
ClosureFanOutCallback(EventEngine::Closure * child_closure,std::shared_ptr<EventEngine> engine,grpc_core::Notification ** signal_holder,std::atomic_int & count,const FanoutParameters params)207 void ClosureFanOutCallback(EventEngine::Closure* child_closure,
208                            std::shared_ptr<EventEngine> engine,
209                            grpc_core::Notification** signal_holder,
210                            std::atomic_int& count,
211                            const FanoutParameters params) {
212   int local_cnt = count.fetch_add(1, std::memory_order_acq_rel) + 1;
213   if (local_cnt == params.limit) {
214     (*signal_holder)->Notify();
215     return;
216   }
217   if (local_cnt > params.limit) {
218     grpc_core::Crash(absl::StrFormat("Ran too many closures: %d/%d", local_cnt,
219                                      params.limit));
220   }
221   if (child_closure == nullptr) return;
222   for (int i = 0; i < params.fanout; i++) {
223     engine->Run(child_closure);
224   }
225 }
226 
BM_EventEngine_Closure_FanOut(benchmark::State & state)227 void BM_EventEngine_Closure_FanOut(benchmark::State& state) {
228   auto params = GetFanoutParameters(state);
229   auto engine = GetDefaultEventEngine();
230   std::vector<EventEngine::Closure*> closures;
231   closures.reserve(params.depth + 2);
232   closures.push_back(nullptr);
233   grpc_core::Notification* signal = new grpc_core::Notification();
234   std::atomic_int count{0};
235   // prepare a unique closure for each depth
236   for (int i = 0; i <= params.depth; i++) {
237     // call the previous closure (e.g., closures[2] calls closures[1] during
238     // fanout)
239     closures.push_back(new AnyInvocableClosure(
240         [i, engine, &closures, params, signal_holder = &signal, &count]() {
241           ClosureFanOutCallback(closures[i], engine, signal_holder, count,
242                                 params);
243         }));
244   }
245   for (auto _ : state) {
246     GPR_DEBUG_ASSERT(count.load(std::memory_order_relaxed) == 0);
247     engine->Run(closures[params.depth + 1]);
248     do {
249       signal->WaitForNotification();
250     } while (count.load() != params.limit);
251     // cleanup
252     state.PauseTiming();
253     delete signal;
254     signal = new grpc_core::Notification();
255     count.store(0);
256     state.ResumeTiming();
257   }
258   delete signal;
259   state.SetItemsProcessed(params.limit * state.iterations());
260   for (auto i : closures) delete i;
261 }
262 BENCHMARK(BM_EventEngine_Closure_FanOut)->Apply(FanoutTestArguments);
263 
264 }  // namespace
265 
266 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
267 // and others do not. This allows us to support both modes.
268 namespace benchmark {
RunTheBenchmarksNamespaced()269 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
270 }  // namespace benchmark
271 
main(int argc,char ** argv)272 int main(int argc, char** argv) {
273   grpc::testing::TestEnvironment env(&argc, argv);
274   LibraryInitializer libInit;
275   benchmark::Initialize(&argc, argv);
276   grpc::testing::InitTest(&argc, &argv, false);
277 
278   benchmark::RunTheBenchmarksNamespaced();
279   return 0;
280 }
281