//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <list>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "absl/memory/memory.h"
#include "absl/strings/str_cat.h"

#include <grpc/grpc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpcpp/alarm.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>

#include "src/core/lib/gprpp/crash.h"
#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/usage_timer.h"

namespace grpc {
namespace testing {

///
/// Maintains context info per RPC
///
struct CallbackClientRpcContext {
  explicit CallbackClientRpcContext(BenchmarkService::Stub* stub)
      : alarm_(nullptr), stub_(stub) {}

  ~CallbackClientRpcContext() {}

  SimpleResponse response_;
  ClientContext context_;
  std::unique_ptr<Alarm> alarm_;
  BenchmarkService::Stub* stub_;
};

static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
    const std::shared_ptr<Channel>& ch) {
  return BenchmarkService::NewStub(ch);
}

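///
/// Common base for benchmark clients built on the gRPC callback API. It owns
/// the per-RPC contexts and the shutdown synchronization shared by the unary
/// and streaming variants below.
///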
class CallbackClient
    : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
 public:
  explicit CallbackClient(const ClientConfig& config)
      : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
            config, BenchmarkStubCreator) {
    num_threads_ = NumThreads(config);
    rpcs_done_ = 0;

    // Don't divide the fixed load among threads as the user threads
    // only bootstrap the RPCs
    SetupLoadTest(config, 1);
    total_outstanding_rpcs_ =
        config.client_channels() * config.outstanding_rpcs_per_channel();
  }

  ~CallbackClient() override {}

  ///
  /// The main benchmark thread waits in DestroyMultithreading. Each callback
  /// RPC that finishes and will not be rescheduled increments rpcs_done_;
  /// when the last outstanding RPC increments the counter, it also signals
  /// the main thread's condition variable.
  ///
  void NotifyMainThreadOfThreadCompletion() {
    std::lock_guard<std::mutex> l(shutdown_mu_);
    rpcs_done_++;
    if (rpcs_done_ == total_outstanding_rpcs_) {
      shutdown_cv_.notify_one();
    }
  }

  gpr_timespec NextRPCIssueTime() {
    std::lock_guard<std::mutex> l(next_issue_time_mu_);
    return Client::NextIssueTime(0);
  }

 protected:
  size_t num_threads_;
  size_t total_outstanding_rpcs_;
  // The mutex and condition variable below are used by the main benchmark
  // thread to wait for completion of all RPCs before shutdown
  std::mutex shutdown_mu_;
  std::condition_variable shutdown_cv_;
  // Number of RPCs that have finished and will not be rescheduled
  size_t rpcs_done_;
  // Vector of context data pointers, one per outstanding RPC
  std::vector<std::unique_ptr<CallbackClientRpcContext>> ctx_;

  virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
  virtual bool ThreadFuncImpl(Thread* t, size_t thread_idx) = 0;

  void ThreadFunc(size_t thread_idx, Thread* t) override {
    InitThreadFuncImpl(thread_idx);
    ThreadFuncImpl(t, thread_idx);
  }

 private:
  std::mutex next_issue_time_mu_;  // Serializes calls to Client::NextIssueTime

  int NumThreads(const ClientConfig& config) {
    int num_threads = config.async_client_threads();
    if (num_threads <= 0) {  // Use dynamic sizing
      num_threads = cores_;
      gpr_log(GPR_INFO, "Sizing callback client to %d threads", num_threads);
    }
    return num_threads;
  }

  ///
  /// Wait until all outstanding Callback RPCs are done
  ///
  void DestroyMultithreading() final {
    std::unique_lock<std::mutex> l(shutdown_mu_);
    while (rpcs_done_ != total_outstanding_rpcs_) {
      shutdown_cv_.wait(l);
    }
    EndThreads();
  }
};

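///
/// Benchmark client that issues unary RPCs through the callback API, keeping
/// outstanding_rpcs_per_channel RPCs in flight on each channel.
///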
class CallbackUnaryClient final : public CallbackClient {
 public:
  explicit CallbackUnaryClient(const ClientConfig& config)
      : CallbackClient(config) {
    for (int ch = 0; ch < config.client_channels(); ch++) {
      for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
        ctx_.emplace_back(
            new CallbackClientRpcContext(channels_[ch].get_stub()));
      }
    }
    StartThreads(num_threads_);
  }
  ~CallbackUnaryClient() override {}

 protected:
  bool ThreadFuncImpl(Thread* t, size_t thread_idx) override {
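    // Each thread drives every num_threads_-th context slot, so the
    // outstanding RPCs are spread evenly across the bootstrap threads.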
    for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
         vector_idx += num_threads_) {
      ScheduleRpc(t, vector_idx);
    }
    return true;
  }

  void InitThreadFuncImpl(size_t /*thread_idx*/) override {}

 private:
  void ScheduleRpc(Thread* t, size_t vector_idx) {
    if (!closed_loop_) {
      gpr_timespec next_issue_time = NextRPCIssueTime();
      // Start an alarm callback to run the internal callback after
      // next_issue_time
      if (ctx_[vector_idx]->alarm_ == nullptr) {
        ctx_[vector_idx]->alarm_ = std::make_unique<Alarm>();
      }
      ctx_[vector_idx]->alarm_->Set(next_issue_time,
                                    [this, t, vector_idx](bool /*ok*/) {
                                      IssueUnaryCallbackRpc(t, vector_idx);
                                    });
    } else {
      IssueUnaryCallbackRpc(t, vector_idx);
    }
  }

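  // Issues one unary callback RPC; its completion callback records the
  // latency and either reschedules this context slot or reports it as done.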
  void IssueUnaryCallbackRpc(Thread* t, size_t vector_idx) {
    double start = UsageTimer::Now();
    ctx_[vector_idx]->stub_->async()->UnaryCall(
        (&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_,
        [this, t, start, vector_idx](grpc::Status s) {
          // Update the histogram with data from the callback run
          HistogramEntry entry;
          if (s.ok()) {
            entry.set_value((UsageTimer::Now() - start) * 1e9);
          }
          entry.set_status(s.error_code());
          t->UpdateHistogram(&entry);

          if (ThreadCompleted() || !s.ok()) {
            // Notify the main thread that this RPC slot is done
            NotifyMainThreadOfThreadCompletion();
          } else {
            // Reallocate ctx for the next RPC
            ctx_[vector_idx] = std::make_unique<CallbackClientRpcContext>(
                ctx_[vector_idx]->stub_);
            // Schedule a new RPC
            ScheduleRpc(t, vector_idx);
          }
        });
  }
};

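///
/// Shared base for the streaming callback clients: adds histogram bookkeeping
/// and the per-stream message limit from the scenario config.
///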
class CallbackStreamingClient : public CallbackClient {
 public:
  explicit CallbackStreamingClient(const ClientConfig& config)
      : CallbackClient(config),
        messages_per_stream_(config.messages_per_stream()) {
    for (int ch = 0; ch < config.client_channels(); ch++) {
      for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
        ctx_.emplace_back(
            new CallbackClientRpcContext(channels_[ch].get_stub()));
      }
    }
    StartThreads(num_threads_);
  }
  ~CallbackStreamingClient() override {}

  void AddHistogramEntry(double start, bool ok, Thread* thread_ptr) {
    // Update the histogram with data from the callback run
    HistogramEntry entry;
    if (ok) {
      entry.set_value((UsageTimer::Now() - start) * 1e9);
    }
    thread_ptr->UpdateHistogram(&entry);
  }

  int messages_per_stream() { return messages_per_stream_; }

 protected:
  const int messages_per_stream_;
};

class CallbackStreamingPingPongClient : public CallbackStreamingClient {
 public:
  explicit CallbackStreamingPingPongClient(const ClientConfig& config)
      : CallbackStreamingClient(config) {}
  ~CallbackStreamingPingPongClient() override {}
};

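///
/// Bidi reactor that drives one ping-pong stream: it times each write,
/// records the latency when the matching read completes, and half-closes or
/// restarts the stream depending on the benchmark state.
///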
class CallbackStreamingPingPongReactor final
    : public grpc::ClientBidiReactor<SimpleRequest, SimpleResponse> {
 public:
  CallbackStreamingPingPongReactor(
      CallbackStreamingPingPongClient* client,
      std::unique_ptr<CallbackClientRpcContext> ctx)
      : client_(client), ctx_(std::move(ctx)), messages_issued_(0) {}

  void StartNewRpc() {
    ctx_->stub_->async()->StreamingCall(&(ctx_->context_), this);
    write_time_ = UsageTimer::Now();
    StartWrite(client_->request());
    writes_done_started_.clear();
    StartCall();
  }

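  // The writes_done_started_ flag ensures StartWritesDone() is issued at most
  // once per stream, whether the half-close is triggered from OnWriteDone()
  // or from OnReadDone().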
  void OnWriteDone(bool ok) override {
    if (!ok) {
      gpr_log(GPR_ERROR, "Error writing RPC");
    }
    if ((!ok || client_->ThreadCompleted()) &&
        !writes_done_started_.test_and_set()) {
      StartWritesDone();
    }
    StartRead(&ctx_->response_);
  }

  void OnReadDone(bool ok) override {
    client_->AddHistogramEntry(write_time_, ok, thread_ptr_);

    if (client_->ThreadCompleted() || !ok ||
        (client_->messages_per_stream() != 0 &&
         ++messages_issued_ >= client_->messages_per_stream())) {
      if (!ok) {
        gpr_log(GPR_ERROR, "Error reading RPC");
      }
      if (!writes_done_started_.test_and_set()) {
        StartWritesDone();
      }
      return;
    }
    if (!client_->IsClosedLoop()) {
      gpr_timespec next_issue_time = client_->NextRPCIssueTime();
      // Start an alarm callback to run the internal callback after
      // next_issue_time
      ctx_->alarm_->Set(next_issue_time, [this](bool /*ok*/) {
        write_time_ = UsageTimer::Now();
        StartWrite(client_->request());
      });
    } else {
      write_time_ = UsageTimer::Now();
      StartWrite(client_->request());
    }
  }

  void OnDone(const Status& s) override {
    if (client_->ThreadCompleted() || !s.ok()) {
      client_->NotifyMainThreadOfThreadCompletion();
      return;
    }
    ctx_ = std::make_unique<CallbackClientRpcContext>(ctx_->stub_);
    ScheduleRpc();
  }

  void ScheduleRpc() {
    if (!client_->IsClosedLoop()) {
      gpr_timespec next_issue_time = client_->NextRPCIssueTime();
      // Start an alarm callback to run the internal callback after
      // next_issue_time
      if (ctx_->alarm_ == nullptr) {
        ctx_->alarm_ = std::make_unique<Alarm>();
      }
      ctx_->alarm_->Set(next_issue_time,
                        [this](bool /*ok*/) { StartNewRpc(); });
    } else {
      StartNewRpc();
    }
  }

  void set_thread_ptr(Client::Thread* ptr) { thread_ptr_ = ptr; }

  CallbackStreamingPingPongClient* client_;
  std::unique_ptr<CallbackClientRpcContext> ctx_;
  std::atomic_flag writes_done_started_;
  Client::Thread* thread_ptr_;  // Needed to update histogram entries
  double write_time_;           // Track ping-pong round start time
  int messages_issued_;         // Messages issued by this stream
};

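///
/// Concrete ping-pong streaming client: creates one reactor per outstanding
/// stream and lets each benchmark thread schedule a disjoint subset of them.
///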
class CallbackStreamingPingPongClientImpl final
    : public CallbackStreamingPingPongClient {
 public:
  explicit CallbackStreamingPingPongClientImpl(const ClientConfig& config)
      : CallbackStreamingPingPongClient(config) {
    for (size_t i = 0; i < total_outstanding_rpcs_; i++) {
      reactor_.emplace_back(
          new CallbackStreamingPingPongReactor(this, std::move(ctx_[i])));
    }
  }
  ~CallbackStreamingPingPongClientImpl() override {}

  bool ThreadFuncImpl(Client::Thread* t, size_t thread_idx) override {
    for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
         vector_idx += num_threads_) {
      reactor_[vector_idx]->set_thread_ptr(t);
      reactor_[vector_idx]->ScheduleRpc();
    }
    return true;
  }

  void InitThreadFuncImpl(size_t /*thread_idx*/) override {}

 private:
  std::vector<std::unique_ptr<CallbackStreamingPingPongReactor>> reactor_;
};

// TODO(mhaidry): Implement Streaming from client, server and both ways

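// Builds the callback-API benchmark client matching the configured RPC type.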
std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config) {
  switch (config.rpc_type()) {
    case UNARY:
      return std::unique_ptr<Client>(new CallbackUnaryClient(config));
    case STREAMING:
      return std::unique_ptr<Client>(
          new CallbackStreamingPingPongClientImpl(config));
    case STREAMING_FROM_CLIENT:
    case STREAMING_FROM_SERVER:
    case STREAMING_BOTH_WAYS:
      grpc_core::Crash(
          "STREAMING_FROM_* scenarios are not supported by the callback "
          "API");
    default:
      grpc_core::Crash(absl::StrCat("Unknown RPC type: ", config.rpc_type()));
  }
}

}  // namespace testing
}  // namespace grpc
