xref: /aosp_15_r20/external/grpc-grpc/test/cpp/qps/client_sync.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <chrono>
20 #include <memory>
21 #include <mutex>
22 #include <sstream>
23 #include <string>
24 #include <thread>
25 #include <vector>
26 
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/server.h>
34 #include <grpcpp/server_builder.h>
35 
36 #include "src/core/lib/gprpp/crash.h"
37 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
38 #include "test/cpp/qps/client.h"
39 #include "test/cpp/qps/interarrival.h"
40 #include "test/cpp/qps/usage_timer.h"
41 
42 namespace grpc {
43 namespace testing {
44 
// Stub factory handed to ClientImpl: builds a BenchmarkService stub on top of
// the given channel.
static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
    const std::shared_ptr<Channel>& ch) {
  return BenchmarkService::NewStub(ch);
}
49 
// Common base for all synchronous benchmark clients.  Runs one worker thread
// per outstanding RPC (outstanding_rpcs_per_channel * client_channels) and
// lets subclasses supply the per-thread RPC logic via InitThreadFuncImpl /
// ThreadFuncImpl.
class SynchronousClient
    : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
 public:
  explicit SynchronousClient(const ClientConfig& config)
      : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
            config, BenchmarkStubCreator) {
    num_threads_ =
        config.outstanding_rpcs_per_channel() * config.client_channels();
    // One response slot per worker thread so threads never share a message.
    responses_.resize(num_threads_);
    SetupLoadTest(config, num_threads_);
  }

  ~SynchronousClient() override {}

  // One-time per-thread setup (e.g. opening a stream).  Returning false
  // aborts the worker thread before it issues any RPCs.
  virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
  // Issues one RPC (or one stream message) and records it in `entry`.
  // Returning false terminates the worker thread.
  virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;

  // Worker-thread main loop: init once, then issue RPCs until the subclass
  // signals failure or the run is complete.
  void ThreadFunc(size_t thread_idx, Thread* t) override {
    if (!InitThreadFuncImpl(thread_idx)) {
      return;
    }
    for (;;) {
      // run the loop body
      HistogramEntry entry;
      const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
      t->UpdateHistogram(&entry);
      if (!thread_still_ok || ThreadCompleted()) {
        return;
      }
    }
  }

 protected:
  // WaitToIssue returns false if we realize that we need to break out
  bool WaitToIssue(int thread_idx) {
    if (!closed_loop_) {
      const gpr_timespec next_issue_time = NextIssueTime(thread_idx);
      // Avoid sleeping for too long continuously because we might
      // need to terminate before then. This is an issue since
      // exponential distribution can occasionally produce bad outliers
      while (true) {
        const gpr_timespec one_sec_delay =
            gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                         gpr_time_from_seconds(1, GPR_TIMESPAN));
        if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) {
          gpr_sleep_until(next_issue_time);
          return true;
        } else {
          // Nap at most one second at a time, re-checking for shutdown
          // between naps.
          gpr_sleep_until(one_sec_delay);
          if (gpr_atm_acq_load(&thread_pool_done_) != gpr_atm{0}) {
            return false;
          }
        }
      }
    }
    return true;
  }

  size_t num_threads_;
  // Per-thread response message, indexed by thread_idx.
  std::vector<SimpleResponse> responses_;
};
111 
112 class SynchronousUnaryClient final : public SynchronousClient {
113  public:
SynchronousUnaryClient(const ClientConfig & config)114   explicit SynchronousUnaryClient(const ClientConfig& config)
115       : SynchronousClient(config) {
116     StartThreads(num_threads_);
117   }
~SynchronousUnaryClient()118   ~SynchronousUnaryClient() override {}
119 
InitThreadFuncImpl(size_t)120   bool InitThreadFuncImpl(size_t /*thread_idx*/) override { return true; }
121 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)122   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
123     if (!WaitToIssue(thread_idx)) {
124       return true;
125     }
126     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
127     double start = UsageTimer::Now();
128     grpc::ClientContext context;
129     grpc::Status s =
130         stub->UnaryCall(&context, request_, &responses_[thread_idx]);
131     if (s.ok()) {
132       entry->set_value((UsageTimer::Now() - start) * 1e9);
133     }
134     entry->set_status(s.error_code());
135     return true;
136   }
137 
138  private:
DestroyMultithreading()139   void DestroyMultithreading() final { EndThreads(); }
140 };
141 
// Common base for the streaming client variants.  Holds one stream (and its
// ClientContext) per worker thread, plus the machinery to tear streams down
// and bring them back up when they fail.
template <class StreamType>
class SynchronousStreamingClient : public SynchronousClient {
 public:
  explicit SynchronousStreamingClient(const ClientConfig& config)
      : SynchronousClient(config),
        context_(num_threads_),
        stream_(num_threads_),
        stream_mu_(num_threads_),
        shutdown_(num_threads_),
        messages_per_stream_(config.messages_per_stream()),
        messages_issued_(num_threads_) {
    StartThreads(num_threads_);
  }
  ~SynchronousStreamingClient() override {
    CleanupAllStreams([this](size_t thread_idx) {
      // Don't log any kind of error since we may have canceled this
      stream_[thread_idx]->Finish().IgnoreError();
    });
  }

