1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <string.h>
20
21 #include <atomic>
22
23 #include <benchmark/benchmark.h>
24
25 #include <grpc/grpc.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28
29 #include "src/core/lib/gprpp/crash.h"
30 #include "src/core/lib/gprpp/time.h"
31 #include "src/core/lib/iomgr/ev_posix.h"
32 #include "src/core/lib/iomgr/port.h"
33 #include "src/core/lib/surface/completion_queue.h"
34 #include "test/core/util/test_config.h"
35 #include "test/cpp/microbenchmarks/helpers.h"
36 #include "test/cpp/util/test_config.h"
37
// Minimal stand-in for a real pollset: the custom event engine below never
// polls file descriptors, so a pollset consists of nothing but the mutex it
// hands back to the caller via pollset_init().
struct grpc_pollset {
  gpr_mu mu;
};
41
// g_mu/g_cv guard the cross-thread setup/teardown handshake performed in
// BM_Cq_Throughput (initialized in main()).
static gpr_mu g_mu;
static gpr_cv g_cv;
// Count of benchmark threads currently inside BM_Cq_Throughput.
static int g_threads_active;
// Set true by thread 0 once setup() completes; other threads wait on it.
static bool g_active;
46
47 namespace grpc {
48 namespace testing {
// Completion queue shared by all benchmark threads; created in setup(),
// destroyed in teardown().
static grpc_completion_queue* g_cq;
50
// A fake pollset owns nothing that needs asynchronous teardown, so the
// shutdown closure can be scheduled immediately.
static void pollset_shutdown(grpc_pollset* /*ps*/, grpc_closure* closure) {
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
}
54
// Initializes the pollset's mutex and exposes it to the caller via *mu, as
// the pollset vtable contract requires.
static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {
  gpr_mu_init(&ps->mu);
  *mu = &ps->mu;
}
59
// Releases the only resource this fake pollset owns: its mutex.
static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); }
61
// Kick is a no-op: pollset_work() below never blocks, so there is never a
// worker to wake.
static grpc_error_handle pollset_kick(grpc_pollset* /*p*/,
                                      grpc_pollset_worker* /*worker*/) {
  return absl::OkStatus();
}
66
// Callback run when the tag is dequeued from the completion queue. Frees the
// gpr_malloc()ed grpc_cq_completion allocated in pollset_work(); nothing else.
static void cq_done_cb(void* /*done_arg*/, grpc_cq_completion* cq_completion) {
  gpr_free(cq_completion);
}
71
72 // Queues a completion tag if deadline is > 0.
73 // Does nothing if deadline is 0 (i.e gpr_time_0(GPR_CLOCK_MONOTONIC))
pollset_work(grpc_pollset * ps,grpc_pollset_worker **,grpc_core::Timestamp deadline)74 static grpc_error_handle pollset_work(grpc_pollset* ps,
75 grpc_pollset_worker** /*worker*/,
76 grpc_core::Timestamp deadline) {
77 if (deadline == grpc_core::Timestamp::ProcessEpoch()) {
78 gpr_log(GPR_DEBUG, "no-op");
79 return absl::OkStatus();
80 }
81
82 gpr_mu_unlock(&ps->mu);
83
84 void* tag = reinterpret_cast<void*>(10); // Some random number
85 GPR_ASSERT(grpc_cq_begin_op(g_cq, tag));
86 grpc_cq_end_op(
87 g_cq, tag, absl::OkStatus(), cq_done_cb, nullptr,
88 static_cast<grpc_cq_completion*>(gpr_malloc(sizeof(grpc_cq_completion))));
89 grpc_core::ExecCtx::Get()->Flush();
90 gpr_mu_lock(&ps->mu);
91 return absl::OkStatus();
92 }
93
make_engine_vtable(const char * name)94 static grpc_event_engine_vtable make_engine_vtable(const char* name) {
95 grpc_event_engine_vtable vtable;
96 memset(&vtable, 0, sizeof(vtable));
97
98 vtable.pollset_size = sizeof(grpc_pollset);
99 vtable.pollset_init = pollset_init;
100 vtable.pollset_shutdown = pollset_shutdown;
101 vtable.pollset_destroy = pollset_destroy;
102 vtable.pollset_work = pollset_work;
103 vtable.pollset_kick = pollset_kick;
104 vtable.is_any_background_poller_thread = [] { return false; };
105 vtable.add_closure_to_background_poller = [](grpc_closure* /*closure*/,
106 grpc_error_handle /*error*/) {
107 return false;
108 };
109 vtable.shutdown_background_closure = [] {};
110 vtable.shutdown_engine = [] {};
111 vtable.check_engine_available = [](bool) { return true; };
112 vtable.init_engine = [] {};
113 vtable.name = name;
114
115 return vtable;
116 }
117
setup()118 static void setup() {
119 grpc_init();
120 GPR_ASSERT(strcmp(grpc_get_poll_strategy_name(), "none") == 0 ||
121 strcmp(grpc_get_poll_strategy_name(), "bm_cq_multiple_threads") ==
122 0);
123
124 g_cq = grpc_completion_queue_create_for_next(nullptr);
125 }
126
teardown()127 static void teardown() {
128 grpc_completion_queue_shutdown(g_cq);
129
130 // Drain any events
131 gpr_timespec deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
132 while (grpc_completion_queue_next(g_cq, deadline, nullptr).type !=
133 GRPC_QUEUE_SHUTDOWN) {
134 // Do nothing
135 }
136
137 grpc_completion_queue_destroy(g_cq);
138 grpc_shutdown();
139 }
140
141 // A few notes about Multi-threaded benchmarks:
142
143 // Setup:
// The benchmark framework ensures that none of the threads proceed beyond the
// state.KeepRunning() call unless all the threads have called
// state.KeepRunning() at least once. So it is safe to do the initialization
// in one of the threads before state.KeepRunning() is called.
148
// Teardown:
// The benchmark framework also ensures that no thread is running the benchmark
// code (i.e. the code between two successive calls of state.KeepRunning()) if
// state.KeepRunning() returns false. So it is safe to do the teardown in one
// of the threads after state.KeepRunning() returns false.
154
155 // However, our use requires synchronization because we do additional work at
156 // each thread that requires specific ordering (TrackCounters must be
157 // constructed after grpc_init because it needs the number of cores, initialized
158 // by grpc, and its Finish call must take place before grpc_shutdown so that it
159 // can use grpc_stats).
160 //
// Measures grpc_completion_queue_next() throughput across threads. Each
// next() call drives pollset_work(), which queues exactly one completion, so
// every loop iteration dequeues one event.
static void BM_Cq_Throughput(benchmark::State& state) {
  gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
  auto thd_idx = state.thread_index();

  // Entry rendezvous: thread 0 performs the one-time setup(); every other
  // thread blocks on g_cv until g_active signals setup is complete.
  gpr_mu_lock(&g_mu);
  g_threads_active++;
  if (thd_idx == 0) {
    setup();
    g_active = true;
    gpr_cv_broadcast(&g_cv);
  } else {
    while (!g_active) {
      gpr_cv_wait(&g_cv, &g_mu, deadline);
    }
  }
  gpr_mu_unlock(&g_mu);

  for (auto _ : state) {
    GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, nullptr).type ==
               GRPC_OP_COMPLETE);
  }

  state.SetItemsProcessed(state.iterations());

  // Exit rendezvous: the last thread to finish broadcasts; the rest wait, so
  // no thread is still touching g_cq when teardown() runs below.
  gpr_mu_lock(&g_mu);
  g_threads_active--;
  if (g_threads_active == 0) {
    gpr_cv_broadcast(&g_cv);
  } else {
    while (g_threads_active > 0) {
      gpr_cv_wait(&g_cv, &g_mu, deadline);
    }
  }
  gpr_mu_unlock(&g_mu);

  // Only thread 0 tears down. NOTE(review): g_active is reset here without
  // holding g_mu; this looks benign because the framework joins all threads
  // between benchmark runs, but worth confirming.
  if (thd_idx == 0) {
    teardown();
    g_active = false;
  }
}
201
// Run with 1..16 threads; real time is the meaningful measure here since
// threads spend their time inside grpc_completion_queue_next().
BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();
203
namespace {
// Event-engine vtables registered in main(). Both use the no-op pollsets
// above and differ only in their strategy name: one overrides "none", the
// other adds the custom "bm_cq_multiple_threads" strategy.
const grpc_event_engine_vtable g_none_vtable =
    grpc::testing::make_engine_vtable("none");
const grpc_event_engine_vtable g_bm_vtable =
    grpc::testing::make_engine_vtable("bm_cq_multiple_threads");
}  // namespace
210
211 } // namespace testing
212 } // namespace grpc
213
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
// and others do not. This allows us to support both modes.
namespace benchmark {
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
}  // namespace benchmark
219
int main(int argc, char** argv) {
  // This test should only ever be run with a non or any polling engine.
  // Override the polling engine for the non-polling engine and add a custom
  // polling engine. NOTE(review): registration presumably must precede gRPC
  // initialization (TestEnvironment / setup()) for the factories to take
  // effect — keep this ordering; confirm before reordering.
  grpc_register_event_engine_factory(&grpc::testing::g_none_vtable, false);
  grpc_register_event_engine_factory(&grpc::testing::g_bm_vtable, true);
  grpc::testing::TestEnvironment env(&argc, argv);
  // Initialize the handshake primitives used by BM_Cq_Throughput.
  gpr_mu_init(&g_mu);
  gpr_cv_init(&g_cv);
  ::benchmark::Initialize(&argc, argv);
  grpc::testing::InitTest(&argc, &argv, false);
  benchmark::RunTheBenchmarksNamespaced();
  return 0;
}
234