1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 // This benchmark exists to ensure that the benchmark integration is
20 // working
21
22 #include <benchmark/benchmark.h>
23
24 #include <grpc/grpc.h>
25 #include <grpc/support/log.h>
26 #include <grpcpp/completion_queue.h>
27 #include <grpcpp/impl/grpc_library.h>
28
29 #include "src/core/lib/gprpp/crash.h"
30 #include "src/core/lib/iomgr/exec_ctx.h"
31 #include "src/core/lib/surface/completion_queue.h"
32 #include "test/core/util/test_config.h"
33 #include "test/cpp/microbenchmarks/helpers.h"
34 #include "test/cpp/util/test_config.h"
35
36 namespace grpc {
37 namespace testing {
38
BM_CreateDestroyCpp(benchmark::State & state)39 static void BM_CreateDestroyCpp(benchmark::State& state) {
40 for (auto _ : state) {
41 CompletionQueue cq;
42 }
43 }
44 BENCHMARK(BM_CreateDestroyCpp);
45
46 // Create cq using a different constructor
BM_CreateDestroyCpp2(benchmark::State & state)47 static void BM_CreateDestroyCpp2(benchmark::State& state) {
48 for (auto _ : state) {
49 grpc_completion_queue* core_cq =
50 grpc_completion_queue_create_for_next(nullptr);
51 CompletionQueue cq(core_cq);
52 }
53 }
54 BENCHMARK(BM_CreateDestroyCpp2);
55
BM_CreateDestroyCore(benchmark::State & state)56 static void BM_CreateDestroyCore(benchmark::State& state) {
57 for (auto _ : state) {
58 // TODO(sreek): Templatize this benchmark and pass completion type and
59 // polling type as parameters
60 grpc_completion_queue_destroy(
61 grpc_completion_queue_create_for_next(nullptr));
62 }
63 }
64 BENCHMARK(BM_CreateDestroyCore);
65
DoneWithCompletionOnStack(void *,grpc_cq_completion *)66 static void DoneWithCompletionOnStack(void* /*arg*/,
67 grpc_cq_completion* /*completion*/) {}
68
DoneWithCompletionOnHeap(void *,grpc_cq_completion * completion)69 static void DoneWithCompletionOnHeap(void* /*arg*/,
70 grpc_cq_completion* completion) {
71 delete completion;
72 }
73
74 class PhonyTag final : public internal::CompletionQueueTag {
75 public:
FinalizeResult(void **,bool *)76 bool FinalizeResult(void** /*tag*/, bool* /*status*/) override {
77 return true;
78 }
79 };
80
BM_Pass1Cpp(benchmark::State & state)81 static void BM_Pass1Cpp(benchmark::State& state) {
82 CompletionQueue cq;
83 grpc_completion_queue* c_cq = cq.cq();
84 for (auto _ : state) {
85 grpc_cq_completion completion;
86 PhonyTag phony_tag;
87 grpc_core::ExecCtx exec_ctx;
88 GPR_ASSERT(grpc_cq_begin_op(c_cq, &phony_tag));
89 grpc_cq_end_op(c_cq, &phony_tag, absl::OkStatus(),
90 DoneWithCompletionOnStack, nullptr, &completion);
91
92 void* tag;
93 bool ok;
94 cq.Next(&tag, &ok);
95 }
96 }
97 BENCHMARK(BM_Pass1Cpp);
98
BM_Pass1Core(benchmark::State & state)99 static void BM_Pass1Core(benchmark::State& state) {
100 // TODO(sreek): Templatize this benchmark and pass polling_type as a param
101 grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
102 gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
103 for (auto _ : state) {
104 grpc_cq_completion completion;
105 grpc_core::ExecCtx exec_ctx;
106 GPR_ASSERT(grpc_cq_begin_op(cq, nullptr));
107 grpc_cq_end_op(cq, nullptr, absl::OkStatus(), DoneWithCompletionOnStack,
108 nullptr, &completion);
109
110 grpc_completion_queue_next(cq, deadline, nullptr);
111 }
112 grpc_completion_queue_destroy(cq);
113 }
114 BENCHMARK(BM_Pass1Core);
115
BM_Pluck1Core(benchmark::State & state)116 static void BM_Pluck1Core(benchmark::State& state) {
117 // TODO(sreek): Templatize this benchmark and pass polling_type as a param
118 grpc_completion_queue* cq = grpc_completion_queue_create_for_pluck(nullptr);
119 gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
120 for (auto _ : state) {
121 grpc_cq_completion completion;
122 grpc_core::ExecCtx exec_ctx;
123 GPR_ASSERT(grpc_cq_begin_op(cq, nullptr));
124 grpc_cq_end_op(cq, nullptr, absl::OkStatus(), DoneWithCompletionOnStack,
125 nullptr, &completion);
126
127 grpc_completion_queue_pluck(cq, nullptr, deadline, nullptr);
128 }
129 grpc_completion_queue_destroy(cq);
130 }
131 BENCHMARK(BM_Pluck1Core);
132
BM_EmptyCore(benchmark::State & state)133 static void BM_EmptyCore(benchmark::State& state) {
134 // TODO(sreek): Templatize this benchmark and pass polling_type as a param
135 grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
136 gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
137 for (auto _ : state) {
138 grpc_completion_queue_next(cq, deadline, nullptr);
139 }
140 grpc_completion_queue_destroy(cq);
141 }
142 BENCHMARK(BM_EmptyCore);
143
144 // Helper for tests to shutdown correctly and tersely
shutdown_and_destroy(grpc_completion_queue * cc)145 static void shutdown_and_destroy(grpc_completion_queue* cc) {
146 grpc_completion_queue_shutdown(cc);
147 grpc_completion_queue_destroy(cc);
148 }
149
150 static gpr_mu shutdown_mu, mu;
151 static gpr_cv shutdown_cv, cv;
152
153 // Tag completion queue iterate times
154 class TagCallback : public grpc_completion_queue_functor {
155 public:
TagCallback(int * iter)156 explicit TagCallback(int* iter) : iter_(iter) {
157 functor_run = &TagCallback::Run;
158 inlineable = false;
159 }
~TagCallback()160 ~TagCallback() {}
Run(grpc_completion_queue_functor * cb,int ok)161 static void Run(grpc_completion_queue_functor* cb, int ok) {
162 gpr_mu_lock(&mu);
163 GPR_ASSERT(static_cast<bool>(ok));
164 *static_cast<TagCallback*>(cb)->iter_ += 1;
165 gpr_cv_signal(&cv);
166 gpr_mu_unlock(&mu);
167 };
168
169 private:
170 int* iter_;
171 };
172
173 // Check if completion queue is shut down
174 class ShutdownCallback : public grpc_completion_queue_functor {
175 public:
ShutdownCallback(bool * done)176 explicit ShutdownCallback(bool* done) : done_(done) {
177 functor_run = &ShutdownCallback::Run;
178 inlineable = false;
179 }
~ShutdownCallback()180 ~ShutdownCallback() {}
Run(grpc_completion_queue_functor * cb,int ok)181 static void Run(grpc_completion_queue_functor* cb, int ok) {
182 gpr_mu_lock(&shutdown_mu);
183 *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
184 gpr_cv_signal(&shutdown_cv);
185 gpr_mu_unlock(&shutdown_mu);
186 }
187
188 private:
189 bool* done_;
190 };
191
BM_Callback_CQ_Pass1Core(benchmark::State & state)192 static void BM_Callback_CQ_Pass1Core(benchmark::State& state) {
193 int iteration = 0, current_iterations = 0;
194 TagCallback tag_cb(&iteration);
195 gpr_mu_init(&mu);
196 gpr_cv_init(&cv);
197 gpr_mu_init(&shutdown_mu);
198 gpr_cv_init(&shutdown_cv);
199 bool got_shutdown = false;
200 ShutdownCallback shutdown_cb(&got_shutdown);
201 // This test with stack-allocated completions only works for non-polling or
202 // EM-polling callback core CQs because otherwise the callback could execute
203 // on another thread after the stack objects here go out of scope. An
204 // alternative would be to synchronize between the benchmark loop and the
205 // callback, but then it would be measuring the overhead of synchronization
206 // rather than the overhead of the completion queue.
207 // For generality, test here with non-polling.
208 grpc_completion_queue_attributes attr;
209 attr.version = 2;
210 attr.cq_completion_type = GRPC_CQ_CALLBACK;
211 attr.cq_polling_type = GRPC_CQ_NON_POLLING;
212 attr.cq_shutdown_cb = &shutdown_cb;
213 grpc_completion_queue* cc = grpc_completion_queue_create(
214 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
215 for (auto _ : state) {
216 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
217 grpc_core::ExecCtx exec_ctx;
218 grpc_cq_completion completion;
219 GPR_ASSERT(grpc_cq_begin_op(cc, &tag_cb));
220 grpc_cq_end_op(cc, &tag_cb, absl::OkStatus(), DoneWithCompletionOnStack,
221 nullptr, &completion);
222 }
223 shutdown_and_destroy(cc);
224
225 gpr_mu_lock(&mu);
226 current_iterations = static_cast<int>(state.iterations());
227 while (current_iterations != iteration) {
228 // Wait for all the callbacks to complete.
229 gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
230 }
231 gpr_mu_unlock(&mu);
232
233 gpr_mu_lock(&shutdown_mu);
234 while (!got_shutdown) {
235 // Wait for the shutdown callback to complete.
236 gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
237 }
238 gpr_mu_unlock(&shutdown_mu);
239
240 GPR_ASSERT(got_shutdown);
241 GPR_ASSERT(iteration == static_cast<int>(state.iterations()));
242 gpr_cv_destroy(&cv);
243 gpr_mu_destroy(&mu);
244 gpr_cv_destroy(&shutdown_cv);
245 gpr_mu_destroy(&shutdown_mu);
246 }
BM_Callback_CQ_Pass1CoreHeapCompletion(benchmark::State & state)247 static void BM_Callback_CQ_Pass1CoreHeapCompletion(benchmark::State& state) {
248 int iteration = 0, current_iterations = 0;
249 TagCallback tag_cb(&iteration);
250 gpr_mu_init(&mu);
251 gpr_cv_init(&cv);
252 gpr_mu_init(&shutdown_mu);
253 gpr_cv_init(&shutdown_cv);
254 bool got_shutdown = false;
255 ShutdownCallback shutdown_cb(&got_shutdown);
256 grpc_completion_queue* cc =
257 grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr);
258 for (auto _ : state) {
259 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
260 grpc_core::ExecCtx exec_ctx;
261 grpc_cq_completion* completion = new grpc_cq_completion;
262 GPR_ASSERT(grpc_cq_begin_op(cc, &tag_cb));
263 grpc_cq_end_op(cc, &tag_cb, absl::OkStatus(), DoneWithCompletionOnHeap,
264 nullptr, completion);
265 }
266 shutdown_and_destroy(cc);
267
268 gpr_mu_lock(&mu);
269 current_iterations = static_cast<int>(state.iterations());
270 while (current_iterations != iteration) {
271 // Wait for all the callbacks to complete.
272 gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
273 }
274 gpr_mu_unlock(&mu);
275
276 gpr_mu_lock(&shutdown_mu);
277 while (!got_shutdown) {
278 // Wait for the shutdown callback to complete.
279 gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
280 }
281 gpr_mu_unlock(&shutdown_mu);
282
283 GPR_ASSERT(got_shutdown);
284 GPR_ASSERT(iteration == static_cast<int>(state.iterations()));
285 gpr_cv_destroy(&cv);
286 gpr_mu_destroy(&mu);
287 gpr_cv_destroy(&shutdown_cv);
288 gpr_mu_destroy(&shutdown_mu);
289 }
290 BENCHMARK(BM_Callback_CQ_Pass1Core);
291 BENCHMARK(BM_Callback_CQ_Pass1CoreHeapCompletion);
292
293 } // namespace testing
294 } // namespace grpc
295
296 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
297 // and others do not. This allows us to support both modes.
298 namespace benchmark {
RunTheBenchmarksNamespaced()299 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
300 } // namespace benchmark
301
main(int argc,char ** argv)302 int main(int argc, char** argv) {
303 grpc::testing::TestEnvironment env(&argc, argv);
304 LibraryInitializer libInit;
305 ::benchmark::Initialize(&argc, argv);
306 grpc::testing::InitTest(&argc, &argv, false);
307 benchmark::RunTheBenchmarksNamespaced();
308 return 0;
309 }
310