xref: /aosp_15_r20/external/grpc-grpc/test/cpp/microbenchmarks/bm_cq.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
// Microbenchmarks for completion queues: creation/destruction and passing
// events through "next", "pluck" and callback queues (C++ and core APIs).
21 
22 #include <benchmark/benchmark.h>
23 
24 #include <grpc/grpc.h>
25 #include <grpc/support/log.h>
26 #include <grpcpp/completion_queue.h>
27 #include <grpcpp/impl/grpc_library.h>
28 
29 #include "src/core/lib/gprpp/crash.h"
30 #include "src/core/lib/iomgr/exec_ctx.h"
31 #include "src/core/lib/surface/completion_queue.h"
32 #include "test/core/util/test_config.h"
33 #include "test/cpp/microbenchmarks/helpers.h"
34 #include "test/cpp/util/test_config.h"
35 
36 namespace grpc {
37 namespace testing {
38 
BM_CreateDestroyCpp(benchmark::State & state)39 static void BM_CreateDestroyCpp(benchmark::State& state) {
40   for (auto _ : state) {
41     CompletionQueue cq;
42   }
43 }
44 BENCHMARK(BM_CreateDestroyCpp);
45 
46 // Create cq using a different constructor
BM_CreateDestroyCpp2(benchmark::State & state)47 static void BM_CreateDestroyCpp2(benchmark::State& state) {
48   for (auto _ : state) {
49     grpc_completion_queue* core_cq =
50         grpc_completion_queue_create_for_next(nullptr);
51     CompletionQueue cq(core_cq);
52   }
53 }
54 BENCHMARK(BM_CreateDestroyCpp2);
55 
BM_CreateDestroyCore(benchmark::State & state)56 static void BM_CreateDestroyCore(benchmark::State& state) {
57   for (auto _ : state) {
58     // TODO(sreek): Templatize this benchmark and pass completion type and
59     // polling type as parameters
60     grpc_completion_queue_destroy(
61         grpc_completion_queue_create_for_next(nullptr));
62   }
63 }
64 BENCHMARK(BM_CreateDestroyCore);
65 
// "Done" callback passed to grpc_cq_end_op by the benchmarks that keep their
// grpc_cq_completion in a local variable: nothing needs releasing, so this is
// intentionally a no-op.
static void DoneWithCompletionOnStack(void* /*arg*/,
                                      grpc_cq_completion* /*completion*/) {
}
68 
// "Done" callback passed to grpc_cq_end_op by the benchmark that allocates its
// grpc_cq_completion with `new`: frees that completion.
static void DoneWithCompletionOnHeap(void* /*arg*/,
                                     grpc_cq_completion* completion) {
  delete completion;
}
73 
74 class PhonyTag final : public internal::CompletionQueueTag {
75  public:
FinalizeResult(void **,bool *)76   bool FinalizeResult(void** /*tag*/, bool* /*status*/) override {
77     return true;
78   }
79 };
80 
BM_Pass1Cpp(benchmark::State & state)81 static void BM_Pass1Cpp(benchmark::State& state) {
82   CompletionQueue cq;
83   grpc_completion_queue* c_cq = cq.cq();
84   for (auto _ : state) {
85     grpc_cq_completion completion;
86     PhonyTag phony_tag;
87     grpc_core::ExecCtx exec_ctx;
88     GPR_ASSERT(grpc_cq_begin_op(c_cq, &phony_tag));
89     grpc_cq_end_op(c_cq, &phony_tag, absl::OkStatus(),
90                    DoneWithCompletionOnStack, nullptr, &completion);
91 
92     void* tag;
93     bool ok;
94     cq.Next(&tag, &ok);
95   }
96 }
97 BENCHMARK(BM_Pass1Cpp);
98 
BM_Pass1Core(benchmark::State & state)99 static void BM_Pass1Core(benchmark::State& state) {
100   // TODO(sreek): Templatize this benchmark and pass polling_type as a param
101   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
102   gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
103   for (auto _ : state) {
104     grpc_cq_completion completion;
105     grpc_core::ExecCtx exec_ctx;
106     GPR_ASSERT(grpc_cq_begin_op(cq, nullptr));
107     grpc_cq_end_op(cq, nullptr, absl::OkStatus(), DoneWithCompletionOnStack,
108                    nullptr, &completion);
109 
110     grpc_completion_queue_next(cq, deadline, nullptr);
111   }
112   grpc_completion_queue_destroy(cq);
113 }
114 BENCHMARK(BM_Pass1Core);
115 
BM_Pluck1Core(benchmark::State & state)116 static void BM_Pluck1Core(benchmark::State& state) {
117   // TODO(sreek): Templatize this benchmark and pass polling_type as a param
118   grpc_completion_queue* cq = grpc_completion_queue_create_for_pluck(nullptr);
119   gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
120   for (auto _ : state) {
121     grpc_cq_completion completion;
122     grpc_core::ExecCtx exec_ctx;
123     GPR_ASSERT(grpc_cq_begin_op(cq, nullptr));
124     grpc_cq_end_op(cq, nullptr, absl::OkStatus(), DoneWithCompletionOnStack,
125                    nullptr, &completion);
126 
127     grpc_completion_queue_pluck(cq, nullptr, deadline, nullptr);
128   }
129   grpc_completion_queue_destroy(cq);
130 }
131 BENCHMARK(BM_Pluck1Core);
132 
BM_EmptyCore(benchmark::State & state)133 static void BM_EmptyCore(benchmark::State& state) {
134   // TODO(sreek): Templatize this benchmark and pass polling_type as a param
135   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
136   gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
137   for (auto _ : state) {
138     grpc_completion_queue_next(cq, deadline, nullptr);
139   }
140   grpc_completion_queue_destroy(cq);
141 }
142 BENCHMARK(BM_EmptyCore);
143 
144 // Helper for tests to shutdown correctly and tersely
shutdown_and_destroy(grpc_completion_queue * cc)145 static void shutdown_and_destroy(grpc_completion_queue* cc) {
146   grpc_completion_queue_shutdown(cc);
147   grpc_completion_queue_destroy(cc);
148 }
149 
150 static gpr_mu shutdown_mu, mu;
151 static gpr_cv shutdown_cv, cv;
152 
153 // Tag completion queue iterate times
154 class TagCallback : public grpc_completion_queue_functor {
155  public:
TagCallback(int * iter)156   explicit TagCallback(int* iter) : iter_(iter) {
157     functor_run = &TagCallback::Run;
158     inlineable = false;
159   }
~TagCallback()160   ~TagCallback() {}
Run(grpc_completion_queue_functor * cb,int ok)161   static void Run(grpc_completion_queue_functor* cb, int ok) {
162     gpr_mu_lock(&mu);
163     GPR_ASSERT(static_cast<bool>(ok));
164     *static_cast<TagCallback*>(cb)->iter_ += 1;
165     gpr_cv_signal(&cv);
166     gpr_mu_unlock(&mu);
167   };
168 
169  private:
170   int* iter_;
171 };
172 
173 // Check if completion queue is shut down
174 class ShutdownCallback : public grpc_completion_queue_functor {
175  public:
ShutdownCallback(bool * done)176   explicit ShutdownCallback(bool* done) : done_(done) {
177     functor_run = &ShutdownCallback::Run;
178     inlineable = false;
179   }
~ShutdownCallback()180   ~ShutdownCallback() {}
Run(grpc_completion_queue_functor * cb,int ok)181   static void Run(grpc_completion_queue_functor* cb, int ok) {
182     gpr_mu_lock(&shutdown_mu);
183     *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
184     gpr_cv_signal(&shutdown_cv);
185     gpr_mu_unlock(&shutdown_mu);
186   }
187 
188  private:
189   bool* done_;
190 };
191 
BM_Callback_CQ_Pass1Core(benchmark::State & state)192 static void BM_Callback_CQ_Pass1Core(benchmark::State& state) {
193   int iteration = 0, current_iterations = 0;
194   TagCallback tag_cb(&iteration);
195   gpr_mu_init(&mu);
196   gpr_cv_init(&cv);
197   gpr_mu_init(&shutdown_mu);
198   gpr_cv_init(&shutdown_cv);
199   bool got_shutdown = false;
200   ShutdownCallback shutdown_cb(&got_shutdown);
201   // This test with stack-allocated completions only works for non-polling or
202   // EM-polling callback core CQs because otherwise the callback could execute
203   // on  another thread after the stack objects here go out of scope. An
204   // alternative would be to synchronize between the benchmark loop and the
205   // callback, but then it would be measuring the overhead of synchronization
206   // rather than the overhead of the completion queue.
207   // For generality, test here with non-polling.
208   grpc_completion_queue_attributes attr;
209   attr.version = 2;
210   attr.cq_completion_type = GRPC_CQ_CALLBACK;
211   attr.cq_polling_type = GRPC_CQ_NON_POLLING;
212   attr.cq_shutdown_cb = &shutdown_cb;
213   grpc_completion_queue* cc = grpc_completion_queue_create(
214       grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
215   for (auto _ : state) {
216     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
217     grpc_core::ExecCtx exec_ctx;
218     grpc_cq_completion completion;
219     GPR_ASSERT(grpc_cq_begin_op(cc, &tag_cb));
220     grpc_cq_end_op(cc, &tag_cb, absl::OkStatus(), DoneWithCompletionOnStack,
221                    nullptr, &completion);
222   }
223   shutdown_and_destroy(cc);
224 
225   gpr_mu_lock(&mu);
226   current_iterations = static_cast<int>(state.iterations());
227   while (current_iterations != iteration) {
228     // Wait for all the callbacks to complete.
229     gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
230   }
231   gpr_mu_unlock(&mu);
232 
233   gpr_mu_lock(&shutdown_mu);
234   while (!got_shutdown) {
235     // Wait for the shutdown callback to complete.
236     gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
237   }
238   gpr_mu_unlock(&shutdown_mu);
239 
240   GPR_ASSERT(got_shutdown);
241   GPR_ASSERT(iteration == static_cast<int>(state.iterations()));
242   gpr_cv_destroy(&cv);
243   gpr_mu_destroy(&mu);
244   gpr_cv_destroy(&shutdown_cv);
245   gpr_mu_destroy(&shutdown_mu);
246 }
BM_Callback_CQ_Pass1CoreHeapCompletion(benchmark::State & state)247 static void BM_Callback_CQ_Pass1CoreHeapCompletion(benchmark::State& state) {
248   int iteration = 0, current_iterations = 0;
249   TagCallback tag_cb(&iteration);
250   gpr_mu_init(&mu);
251   gpr_cv_init(&cv);
252   gpr_mu_init(&shutdown_mu);
253   gpr_cv_init(&shutdown_cv);
254   bool got_shutdown = false;
255   ShutdownCallback shutdown_cb(&got_shutdown);
256   grpc_completion_queue* cc =
257       grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr);
258   for (auto _ : state) {
259     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
260     grpc_core::ExecCtx exec_ctx;
261     grpc_cq_completion* completion = new grpc_cq_completion;
262     GPR_ASSERT(grpc_cq_begin_op(cc, &tag_cb));
263     grpc_cq_end_op(cc, &tag_cb, absl::OkStatus(), DoneWithCompletionOnHeap,
264                    nullptr, completion);
265   }
266   shutdown_and_destroy(cc);
267 
268   gpr_mu_lock(&mu);
269   current_iterations = static_cast<int>(state.iterations());
270   while (current_iterations != iteration) {
271     // Wait for all the callbacks to complete.
272     gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
273   }
274   gpr_mu_unlock(&mu);
275 
276   gpr_mu_lock(&shutdown_mu);
277   while (!got_shutdown) {
278     // Wait for the shutdown callback to complete.
279     gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
280   }
281   gpr_mu_unlock(&shutdown_mu);
282 
283   GPR_ASSERT(got_shutdown);
284   GPR_ASSERT(iteration == static_cast<int>(state.iterations()));
285   gpr_cv_destroy(&cv);
286   gpr_mu_destroy(&mu);
287   gpr_cv_destroy(&shutdown_cv);
288   gpr_mu_destroy(&shutdown_mu);
289 }
290 BENCHMARK(BM_Callback_CQ_Pass1Core);
291 BENCHMARK(BM_Callback_CQ_Pass1CoreHeapCompletion);
292 
293 }  // namespace testing
294 }  // namespace grpc
295 
296 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
297 // and others do not. This allows us to support both modes.
298 namespace benchmark {
RunTheBenchmarksNamespaced()299 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
300 }  // namespace benchmark
301 
main(int argc,char ** argv)302 int main(int argc, char** argv) {
303   grpc::testing::TestEnvironment env(&argc, argv);
304   LibraryInitializer libInit;
305   ::benchmark::Initialize(&argc, argv);
306   grpc::testing::InitTest(&argc, &argv, false);
307   benchmark::RunTheBenchmarksNamespaced();
308   return 0;
309 }
310