 protected:
  // Per-thread call state, all indexed by thread_idx.
  std::vector<grpc::ClientContext> context_;
  std::vector<std::unique_ptr<StreamType>> stream_;
  // stream_mu_ is only needed when changing an element of stream_ or context_
  std::vector<std::mutex> stream_mu_;
  // use struct Bool rather than bool because vector<bool> is not concurrent
  struct Bool {
    bool val;
    Bool() : val(false) {}
  };
  // Per-thread shutdown flags; written and read under the matching stream_mu_.
  std::vector<Bool> shutdown_;
  // Messages to send on a stream before restarting it; 0 means unlimited.
  const int messages_per_stream_;
  std::vector<int> messages_issued_;

  // Finishes the thread's stream, records its terminal status, and recycles
  // the thread's ClientContext so a fresh stream can be started on it.
  void FinishStream(HistogramEntry* entry, size_t thread_idx) {
    Status s = stream_[thread_idx]->Finish();
    // don't set the value since the stream is failed and shouldn't be timed
    entry->set_status(s.error_code());
    if (!s.ok()) {
      // Only log if we are not shutting down (errors during shutdown are
      // expected, e.g. from TryCancel).
      std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
      if (!shutdown_[thread_idx].val) {
        gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s",
                thread_idx, s.error_message().c_str());
      }
    }
    // Lock the stream_mu_ now because the client context could change
    // ClientContext is single-use, so destroy it and construct a fresh one
    // in place, reusing the vector element's storage.
    std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
    context_[thread_idx].~ClientContext();
    new (&context_[thread_idx]) ClientContext();
  }

  // Marks every stream as shut down and runs `cleaner` on each live one,
  // using one helper thread per stream so they complete concurrently.
  void CleanupAllStreams(const std::function<void(size_t)>& cleaner) {
    std::vector<std::thread> cleanup_threads;
    for (size_t i = 0; i < num_threads_; i++) {
      cleanup_threads.emplace_back([this, i, cleaner] {
        std::lock_guard<std::mutex> l(stream_mu_[i]);
        shutdown_[i].val = true;
        if (stream_[i]) {
          cleaner(i);
        }
      });
    }
    for (auto& th : cleanup_threads) {
      th.join();
    }
  }

 private:
  void DestroyMultithreading() final {
    // Cancel every in-flight call so blocked workers wake up, then join them.
    CleanupAllStreams(
        [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
    EndThreads();
  }
};
216 
217 class SynchronousStreamingPingPongClient final
218     : public SynchronousStreamingClient<
219           grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
220  public:
SynchronousStreamingPingPongClient(const ClientConfig & config)221   explicit SynchronousStreamingPingPongClient(const ClientConfig& config)
222       : SynchronousStreamingClient(config) {}
~SynchronousStreamingPingPongClient()223   ~SynchronousStreamingPingPongClient() override {
224     CleanupAllStreams(
225         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
226   }
227 
228  private:
InitThreadFuncImpl(size_t thread_idx)229   bool InitThreadFuncImpl(size_t thread_idx) override {
230     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
231     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
232     if (!shutdown_[thread_idx].val) {
233       stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
234     } else {
235       return false;
236     }
237     messages_issued_[thread_idx] = 0;
238     return true;
239   }
240 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)241   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
242     if (!WaitToIssue(thread_idx)) {
243       return true;
244     }
245     double start = UsageTimer::Now();
246     if (stream_[thread_idx]->Write(request_) &&
247         stream_[thread_idx]->Read(&responses_[thread_idx])) {
248       entry->set_value((UsageTimer::Now() - start) * 1e9);
249       // don't set the status since there isn't one yet
250       if ((messages_per_stream_ != 0) &&
251           (++messages_issued_[thread_idx] < messages_per_stream_)) {
252         return true;
253       } else if (messages_per_stream_ == 0) {
254         return true;
255       } else {
256         // Fall through to the below resetting code after finish
257       }
258     }
259     stream_[thread_idx]->WritesDone();
260     FinishStream(entry, thread_idx);
261     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
262     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
263     if (!shutdown_[thread_idx].val) {
264       stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
265     } else {
266       stream_[thread_idx].reset();
267       return false;
268     }
269     messages_issued_[thread_idx] = 0;
270     return true;
271   }
272 };
273 
274 class SynchronousStreamingFromClientClient final
275     : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> {
276  public:
SynchronousStreamingFromClientClient(const ClientConfig & config)277   explicit SynchronousStreamingFromClientClient(const ClientConfig& config)
278       : SynchronousStreamingClient(config), last_issue_(num_threads_) {}
~SynchronousStreamingFromClientClient()279   ~SynchronousStreamingFromClientClient() override {
280     CleanupAllStreams(
281         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
282   }
283 
284  private:
285   std::vector<double> last_issue_;
286 
InitThreadFuncImpl(size_t thread_idx)287   bool InitThreadFuncImpl(size_t thread_idx) override {
288     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
289     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
290     if (!shutdown_[thread_idx].val) {
291       stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
292                                                       &responses_[thread_idx]);
293     } else {
294       return false;
295     }
296     last_issue_[thread_idx] = UsageTimer::Now();
297     return true;
298   }
299 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)300   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
301     // Figure out how to make histogram sensible if this is rate-paced
302     if (!WaitToIssue(thread_idx)) {
303       return true;
304     }
305     if (stream_[thread_idx]->Write(request_)) {
306       double now = UsageTimer::Now();
307       entry->set_value((now - last_issue_[thread_idx]) * 1e9);
308       last_issue_[thread_idx] = now;
309       return true;
310     }
311     stream_[thread_idx]->WritesDone();
312     FinishStream(entry, thread_idx);
313     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
314     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
315     if (!shutdown_[thread_idx].val) {
316       stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
317                                                       &responses_[thread_idx]);
318     } else {
319       stream_[thread_idx].reset();
320       return false;
321     }
322     return true;
323   }
324 };
325 
326 class SynchronousStreamingFromServerClient final
327     : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> {
328  public:
SynchronousStreamingFromServerClient(const ClientConfig & config)329   explicit SynchronousStreamingFromServerClient(const ClientConfig& config)
330       : SynchronousStreamingClient(config), last_recv_(num_threads_) {}
~SynchronousStreamingFromServerClient()331   ~SynchronousStreamingFromServerClient() override {}
332 
333  private:
334   std::vector<double> last_recv_;
335 
InitThreadFuncImpl(size_t thread_idx)336   bool InitThreadFuncImpl(size_t thread_idx) override {
337     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
338     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
339     if (!shutdown_[thread_idx].val) {
340       stream_[thread_idx] =
341           stub->StreamingFromServer(&context_[thread_idx], request_);
342     } else {
343       return false;
344     }
345     last_recv_[thread_idx] = UsageTimer::Now();
346     return true;
347   }
348 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)349   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
350     if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
351       double now = UsageTimer::Now();
352       entry->set_value((now - last_recv_[thread_idx]) * 1e9);
353       last_recv_[thread_idx] = now;
354       return true;
355     }
356     FinishStream(entry, thread_idx);
357     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
358     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
359     if (!shutdown_[thread_idx].val) {
360       stream_[thread_idx] =
361           stub->StreamingFromServer(&context_[thread_idx], request_);
362     } else {
363       stream_[thread_idx].reset();
364       return false;
365     }
366     return true;
367   }
368 };
369 
370 class SynchronousStreamingBothWaysClient final
371     : public SynchronousStreamingClient<
372           grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
373  public:
SynchronousStreamingBothWaysClient(const ClientConfig & config)374   explicit SynchronousStreamingBothWaysClient(const ClientConfig& config)
375       : SynchronousStreamingClient(config) {}
~SynchronousStreamingBothWaysClient()376   ~SynchronousStreamingBothWaysClient() override {
377     CleanupAllStreams(
378         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
379   }
380 
381  private:
InitThreadFuncImpl(size_t thread_idx)382   bool InitThreadFuncImpl(size_t thread_idx) override {
383     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
384     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
385     if (!shutdown_[thread_idx].val) {
386       stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
387     } else {
388       return false;
389     }
390     return true;
391   }
392 
ThreadFuncImpl(HistogramEntry *,size_t)393   bool ThreadFuncImpl(HistogramEntry* /*entry*/,
394                       size_t /*thread_idx*/) override {
395     // TODO (vjpai): Do this
396     return true;
397   }
398 };
399 
CreateSynchronousClient(const ClientConfig & config)400 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
401   GPR_ASSERT(!config.use_coalesce_api());  // not supported yet.
402   switch (config.rpc_type()) {
403     case UNARY:
404       return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
405     case STREAMING:
406       return std::unique_ptr<Client>(
407           new SynchronousStreamingPingPongClient(config));
408     case STREAMING_FROM_CLIENT:
409       return std::unique_ptr<Client>(
410           new SynchronousStreamingFromClientClient(config));
411     case STREAMING_FROM_SERVER:
412       return std::unique_ptr<Client>(
413           new SynchronousStreamingFromServerClient(config));
414     case STREAMING_BOTH_WAYS:
415       return std::unique_ptr<Client>(
416           new SynchronousStreamingBothWaysClient(config));
417     default:
418       assert(false);
419       return nullptr;
420   }
421 }
422 
423 }  // namespace testing
424 }  // namespace grpc
425