xref: /aosp_15_r20/external/grpc-grpc/test/cpp/qps/client_callback.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <list>
20 #include <memory>
21 #include <mutex>
22 #include <sstream>
23 #include <string>
24 #include <thread>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/memory/memory.h"
29 
30 #include <grpc/grpc.h>
31 #include <grpc/support/cpu.h>
32 #include <grpc/support/log.h>
33 #include <grpcpp/alarm.h>
34 #include <grpcpp/channel.h>
35 #include <grpcpp/client_context.h>
36 
37 #include "src/core/lib/gprpp/crash.h"
38 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
39 #include "test/cpp/qps/client.h"
40 #include "test/cpp/qps/usage_timer.h"
41 
42 namespace grpc {
43 namespace testing {
44 
45 ///
46 /// Maintains context info per RPC
47 ///
48 struct CallbackClientRpcContext {
CallbackClientRpcContextgrpc::testing::CallbackClientRpcContext49   explicit CallbackClientRpcContext(BenchmarkService::Stub* stub)
50       : alarm_(nullptr), stub_(stub) {}
51 
~CallbackClientRpcContextgrpc::testing::CallbackClientRpcContext52   ~CallbackClientRpcContext() {}
53 
54   SimpleResponse response_;
55   ClientContext context_;
56   std::unique_ptr<Alarm> alarm_;
57   BenchmarkService::Stub* stub_;
58 };
59 
BenchmarkStubCreator(const std::shared_ptr<Channel> & ch)60 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
61     const std::shared_ptr<Channel>& ch) {
62   return BenchmarkService::NewStub(ch);
63 }
64 
65 class CallbackClient
66     : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
67  public:
CallbackClient(const ClientConfig & config)68   explicit CallbackClient(const ClientConfig& config)
69       : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
70             config, BenchmarkStubCreator) {
71     num_threads_ = NumThreads(config);
72     rpcs_done_ = 0;
73 
74     //  Don't divide the fixed load among threads as the user threads
75     //  only bootstrap the RPCs
76     SetupLoadTest(config, 1);
77     total_outstanding_rpcs_ =
78         config.client_channels() * config.outstanding_rpcs_per_channel();
79   }
80 
~CallbackClient()81   ~CallbackClient() override {}
82 
83   ///
84   /// The main thread of the benchmark will be waiting on DestroyMultithreading.
85   /// Increment the rpcs_done_ variable to signify that the Callback RPC
86   /// after thread completion is done. When the last outstanding rpc increments
87   /// the counter it should also signal the main thread's conditional variable.
88   ///
NotifyMainThreadOfThreadCompletion()89   void NotifyMainThreadOfThreadCompletion() {
90     std::lock_guard<std::mutex> l(shutdown_mu_);
91     rpcs_done_++;
92     if (rpcs_done_ == total_outstanding_rpcs_) {
93       shutdown_cv_.notify_one();
94     }
95   }
96 
NextRPCIssueTime()97   gpr_timespec NextRPCIssueTime() {
98     std::lock_guard<std::mutex> l(next_issue_time_mu_);
99     return Client::NextIssueTime(0);
100   }
101 
102  protected:
103   size_t num_threads_;
104   size_t total_outstanding_rpcs_;
105   // The below mutex and condition variable is used by main benchmark thread to
106   // wait on completion of all RPCs before shutdown
107   std::mutex shutdown_mu_;
108   std::condition_variable shutdown_cv_;
109   // Number of rpcs done after thread completion
110   size_t rpcs_done_;
111   // Vector of Context data pointers for running a RPC
112   std::vector<std::unique_ptr<CallbackClientRpcContext>> ctx_;
113 
114   virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
115   virtual bool ThreadFuncImpl(Thread* t, size_t thread_idx) = 0;
116 
ThreadFunc(size_t thread_idx,Thread * t)117   void ThreadFunc(size_t thread_idx, Thread* t) override {
118     InitThreadFuncImpl(thread_idx);
119     ThreadFuncImpl(t, thread_idx);
120   }
121 
122  private:
123   std::mutex next_issue_time_mu_;  // Used by next issue time
124 
NumThreads(const ClientConfig & config)125   int NumThreads(const ClientConfig& config) {
126     int num_threads = config.async_client_threads();
127     if (num_threads <= 0) {  // Use dynamic sizing
128       num_threads = cores_;
129       gpr_log(GPR_INFO, "Sizing callback client to %d threads", num_threads);
130     }
131     return num_threads;
132   }
133 
134   ///
135   /// Wait until all outstanding Callback RPCs are done
136   ///
DestroyMultithreading()137   void DestroyMultithreading() final {
138     std::unique_lock<std::mutex> l(shutdown_mu_);
139     while (rpcs_done_ != total_outstanding_rpcs_) {
140       shutdown_cv_.wait(l);
141     }
142     EndThreads();
143   }
144 };
145 
146 class CallbackUnaryClient final : public CallbackClient {
147  public:
CallbackUnaryClient(const ClientConfig & config)148   explicit CallbackUnaryClient(const ClientConfig& config)
149       : CallbackClient(config) {
150     for (int ch = 0; ch < config.client_channels(); ch++) {
151       for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
152         ctx_.emplace_back(
153             new CallbackClientRpcContext(channels_[ch].get_stub()));
154       }
155     }
156     StartThreads(num_threads_);
157   }
~CallbackUnaryClient()158   ~CallbackUnaryClient() override {}
159 
160  protected:
ThreadFuncImpl(Thread * t,size_t thread_idx)161   bool ThreadFuncImpl(Thread* t, size_t thread_idx) override {
162     for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
163          vector_idx += num_threads_) {
164       ScheduleRpc(t, vector_idx);
165     }
166     return true;
167   }
168 
InitThreadFuncImpl(size_t)169   void InitThreadFuncImpl(size_t /*thread_idx*/) override {}
170 
171  private:
ScheduleRpc(Thread * t,size_t vector_idx)172   void ScheduleRpc(Thread* t, size_t vector_idx) {
173     if (!closed_loop_) {
174       gpr_timespec next_issue_time = NextRPCIssueTime();
175       // Start an alarm callback to run the internal callback after
176       // next_issue_time
177       if (ctx_[vector_idx]->alarm_ == nullptr) {
178         ctx_[vector_idx]->alarm_ = std::make_unique<Alarm>();
179       }
180       ctx_[vector_idx]->alarm_->Set(next_issue_time,
181                                     [this, t, vector_idx](bool /*ok*/) {
182                                       IssueUnaryCallbackRpc(t, vector_idx);
183                                     });
184     } else {
185       IssueUnaryCallbackRpc(t, vector_idx);
186     }
187   }
188 
IssueUnaryCallbackRpc(Thread * t,size_t vector_idx)189   void IssueUnaryCallbackRpc(Thread* t, size_t vector_idx) {
190     double start = UsageTimer::Now();
191     ctx_[vector_idx]->stub_->async()->UnaryCall(
192         (&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_,
193         [this, t, start, vector_idx](grpc::Status s) {
194           // Update Histogram with data from the callback run
195           HistogramEntry entry;
196           if (s.ok()) {
197             entry.set_value((UsageTimer::Now() - start) * 1e9);
198           }
199           entry.set_status(s.error_code());
200           t->UpdateHistogram(&entry);
201 
202           if (ThreadCompleted() || !s.ok()) {
203             // Notify thread of completion
204             NotifyMainThreadOfThreadCompletion();
205           } else {
206             // Reallocate ctx for next RPC
207             ctx_[vector_idx] = std::make_unique<CallbackClientRpcContext>(
208                 ctx_[vector_idx]->stub_);
209             // Schedule a new RPC
210             ScheduleRpc(t, vector_idx);
211           }
212         });
213   }
214 };
215 
216 class CallbackStreamingClient : public CallbackClient {
217  public:
CallbackStreamingClient(const ClientConfig & config)218   explicit CallbackStreamingClient(const ClientConfig& config)
219       : CallbackClient(config),
220         messages_per_stream_(config.messages_per_stream()) {
221     for (int ch = 0; ch < config.client_channels(); ch++) {
222       for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
223         ctx_.emplace_back(
224             new CallbackClientRpcContext(channels_[ch].get_stub()));
225       }
226     }
227     StartThreads(num_threads_);
228   }
~CallbackStreamingClient()229   ~CallbackStreamingClient() override {}
230 
AddHistogramEntry(double start,bool ok,Thread * thread_ptr)231   void AddHistogramEntry(double start, bool ok, Thread* thread_ptr) {
232     // Update Histogram with data from the callback run
233     HistogramEntry entry;
234     if (ok) {
235       entry.set_value((UsageTimer::Now() - start) * 1e9);
236     }
237     thread_ptr->UpdateHistogram(&entry);
238   }
239 
messages_per_stream()240   int messages_per_stream() { return messages_per_stream_; }
241 
242  protected:
243   const int messages_per_stream_;
244 };
245 
246 class CallbackStreamingPingPongClient : public CallbackStreamingClient {
247  public:
CallbackStreamingPingPongClient(const ClientConfig & config)248   explicit CallbackStreamingPingPongClient(const ClientConfig& config)
249       : CallbackStreamingClient(config) {}
~CallbackStreamingPingPongClient()250   ~CallbackStreamingPingPongClient() override {}
251 };
252 
253 class CallbackStreamingPingPongReactor final
254     : public grpc::ClientBidiReactor<SimpleRequest, SimpleResponse> {
255  public:
CallbackStreamingPingPongReactor(CallbackStreamingPingPongClient * client,std::unique_ptr<CallbackClientRpcContext> ctx)256   CallbackStreamingPingPongReactor(
257       CallbackStreamingPingPongClient* client,
258       std::unique_ptr<CallbackClientRpcContext> ctx)
259       : client_(client), ctx_(std::move(ctx)), messages_issued_(0) {}
260 
StartNewRpc()261   void StartNewRpc() {
262     ctx_->stub_->async()->StreamingCall(&(ctx_->context_), this);
263     write_time_ = UsageTimer::Now();
264     StartWrite(client_->request());
265     writes_done_started_.clear();
266     StartCall();
267   }
268 
OnWriteDone(bool ok)269   void OnWriteDone(bool ok) override {
270     if (!ok) {
271       gpr_log(GPR_ERROR, "Error writing RPC");
272     }
273     if ((!ok || client_->ThreadCompleted()) &&
274         !writes_done_started_.test_and_set()) {
275       StartWritesDone();
276     }
277     StartRead(&ctx_->response_);
278   }
279 
OnReadDone(bool ok)280   void OnReadDone(bool ok) override {
281     client_->AddHistogramEntry(write_time_, ok, thread_ptr_);
282 
283     if (client_->ThreadCompleted() || !ok ||
284         (client_->messages_per_stream() != 0 &&
285          ++messages_issued_ >= client_->messages_per_stream())) {
286       if (!ok) {
287         gpr_log(GPR_ERROR, "Error reading RPC");
288       }
289       if (!writes_done_started_.test_and_set()) {
290         StartWritesDone();
291       }
292       return;
293     }
294     if (!client_->IsClosedLoop()) {
295       gpr_timespec next_issue_time = client_->NextRPCIssueTime();
296       // Start an alarm callback to run the internal callback after
297       // next_issue_time
298       ctx_->alarm_->Set(next_issue_time, [this](bool /*ok*/) {
299         write_time_ = UsageTimer::Now();
300         StartWrite(client_->request());
301       });
302     } else {
303       write_time_ = UsageTimer::Now();
304       StartWrite(client_->request());
305     }
306   }
307 
OnDone(const Status & s)308   void OnDone(const Status& s) override {
309     if (client_->ThreadCompleted() || !s.ok()) {
310       client_->NotifyMainThreadOfThreadCompletion();
311       return;
312     }
313     ctx_ = std::make_unique<CallbackClientRpcContext>(ctx_->stub_);
314     ScheduleRpc();
315   }
316 
ScheduleRpc()317   void ScheduleRpc() {
318     if (!client_->IsClosedLoop()) {
319       gpr_timespec next_issue_time = client_->NextRPCIssueTime();
320       // Start an alarm callback to run the internal callback after
321       // next_issue_time
322       if (ctx_->alarm_ == nullptr) {
323         ctx_->alarm_ = std::make_unique<Alarm>();
324       }
325       ctx_->alarm_->Set(next_issue_time,
326                         [this](bool /*ok*/) { StartNewRpc(); });
327     } else {
328       StartNewRpc();
329     }
330   }
331 
set_thread_ptr(Client::Thread * ptr)332   void set_thread_ptr(Client::Thread* ptr) { thread_ptr_ = ptr; }
333 
334   CallbackStreamingPingPongClient* client_;
335   std::unique_ptr<CallbackClientRpcContext> ctx_;
336   std::atomic_flag writes_done_started_;
337   Client::Thread* thread_ptr_;  // Needed to update histogram entries
338   double write_time_;           // Track ping-pong round start time
339   int messages_issued_;         // Messages issued by this stream
340 };
341 
342 class CallbackStreamingPingPongClientImpl final
343     : public CallbackStreamingPingPongClient {
344  public:
CallbackStreamingPingPongClientImpl(const ClientConfig & config)345   explicit CallbackStreamingPingPongClientImpl(const ClientConfig& config)
346       : CallbackStreamingPingPongClient(config) {
347     for (size_t i = 0; i < total_outstanding_rpcs_; i++) {
348       reactor_.emplace_back(
349           new CallbackStreamingPingPongReactor(this, std::move(ctx_[i])));
350     }
351   }
~CallbackStreamingPingPongClientImpl()352   ~CallbackStreamingPingPongClientImpl() override {}
353 
ThreadFuncImpl(Client::Thread * t,size_t thread_idx)354   bool ThreadFuncImpl(Client::Thread* t, size_t thread_idx) override {
355     for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
356          vector_idx += num_threads_) {
357       reactor_[vector_idx]->set_thread_ptr(t);
358       reactor_[vector_idx]->ScheduleRpc();
359     }
360     return true;
361   }
362 
InitThreadFuncImpl(size_t)363   void InitThreadFuncImpl(size_t /*thread_idx*/) override {}
364 
365  private:
366   std::vector<std::unique_ptr<CallbackStreamingPingPongReactor>> reactor_;
367 };
368 
369 // TODO(mhaidry) : Implement Streaming from client, server and both ways
370 
CreateCallbackClient(const ClientConfig & config)371 std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config) {
372   switch (config.rpc_type()) {
373     case UNARY:
374       return std::unique_ptr<Client>(new CallbackUnaryClient(config));
375     case STREAMING:
376       return std::unique_ptr<Client>(
377           new CallbackStreamingPingPongClientImpl(config));
378     case STREAMING_FROM_CLIENT:
379     case STREAMING_FROM_SERVER:
380     case STREAMING_BOTH_WAYS:
381       grpc_core::Crash(
382           "STREAMING_FROM_* scenarios are not supported by the callback "
383           "API");
384     default:
385       grpc_core::Crash(absl::StrCat("Unknown RPC type: ", config.rpc_type()));
386   }
387 }
388 
389 }  // namespace testing
390 }  // namespace grpc
391