xref: /aosp_15_r20/external/grpc-grpc/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <string.h>
20 
21 #include <atomic>
22 
23 #include <benchmark/benchmark.h>
24 
25 #include <grpc/grpc.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 
29 #include "src/core/lib/gprpp/crash.h"
30 #include "src/core/lib/gprpp/time.h"
31 #include "src/core/lib/iomgr/ev_posix.h"
32 #include "src/core/lib/iomgr/port.h"
33 #include "src/core/lib/surface/completion_queue.h"
34 #include "test/core/util/test_config.h"
35 #include "test/cpp/microbenchmarks/helpers.h"
36 #include "test/cpp/util/test_config.h"
37 
38 struct grpc_pollset {
39   gpr_mu mu;
40 };
41 
42 static gpr_mu g_mu;
43 static gpr_cv g_cv;
44 static int g_threads_active;
45 static bool g_active;
46 
47 namespace grpc {
48 namespace testing {
49 static grpc_completion_queue* g_cq;
50 
pollset_shutdown(grpc_pollset *,grpc_closure * closure)51 static void pollset_shutdown(grpc_pollset* /*ps*/, grpc_closure* closure) {
52   grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
53 }
54 
pollset_init(grpc_pollset * ps,gpr_mu ** mu)55 static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {
56   gpr_mu_init(&ps->mu);
57   *mu = &ps->mu;
58 }
59 
pollset_destroy(grpc_pollset * ps)60 static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); }
61 
pollset_kick(grpc_pollset *,grpc_pollset_worker *)62 static grpc_error_handle pollset_kick(grpc_pollset* /*p*/,
63                                       grpc_pollset_worker* /*worker*/) {
64   return absl::OkStatus();
65 }
66 
67 // Callback when the tag is dequeued from the completion queue. Does nothing
cq_done_cb(void *,grpc_cq_completion * cq_completion)68 static void cq_done_cb(void* /*done_arg*/, grpc_cq_completion* cq_completion) {
69   gpr_free(cq_completion);
70 }
71 
72 // Queues a completion tag if deadline is > 0.
73 // Does nothing if deadline is 0 (i.e gpr_time_0(GPR_CLOCK_MONOTONIC))
pollset_work(grpc_pollset * ps,grpc_pollset_worker **,grpc_core::Timestamp deadline)74 static grpc_error_handle pollset_work(grpc_pollset* ps,
75                                       grpc_pollset_worker** /*worker*/,
76                                       grpc_core::Timestamp deadline) {
77   if (deadline == grpc_core::Timestamp::ProcessEpoch()) {
78     gpr_log(GPR_DEBUG, "no-op");
79     return absl::OkStatus();
80   }
81 
82   gpr_mu_unlock(&ps->mu);
83 
84   void* tag = reinterpret_cast<void*>(10);  // Some random number
85   GPR_ASSERT(grpc_cq_begin_op(g_cq, tag));
86   grpc_cq_end_op(
87       g_cq, tag, absl::OkStatus(), cq_done_cb, nullptr,
88       static_cast<grpc_cq_completion*>(gpr_malloc(sizeof(grpc_cq_completion))));
89   grpc_core::ExecCtx::Get()->Flush();
90   gpr_mu_lock(&ps->mu);
91   return absl::OkStatus();
92 }
93 
make_engine_vtable(const char * name)94 static grpc_event_engine_vtable make_engine_vtable(const char* name) {
95   grpc_event_engine_vtable vtable;
96   memset(&vtable, 0, sizeof(vtable));
97 
98   vtable.pollset_size = sizeof(grpc_pollset);
99   vtable.pollset_init = pollset_init;
100   vtable.pollset_shutdown = pollset_shutdown;
101   vtable.pollset_destroy = pollset_destroy;
102   vtable.pollset_work = pollset_work;
103   vtable.pollset_kick = pollset_kick;
104   vtable.is_any_background_poller_thread = [] { return false; };
105   vtable.add_closure_to_background_poller = [](grpc_closure* /*closure*/,
106                                                grpc_error_handle /*error*/) {
107     return false;
108   };
109   vtable.shutdown_background_closure = [] {};
110   vtable.shutdown_engine = [] {};
111   vtable.check_engine_available = [](bool) { return true; };
112   vtable.init_engine = [] {};
113   vtable.name = name;
114 
115   return vtable;
116 }
117 
setup()118 static void setup() {
119   grpc_init();
120   GPR_ASSERT(strcmp(grpc_get_poll_strategy_name(), "none") == 0 ||
121              strcmp(grpc_get_poll_strategy_name(), "bm_cq_multiple_threads") ==
122                  0);
123 
124   g_cq = grpc_completion_queue_create_for_next(nullptr);
125 }
126 
teardown()127 static void teardown() {
128   grpc_completion_queue_shutdown(g_cq);
129 
130   // Drain any events
131   gpr_timespec deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
132   while (grpc_completion_queue_next(g_cq, deadline, nullptr).type !=
133          GRPC_QUEUE_SHUTDOWN) {
134     // Do nothing
135   }
136 
137   grpc_completion_queue_destroy(g_cq);
138   grpc_shutdown();
139 }
140 
141 // A few notes about Multi-threaded benchmarks:
142 
143 // Setup:
144 // The benchmark framework ensures that none of the threads proceed beyond the
145 // state.KeepRunning() call unless all the threads have called state.keepRunning
146 // at least once.  So it is safe to do the initialization in one of the threads
147 // before state.KeepRunning() is called.
148 
149 // Teardown:
150 // The benchmark framework also ensures that no thread is running the benchmark
151 // code (i.e the code between two successive calls of state.KeepRunning()) if
152 // state.KeepRunning() returns false. So it is safe to do the teardown in one
153 // of the threads after state.keepRunning() returns false.
154 
155 // However, our use requires synchronization because we do additional work at
156 // each thread that requires specific ordering (TrackCounters must be
157 // constructed after grpc_init because it needs the number of cores, initialized
158 // by grpc, and its Finish call must take place before grpc_shutdown so that it
159 // can use grpc_stats).
160 //
BM_Cq_Throughput(benchmark::State & state)161 static void BM_Cq_Throughput(benchmark::State& state) {
162   gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
163   auto thd_idx = state.thread_index();
164 
165   gpr_mu_lock(&g_mu);
166   g_threads_active++;
167   if (thd_idx == 0) {
168     setup();
169     g_active = true;
170     gpr_cv_broadcast(&g_cv);
171   } else {
172     while (!g_active) {
173       gpr_cv_wait(&g_cv, &g_mu, deadline);
174     }
175   }
176   gpr_mu_unlock(&g_mu);
177 
178   for (auto _ : state) {
179     GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, nullptr).type ==
180                GRPC_OP_COMPLETE);
181   }
182 
183   state.SetItemsProcessed(state.iterations());
184 
185   gpr_mu_lock(&g_mu);
186   g_threads_active--;
187   if (g_threads_active == 0) {
188     gpr_cv_broadcast(&g_cv);
189   } else {
190     while (g_threads_active > 0) {
191       gpr_cv_wait(&g_cv, &g_mu, deadline);
192     }
193   }
194   gpr_mu_unlock(&g_mu);
195 
196   if (thd_idx == 0) {
197     teardown();
198     g_active = false;
199   }
200 }
201 
202 BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();
203 
204 namespace {
205 const grpc_event_engine_vtable g_none_vtable =
206     grpc::testing::make_engine_vtable("none");
207 const grpc_event_engine_vtable g_bm_vtable =
208     grpc::testing::make_engine_vtable("bm_cq_multiple_threads");
209 }  // namespace
210 
211 }  // namespace testing
212 }  // namespace grpc
213 
214 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
215 // and others do not. This allows us to support both modes.
216 namespace benchmark {
RunTheBenchmarksNamespaced()217 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
218 }  // namespace benchmark
219 
main(int argc,char ** argv)220 int main(int argc, char** argv) {
221   // This test should only ever be run with a non or any polling engine
222   // Override the polling engine for the non-polling engine
223   // and add a custom polling engine
224   grpc_register_event_engine_factory(&grpc::testing::g_none_vtable, false);
225   grpc_register_event_engine_factory(&grpc::testing::g_bm_vtable, true);
226   grpc::testing::TestEnvironment env(&argc, argv);
227   gpr_mu_init(&g_mu);
228   gpr_cv_init(&g_cv);
229   ::benchmark::Initialize(&argc, argv);
230   grpc::testing::InitTest(&argc, &argv, false);
231   benchmark::RunTheBenchmarksNamespaced();
232   return 0;
233 }
234