1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <chrono>
20 #include <memory>
21 #include <mutex>
22 #include <sstream>
23 #include <string>
24 #include <thread>
25 #include <vector>
26
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/server.h>
34 #include <grpcpp/server_builder.h>
35
36 #include "src/core/lib/gprpp/crash.h"
37 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
38 #include "test/cpp/qps/client.h"
39 #include "test/cpp/qps/interarrival.h"
40 #include "test/cpp/qps/usage_timer.h"
41
42 namespace grpc {
43 namespace testing {
44
BenchmarkStubCreator(const std::shared_ptr<Channel> & ch)45 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
46 const std::shared_ptr<Channel>& ch) {
47 return BenchmarkService::NewStub(ch);
48 }
49
50 class SynchronousClient
51 : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
52 public:
SynchronousClient(const ClientConfig & config)53 explicit SynchronousClient(const ClientConfig& config)
54 : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
55 config, BenchmarkStubCreator) {
56 num_threads_ =
57 config.outstanding_rpcs_per_channel() * config.client_channels();
58 responses_.resize(num_threads_);
59 SetupLoadTest(config, num_threads_);
60 }
61
~SynchronousClient()62 ~SynchronousClient() override {}
63
64 virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
65 virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
66
ThreadFunc(size_t thread_idx,Thread * t)67 void ThreadFunc(size_t thread_idx, Thread* t) override {
68 if (!InitThreadFuncImpl(thread_idx)) {
69 return;
70 }
71 for (;;) {
72 // run the loop body
73 HistogramEntry entry;
74 const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
75 t->UpdateHistogram(&entry);
76 if (!thread_still_ok || ThreadCompleted()) {
77 return;
78 }
79 }
80 }
81
82 protected:
83 // WaitToIssue returns false if we realize that we need to break out
WaitToIssue(int thread_idx)84 bool WaitToIssue(int thread_idx) {
85 if (!closed_loop_) {
86 const gpr_timespec next_issue_time = NextIssueTime(thread_idx);
87 // Avoid sleeping for too long continuously because we might
88 // need to terminate before then. This is an issue since
89 // exponential distribution can occasionally produce bad outliers
90 while (true) {
91 const gpr_timespec one_sec_delay =
92 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
93 gpr_time_from_seconds(1, GPR_TIMESPAN));
94 if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) {
95 gpr_sleep_until(next_issue_time);
96 return true;
97 } else {
98 gpr_sleep_until(one_sec_delay);
99 if (gpr_atm_acq_load(&thread_pool_done_) != gpr_atm{0}) {
100 return false;
101 }
102 }
103 }
104 }
105 return true;
106 }
107
108 size_t num_threads_;
109 std::vector<SimpleResponse> responses_;
110 };
111
112 class SynchronousUnaryClient final : public SynchronousClient {
113 public:
SynchronousUnaryClient(const ClientConfig & config)114 explicit SynchronousUnaryClient(const ClientConfig& config)
115 : SynchronousClient(config) {
116 StartThreads(num_threads_);
117 }
~SynchronousUnaryClient()118 ~SynchronousUnaryClient() override {}
119
InitThreadFuncImpl(size_t)120 bool InitThreadFuncImpl(size_t /*thread_idx*/) override { return true; }
121
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)122 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
123 if (!WaitToIssue(thread_idx)) {
124 return true;
125 }
126 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
127 double start = UsageTimer::Now();
128 grpc::ClientContext context;
129 grpc::Status s =
130 stub->UnaryCall(&context, request_, &responses_[thread_idx]);
131 if (s.ok()) {
132 entry->set_value((UsageTimer::Now() - start) * 1e9);
133 }
134 entry->set_status(s.error_code());
135 return true;
136 }
137
138 private:
DestroyMultithreading()139 void DestroyMultithreading() final { EndThreads(); }
140 };
141
142 template <class StreamType>
143 class SynchronousStreamingClient : public SynchronousClient {
144 public:
SynchronousStreamingClient(const ClientConfig & config)145 explicit SynchronousStreamingClient(const ClientConfig& config)
146 : SynchronousClient(config),
147 context_(num_threads_),
148 stream_(num_threads_),
149 stream_mu_(num_threads_),
150 shutdown_(num_threads_),
151 messages_per_stream_(config.messages_per_stream()),
152 messages_issued_(num_threads_) {
153 StartThreads(num_threads_);
154 }
~SynchronousStreamingClient()155 ~SynchronousStreamingClient() override {
156 CleanupAllStreams([this](size_t thread_idx) {
157 // Don't log any kind of error since we may have canceled this
158 stream_[thread_idx]->Finish().IgnoreError();
159 });
160 }
161
162 protected:
163 std::vector<grpc::ClientContext> context_;
164 std::vector<std::unique_ptr<StreamType>> stream_;
165 // stream_mu_ is only needed when changing an element of stream_ or context_
166 std::vector<std::mutex> stream_mu_;
167 // use struct Bool rather than bool because vector<bool> is not concurrent
168 struct Bool {
169 bool val;
Boolgrpc::testing::SynchronousStreamingClient::Bool170 Bool() : val(false) {}
171 };
172 std::vector<Bool> shutdown_;
173 const int messages_per_stream_;
174 std::vector<int> messages_issued_;
175
FinishStream(HistogramEntry * entry,size_t thread_idx)176 void FinishStream(HistogramEntry* entry, size_t thread_idx) {
177 Status s = stream_[thread_idx]->Finish();
178 // don't set the value since the stream is failed and shouldn't be timed
179 entry->set_status(s.error_code());
180 if (!s.ok()) {
181 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
182 if (!shutdown_[thread_idx].val) {
183 gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s",
184 thread_idx, s.error_message().c_str());
185 }
186 }
187 // Lock the stream_mu_ now because the client context could change
188 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
189 context_[thread_idx].~ClientContext();
190 new (&context_[thread_idx]) ClientContext();
191 }
192
CleanupAllStreams(const std::function<void (size_t)> & cleaner)193 void CleanupAllStreams(const std::function<void(size_t)>& cleaner) {
194 std::vector<std::thread> cleanup_threads;
195 for (size_t i = 0; i < num_threads_; i++) {
196 cleanup_threads.emplace_back([this, i, cleaner] {
197 std::lock_guard<std::mutex> l(stream_mu_[i]);
198 shutdown_[i].val = true;
199 if (stream_[i]) {
200 cleaner(i);
201 }
202 });
203 }
204 for (auto& th : cleanup_threads) {
205 th.join();
206 }
207 }
208
209 private:
DestroyMultithreading()210 void DestroyMultithreading() final {
211 CleanupAllStreams(
212 [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
213 EndThreads();
214 }
215 };
216
217 class SynchronousStreamingPingPongClient final
218 : public SynchronousStreamingClient<
219 grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
220 public:
SynchronousStreamingPingPongClient(const ClientConfig & config)221 explicit SynchronousStreamingPingPongClient(const ClientConfig& config)
222 : SynchronousStreamingClient(config) {}
~SynchronousStreamingPingPongClient()223 ~SynchronousStreamingPingPongClient() override {
224 CleanupAllStreams(
225 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
226 }
227
228 private:
InitThreadFuncImpl(size_t thread_idx)229 bool InitThreadFuncImpl(size_t thread_idx) override {
230 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
231 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
232 if (!shutdown_[thread_idx].val) {
233 stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
234 } else {
235 return false;
236 }
237 messages_issued_[thread_idx] = 0;
238 return true;
239 }
240
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)241 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
242 if (!WaitToIssue(thread_idx)) {
243 return true;
244 }
245 double start = UsageTimer::Now();
246 if (stream_[thread_idx]->Write(request_) &&
247 stream_[thread_idx]->Read(&responses_[thread_idx])) {
248 entry->set_value((UsageTimer::Now() - start) * 1e9);
249 // don't set the status since there isn't one yet
250 if ((messages_per_stream_ != 0) &&
251 (++messages_issued_[thread_idx] < messages_per_stream_)) {
252 return true;
253 } else if (messages_per_stream_ == 0) {
254 return true;
255 } else {
256 // Fall through to the below resetting code after finish
257 }
258 }
259 stream_[thread_idx]->WritesDone();
260 FinishStream(entry, thread_idx);
261 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
262 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
263 if (!shutdown_[thread_idx].val) {
264 stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
265 } else {
266 stream_[thread_idx].reset();
267 return false;
268 }
269 messages_issued_[thread_idx] = 0;
270 return true;
271 }
272 };
273
274 class SynchronousStreamingFromClientClient final
275 : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> {
276 public:
SynchronousStreamingFromClientClient(const ClientConfig & config)277 explicit SynchronousStreamingFromClientClient(const ClientConfig& config)
278 : SynchronousStreamingClient(config), last_issue_(num_threads_) {}
~SynchronousStreamingFromClientClient()279 ~SynchronousStreamingFromClientClient() override {
280 CleanupAllStreams(
281 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
282 }
283
284 private:
285 std::vector<double> last_issue_;
286
InitThreadFuncImpl(size_t thread_idx)287 bool InitThreadFuncImpl(size_t thread_idx) override {
288 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
289 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
290 if (!shutdown_[thread_idx].val) {
291 stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
292 &responses_[thread_idx]);
293 } else {
294 return false;
295 }
296 last_issue_[thread_idx] = UsageTimer::Now();
297 return true;
298 }
299
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)300 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
301 // Figure out how to make histogram sensible if this is rate-paced
302 if (!WaitToIssue(thread_idx)) {
303 return true;
304 }
305 if (stream_[thread_idx]->Write(request_)) {
306 double now = UsageTimer::Now();
307 entry->set_value((now - last_issue_[thread_idx]) * 1e9);
308 last_issue_[thread_idx] = now;
309 return true;
310 }
311 stream_[thread_idx]->WritesDone();
312 FinishStream(entry, thread_idx);
313 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
314 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
315 if (!shutdown_[thread_idx].val) {
316 stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
317 &responses_[thread_idx]);
318 } else {
319 stream_[thread_idx].reset();
320 return false;
321 }
322 return true;
323 }
324 };
325
326 class SynchronousStreamingFromServerClient final
327 : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> {
328 public:
SynchronousStreamingFromServerClient(const ClientConfig & config)329 explicit SynchronousStreamingFromServerClient(const ClientConfig& config)
330 : SynchronousStreamingClient(config), last_recv_(num_threads_) {}
~SynchronousStreamingFromServerClient()331 ~SynchronousStreamingFromServerClient() override {}
332
333 private:
334 std::vector<double> last_recv_;
335
InitThreadFuncImpl(size_t thread_idx)336 bool InitThreadFuncImpl(size_t thread_idx) override {
337 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
338 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
339 if (!shutdown_[thread_idx].val) {
340 stream_[thread_idx] =
341 stub->StreamingFromServer(&context_[thread_idx], request_);
342 } else {
343 return false;
344 }
345 last_recv_[thread_idx] = UsageTimer::Now();
346 return true;
347 }
348
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)349 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
350 if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
351 double now = UsageTimer::Now();
352 entry->set_value((now - last_recv_[thread_idx]) * 1e9);
353 last_recv_[thread_idx] = now;
354 return true;
355 }
356 FinishStream(entry, thread_idx);
357 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
358 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
359 if (!shutdown_[thread_idx].val) {
360 stream_[thread_idx] =
361 stub->StreamingFromServer(&context_[thread_idx], request_);
362 } else {
363 stream_[thread_idx].reset();
364 return false;
365 }
366 return true;
367 }
368 };
369
370 class SynchronousStreamingBothWaysClient final
371 : public SynchronousStreamingClient<
372 grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
373 public:
SynchronousStreamingBothWaysClient(const ClientConfig & config)374 explicit SynchronousStreamingBothWaysClient(const ClientConfig& config)
375 : SynchronousStreamingClient(config) {}
~SynchronousStreamingBothWaysClient()376 ~SynchronousStreamingBothWaysClient() override {
377 CleanupAllStreams(
378 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
379 }
380
381 private:
InitThreadFuncImpl(size_t thread_idx)382 bool InitThreadFuncImpl(size_t thread_idx) override {
383 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
384 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
385 if (!shutdown_[thread_idx].val) {
386 stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
387 } else {
388 return false;
389 }
390 return true;
391 }
392
ThreadFuncImpl(HistogramEntry *,size_t)393 bool ThreadFuncImpl(HistogramEntry* /*entry*/,
394 size_t /*thread_idx*/) override {
395 // TODO (vjpai): Do this
396 return true;
397 }
398 };
399
CreateSynchronousClient(const ClientConfig & config)400 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
401 GPR_ASSERT(!config.use_coalesce_api()); // not supported yet.
402 switch (config.rpc_type()) {
403 case UNARY:
404 return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
405 case STREAMING:
406 return std::unique_ptr<Client>(
407 new SynchronousStreamingPingPongClient(config));
408 case STREAMING_FROM_CLIENT:
409 return std::unique_ptr<Client>(
410 new SynchronousStreamingFromClientClient(config));
411 case STREAMING_FROM_SERVER:
412 return std::unique_ptr<Client>(
413 new SynchronousStreamingFromServerClient(config));
414 case STREAMING_BOTH_WAYS:
415 return std::unique_ptr<Client>(
416 new SynchronousStreamingBothWaysClient(config));
417 default:
418 assert(false);
419 return nullptr;
420 }
421 }
422
423 } // namespace testing
424 } // namespace grpc
425