1 //
2 //
3 // Copyright 2015-2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/cpp/interop/interop_client.h"
20
21 #include <cinttypes>
22 #include <fstream>
23 #include <memory>
24 #include <string>
25 #include <type_traits>
26 #include <utility>
27
28 #include "absl/cleanup/cleanup.h"
29 #include "absl/strings/match.h"
30 #include "absl/strings/str_format.h"
31 #include "absl/strings/str_join.h"
32 #include "absl/types/optional.h"
33
34 #include <grpc/grpc.h>
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/string_util.h>
38 #include <grpc/support/time.h>
39 #include <grpcpp/channel.h>
40 #include <grpcpp/client_context.h>
41 #include <grpcpp/security/credentials.h>
42
43 #include "src/core/lib/config/config_vars.h"
44 #include "src/core/lib/config/core_configuration.h"
45 #include "src/core/lib/gprpp/crash.h"
46 #include "src/proto/grpc/testing/empty.pb.h"
47 #include "src/proto/grpc/testing/messages.pb.h"
48 #include "src/proto/grpc/testing/test.grpc.pb.h"
49 #include "test/core/util/histogram.h"
50 #include "test/cpp/interop/backend_metrics_lb_policy.h"
51 #include "test/cpp/interop/client_helper.h"
52
53 namespace grpc {
54 namespace testing {
55
56 namespace {
// Per-message payload sizes used by the streaming tests. The same values are
// defined by the Java client.
const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979};

// Slow-consumer streaming test: number of responses requested, size of each
// response, and the pause inserted by the client after every received message.
constexpr int kNumResponseMessages = 2000;
constexpr int kResponseMessageSize = 1030;
constexpr int kReceiveDelayMilliSeconds = 20;

// Large unary calls: request payload size and requested response size.
constexpr int kLargeRequestSize = 271828;
constexpr int kLargeResponseSize = 314159;
65
// Checker that verifies nothing. All three arguments are ignored.
void NoopChecks(const InteropClientContextInspector& /*unused_inspector*/,
                const SimpleRequest* /*unused_request*/,
                const SimpleResponse* /*unused_response*/) {
  // Intentionally empty.
}
69
UnaryCompressionChecks(const InteropClientContextInspector & inspector,const SimpleRequest * request,const SimpleResponse *)70 void UnaryCompressionChecks(const InteropClientContextInspector& inspector,
71 const SimpleRequest* request,
72 const SimpleResponse* /*response*/) {
73 const grpc_compression_algorithm received_compression =
74 inspector.GetCallCompressionAlgorithm();
75 if (request->response_compressed().value()) {
76 if (received_compression == GRPC_COMPRESS_NONE) {
77 // Requested some compression, got NONE. This is an error.
78 grpc_core::Crash(
79 "Failure: Requested compression but got uncompressed response "
80 "from server.");
81 }
82 GPR_ASSERT(inspector.WasCompressed());
83 } else {
84 // Didn't request compression -> make sure the response is uncompressed
85 GPR_ASSERT(!(inspector.WasCompressed()));
86 }
87 }
88
ValuesDiff(absl::string_view field,double expected,double actual)89 absl::optional<std::string> ValuesDiff(absl::string_view field, double expected,
90 double actual) {
91 if (expected != actual) {
92 return absl::StrFormat("%s: expected: %f, actual: %f", field, expected,
93 actual);
94 }
95 return absl::nullopt;
96 }
97
98 template <typename Map>
MapsDiff(absl::string_view path,const Map & expected,const Map & actual)99 absl::optional<std::string> MapsDiff(absl::string_view path,
100 const Map& expected, const Map& actual) {
101 auto result = ValuesDiff(absl::StrFormat("%s size", path), expected.size(),
102 actual.size());
103 if (result.has_value()) {
104 return result;
105 }
106 for (const auto& key_value : expected) {
107 auto it = actual.find(key_value.first);
108 if (it == actual.end()) {
109 return absl::StrFormat("In field %s, key %s was not found", path,
110 key_value.first);
111 }
112 result = ValuesDiff(absl::StrFormat("%s/%s", path, key_value.first),
113 key_value.second, it->second);
114 if (result.has_value()) {
115 return result;
116 }
117 }
118 return absl::nullopt;
119 }
120
OrcaLoadReportsDiff(const TestOrcaReport & expected,const TestOrcaReport & actual)121 absl::optional<std::string> OrcaLoadReportsDiff(const TestOrcaReport& expected,
122 const TestOrcaReport& actual) {
123 auto error = ValuesDiff("cpu_utilization", expected.cpu_utilization(),
124 actual.cpu_utilization());
125 if (error.has_value()) {
126 return error;
127 }
128 error = ValuesDiff("mem_utilization", expected.memory_utilization(),
129 actual.memory_utilization());
130 if (error.has_value()) {
131 return error;
132 }
133 error =
134 MapsDiff("request_cost", expected.request_cost(), actual.request_cost());
135 if (error.has_value()) {
136 return error;
137 }
138 error = MapsDiff("utilization", expected.utilization(), actual.utilization());
139 if (error.has_value()) {
140 return error;
141 }
142 return absl::nullopt;
143 }
144 } // namespace
145
ServiceStub(ChannelCreationFunc channel_creation_func,bool new_stub_every_call)146 InteropClient::ServiceStub::ServiceStub(
147 ChannelCreationFunc channel_creation_func, bool new_stub_every_call)
148 : channel_creation_func_(std::move(channel_creation_func)),
149 new_stub_every_call_(new_stub_every_call) {}
150
Get()151 TestService::Stub* InteropClient::ServiceStub::Get() {
152 if (new_stub_every_call_ || stub_ == nullptr) {
153 if (channel_ == nullptr) {
154 channel_ = channel_creation_func_();
155 }
156 stub_ = TestService::NewStub(channel_);
157 }
158 return stub_.get();
159 }
160
161 UnimplementedService::Stub*
GetUnimplementedServiceStub()162 InteropClient::ServiceStub::GetUnimplementedServiceStub() {
163 if (unimplemented_service_stub_ == nullptr) {
164 if (channel_ == nullptr) {
165 channel_ = channel_creation_func_();
166 }
167 unimplemented_service_stub_ = UnimplementedService::NewStub(channel_);
168 }
169 return unimplemented_service_stub_.get();
170 }
171
ResetChannel()172 void InteropClient::ServiceStub::ResetChannel() {
173 channel_.reset();
174 stub_.reset();
175 }
176
InteropClient(ChannelCreationFunc channel_creation_func,bool new_stub_every_test_case,bool do_not_abort_on_transient_failures)177 InteropClient::InteropClient(ChannelCreationFunc channel_creation_func,
178 bool new_stub_every_test_case,
179 bool do_not_abort_on_transient_failures)
180 : serviceStub_(
181 [channel_creation_func = std::move(channel_creation_func), this]() {
182 return channel_creation_func(
183 load_report_tracker_.GetChannelArguments());
184 },
185 new_stub_every_test_case),
186 do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
187
AssertStatusOk(const Status & s,const std::string & optional_debug_string)188 bool InteropClient::AssertStatusOk(const Status& s,
189 const std::string& optional_debug_string) {
190 if (s.ok()) {
191 return true;
192 }
193
194 // Note: At this point, s.error_code is definitely not StatusCode::OK (we
195 // already checked for s.ok() above). So, the following will call abort()
196 // (unless s.error_code() corresponds to a transient failure and
197 // 'do_not_abort_on_transient_failures' is true)
198 return AssertStatusCode(s, StatusCode::OK, optional_debug_string);
199 }
200
AssertStatusCode(const Status & s,StatusCode expected_code,const std::string & optional_debug_string)201 bool InteropClient::AssertStatusCode(const Status& s, StatusCode expected_code,
202 const std::string& optional_debug_string) {
203 if (s.error_code() == expected_code) {
204 return true;
205 }
206
207 gpr_log(GPR_ERROR,
208 "Error status code: %d (expected: %d), message: %s,"
209 " debug string: %s",
210 s.error_code(), expected_code, s.error_message().c_str(),
211 optional_debug_string.c_str());
212
213 // In case of transient transient/retryable failures (like a broken
214 // connection) we may or may not abort (see TransientFailureOrAbort())
215 if (s.error_code() == grpc::StatusCode::UNAVAILABLE) {
216 return TransientFailureOrAbort();
217 }
218
219 abort();
220 }
221
DoEmpty()222 bool InteropClient::DoEmpty() {
223 gpr_log(GPR_DEBUG, "Sending an empty rpc...");
224
225 Empty request;
226 Empty response;
227 ClientContext context;
228
229 Status s = serviceStub_.Get()->EmptyCall(&context, request, &response);
230
231 if (!AssertStatusOk(s, context.debug_error_string())) {
232 return false;
233 }
234
235 gpr_log(GPR_DEBUG, "Empty rpc done.");
236 return true;
237 }
238
PerformLargeUnary(SimpleRequest * request,SimpleResponse * response)239 bool InteropClient::PerformLargeUnary(SimpleRequest* request,
240 SimpleResponse* response) {
241 return PerformLargeUnary(request, response, NoopChecks);
242 }
243
PerformLargeUnary(SimpleRequest * request,SimpleResponse * response,const CheckerFn & custom_checks_fn)244 bool InteropClient::PerformLargeUnary(SimpleRequest* request,
245 SimpleResponse* response,
246 const CheckerFn& custom_checks_fn) {
247 ClientContext context;
248 InteropClientContextInspector inspector(context);
249 request->set_response_size(kLargeResponseSize);
250 std::string payload(kLargeRequestSize, '\0');
251 request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
252 if (request->has_expect_compressed()) {
253 if (request->expect_compressed().value()) {
254 context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
255 } else {
256 context.set_compression_algorithm(GRPC_COMPRESS_NONE);
257 }
258 }
259
260 Status s = serviceStub_.Get()->UnaryCall(&context, *request, response);
261 if (!AssertStatusOk(s, context.debug_error_string())) {
262 return false;
263 }
264
265 custom_checks_fn(inspector, request, response);
266
267 // Payload related checks.
268 GPR_ASSERT(response->payload().body() ==
269 std::string(kLargeResponseSize, '\0'));
270 return true;
271 }
272
DoComputeEngineCreds(const std::string & default_service_account,const std::string & oauth_scope)273 bool InteropClient::DoComputeEngineCreds(
274 const std::string& default_service_account,
275 const std::string& oauth_scope) {
276 gpr_log(GPR_DEBUG,
277 "Sending a large unary rpc with compute engine credentials ...");
278 SimpleRequest request;
279 SimpleResponse response;
280 request.set_fill_username(true);
281 request.set_fill_oauth_scope(true);
282
283 if (!PerformLargeUnary(&request, &response)) {
284 return false;
285 }
286
287 gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
288 gpr_log(GPR_DEBUG, "Got oauth_scope %s", response.oauth_scope().c_str());
289 GPR_ASSERT(!response.username().empty());
290 GPR_ASSERT(response.username() == default_service_account);
291 GPR_ASSERT(!response.oauth_scope().empty());
292 const char* oauth_scope_str = response.oauth_scope().c_str();
293 GPR_ASSERT(absl::StrContains(oauth_scope, oauth_scope_str));
294 gpr_log(GPR_DEBUG, "Large unary with compute engine creds done.");
295 return true;
296 }
297
DoOauth2AuthToken(const std::string & username,const std::string & oauth_scope)298 bool InteropClient::DoOauth2AuthToken(const std::string& username,
299 const std::string& oauth_scope) {
300 gpr_log(GPR_DEBUG,
301 "Sending a unary rpc with raw oauth2 access token credentials ...");
302 SimpleRequest request;
303 SimpleResponse response;
304 request.set_fill_username(true);
305 request.set_fill_oauth_scope(true);
306
307 ClientContext context;
308
309 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
310
311 if (!AssertStatusOk(s, context.debug_error_string())) {
312 return false;
313 }
314
315 GPR_ASSERT(!response.username().empty());
316 GPR_ASSERT(!response.oauth_scope().empty());
317 GPR_ASSERT(username == response.username());
318 const char* oauth_scope_str = response.oauth_scope().c_str();
319 GPR_ASSERT(absl::StrContains(oauth_scope, oauth_scope_str));
320 gpr_log(GPR_DEBUG, "Unary with oauth2 access token credentials done.");
321 return true;
322 }
323
DoPerRpcCreds(const std::string & json_key)324 bool InteropClient::DoPerRpcCreds(const std::string& json_key) {
325 gpr_log(GPR_DEBUG, "Sending a unary rpc with per-rpc JWT access token ...");
326 SimpleRequest request;
327 SimpleResponse response;
328 request.set_fill_username(true);
329
330 ClientContext context;
331 std::chrono::seconds token_lifetime = std::chrono::hours(1);
332 std::shared_ptr<CallCredentials> creds =
333 ServiceAccountJWTAccessCredentials(json_key, token_lifetime.count());
334
335 context.set_credentials(creds);
336
337 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
338
339 if (!AssertStatusOk(s, context.debug_error_string())) {
340 return false;
341 }
342
343 GPR_ASSERT(!response.username().empty());
344 GPR_ASSERT(json_key.find(response.username()) != std::string::npos);
345 gpr_log(GPR_DEBUG, "Unary with per-rpc JWT access token done.");
346 return true;
347 }
348
DoJwtTokenCreds(const std::string & username)349 bool InteropClient::DoJwtTokenCreds(const std::string& username) {
350 gpr_log(GPR_DEBUG,
351 "Sending a large unary rpc with JWT token credentials ...");
352 SimpleRequest request;
353 SimpleResponse response;
354 request.set_fill_username(true);
355
356 if (!PerformLargeUnary(&request, &response)) {
357 return false;
358 }
359
360 GPR_ASSERT(!response.username().empty());
361 GPR_ASSERT(username.find(response.username()) != std::string::npos);
362 gpr_log(GPR_DEBUG, "Large unary with JWT token creds done.");
363 return true;
364 }
365
DoGoogleDefaultCredentials(const std::string & default_service_account)366 bool InteropClient::DoGoogleDefaultCredentials(
367 const std::string& default_service_account) {
368 gpr_log(GPR_DEBUG,
369 "Sending a large unary rpc with GoogleDefaultCredentials...");
370 SimpleRequest request;
371 SimpleResponse response;
372 request.set_fill_username(true);
373
374 if (!PerformLargeUnary(&request, &response)) {
375 return false;
376 }
377
378 gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
379 GPR_ASSERT(!response.username().empty());
380 GPR_ASSERT(response.username() == default_service_account);
381 gpr_log(GPR_DEBUG, "Large unary rpc with GoogleDefaultCredentials done.");
382 return true;
383 }
384
DoLargeUnary()385 bool InteropClient::DoLargeUnary() {
386 gpr_log(GPR_DEBUG, "Sending a large unary rpc...");
387 SimpleRequest request;
388 SimpleResponse response;
389 if (!PerformLargeUnary(&request, &response)) {
390 return false;
391 }
392 gpr_log(GPR_DEBUG, "Large unary done.");
393 return true;
394 }
395
DoClientCompressedUnary()396 bool InteropClient::DoClientCompressedUnary() {
397 // Probing for compression-checks support.
398 ClientContext probe_context;
399 SimpleRequest probe_req;
400 SimpleResponse probe_res;
401
402 probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
403 probe_req.mutable_expect_compressed()->set_value(true); // lies!
404
405 probe_req.set_response_size(kLargeResponseSize);
406 probe_req.mutable_payload()->set_body(std::string(kLargeRequestSize, '\0'));
407
408 gpr_log(GPR_DEBUG, "Sending probe for compressed unary request.");
409 const Status s =
410 serviceStub_.Get()->UnaryCall(&probe_context, probe_req, &probe_res);
411 if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
412 // The server isn't able to evaluate incoming compression, making the rest
413 // of this test moot.
414 gpr_log(GPR_DEBUG, "Compressed unary request probe failed");
415 return false;
416 }
417 gpr_log(GPR_DEBUG, "Compressed unary request probe succeeded. Proceeding.");
418
419 const std::vector<bool> compressions = {true, false};
420 for (size_t i = 0; i < compressions.size(); i++) {
421 std::string log_suffix =
422 absl::StrFormat("(compression=%s)", compressions[i] ? "true" : "false");
423
424 gpr_log(GPR_DEBUG, "Sending compressed unary request %s.",
425 log_suffix.c_str());
426 SimpleRequest request;
427 SimpleResponse response;
428 request.mutable_expect_compressed()->set_value(compressions[i]);
429 if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
430 gpr_log(GPR_ERROR, "Compressed unary request failed %s",
431 log_suffix.c_str());
432 return false;
433 }
434
435 gpr_log(GPR_DEBUG, "Compressed unary request failed %s",
436 log_suffix.c_str());
437 }
438
439 return true;
440 }
441
DoServerCompressedUnary()442 bool InteropClient::DoServerCompressedUnary() {
443 const std::vector<bool> compressions = {true, false};
444 for (size_t i = 0; i < compressions.size(); i++) {
445 std::string log_suffix =
446 absl::StrFormat("(compression=%s)", compressions[i] ? "true" : "false");
447
448 gpr_log(GPR_DEBUG, "Sending unary request for compressed response %s.",
449 log_suffix.c_str());
450 SimpleRequest request;
451 SimpleResponse response;
452 request.mutable_response_compressed()->set_value(compressions[i]);
453
454 if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
455 gpr_log(GPR_ERROR, "Request for compressed unary failed %s",
456 log_suffix.c_str());
457 return false;
458 }
459
460 gpr_log(GPR_DEBUG, "Request for compressed unary failed %s",
461 log_suffix.c_str());
462 }
463
464 return true;
465 }
466
467 // Either abort() (unless do_not_abort_on_transient_failures_ is true) or return
468 // false
TransientFailureOrAbort()469 bool InteropClient::TransientFailureOrAbort() {
470 if (do_not_abort_on_transient_failures_) {
471 return false;
472 }
473
474 abort();
475 }
476
DoRequestStreaming()477 bool InteropClient::DoRequestStreaming() {
478 gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
479
480 ClientContext context;
481 StreamingInputCallRequest request;
482 StreamingInputCallResponse response;
483
484 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
485 serviceStub_.Get()->StreamingInputCall(&context, &response));
486
487 int aggregated_payload_size = 0;
488 for (size_t i = 0; i < request_stream_sizes.size(); ++i) {
489 Payload* payload = request.mutable_payload();
490 payload->set_body(std::string(request_stream_sizes[i], '\0'));
491 if (!stream->Write(request)) {
492 gpr_log(GPR_ERROR, "DoRequestStreaming(): stream->Write() failed");
493 return TransientFailureOrAbort();
494 }
495 aggregated_payload_size += request_stream_sizes[i];
496 }
497 GPR_ASSERT(stream->WritesDone());
498
499 Status s = stream->Finish();
500 if (!AssertStatusOk(s, context.debug_error_string())) {
501 return false;
502 }
503
504 GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
505 return true;
506 }
507
DoResponseStreaming()508 bool InteropClient::DoResponseStreaming() {
509 gpr_log(GPR_DEBUG, "Receiving response streaming rpc ...");
510
511 ClientContext context;
512 StreamingOutputCallRequest request;
513 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
514 ResponseParameters* response_parameter = request.add_response_parameters();
515 response_parameter->set_size(response_stream_sizes[i]);
516 }
517 StreamingOutputCallResponse response;
518 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
519 serviceStub_.Get()->StreamingOutputCall(&context, request));
520
521 unsigned int i = 0;
522 while (stream->Read(&response)) {
523 GPR_ASSERT(response.payload().body() ==
524 std::string(response_stream_sizes[i], '\0'));
525 ++i;
526 }
527
528 if (i < response_stream_sizes.size()) {
529 // stream->Read() failed before reading all the expected messages. This is
530 // most likely due to connection failure.
531 gpr_log(GPR_ERROR,
532 "DoResponseStreaming(): Read fewer streams (%d) than "
533 "response_stream_sizes.size() (%" PRIuPTR ")",
534 i, response_stream_sizes.size());
535 return TransientFailureOrAbort();
536 }
537
538 Status s = stream->Finish();
539 if (!AssertStatusOk(s, context.debug_error_string())) {
540 return false;
541 }
542
543 gpr_log(GPR_DEBUG, "Response streaming done.");
544 return true;
545 }
546
DoClientCompressedStreaming()547 bool InteropClient::DoClientCompressedStreaming() {
548 // Probing for compression-checks support.
549 ClientContext probe_context;
550 StreamingInputCallRequest probe_req;
551 StreamingInputCallResponse probe_res;
552
553 probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
554 probe_req.mutable_expect_compressed()->set_value(true); // lies!
555 probe_req.mutable_payload()->set_body(std::string(27182, '\0'));
556
557 gpr_log(GPR_DEBUG, "Sending probe for compressed streaming request.");
558
559 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> probe_stream(
560 serviceStub_.Get()->StreamingInputCall(&probe_context, &probe_res));
561
562 if (!probe_stream->Write(probe_req)) {
563 gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
564 return TransientFailureOrAbort();
565 }
566 Status s = probe_stream->Finish();
567 if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
568 // The server isn't able to evaluate incoming compression, making the rest
569 // of this test moot.
570 gpr_log(GPR_DEBUG, "Compressed streaming request probe failed");
571 return false;
572 }
573 gpr_log(GPR_DEBUG,
574 "Compressed streaming request probe succeeded. Proceeding.");
575
576 ClientContext context;
577 StreamingInputCallRequest request;
578 StreamingInputCallResponse response;
579
580 context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
581 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
582 serviceStub_.Get()->StreamingInputCall(&context, &response));
583
584 request.mutable_payload()->set_body(std::string(27182, '\0'));
585 request.mutable_expect_compressed()->set_value(true);
586 gpr_log(GPR_DEBUG, "Sending streaming request with compression enabled");
587 if (!stream->Write(request)) {
588 gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
589 return TransientFailureOrAbort();
590 }
591
592 WriteOptions wopts;
593 wopts.set_no_compression();
594 request.mutable_payload()->set_body(std::string(45904, '\0'));
595 request.mutable_expect_compressed()->set_value(false);
596 gpr_log(GPR_DEBUG, "Sending streaming request with compression disabled");
597 if (!stream->Write(request, wopts)) {
598 gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
599 return TransientFailureOrAbort();
600 }
601 GPR_ASSERT(stream->WritesDone());
602
603 s = stream->Finish();
604 return AssertStatusOk(s, context.debug_error_string());
605 }
606
DoServerCompressedStreaming()607 bool InteropClient::DoServerCompressedStreaming() {
608 const std::vector<bool> compressions = {true, false};
609 const std::vector<int> sizes = {31415, 92653};
610
611 ClientContext context;
612 InteropClientContextInspector inspector(context);
613 StreamingOutputCallRequest request;
614
615 GPR_ASSERT(compressions.size() == sizes.size());
616 for (size_t i = 0; i < sizes.size(); i++) {
617 std::string log_suffix =
618 absl::StrFormat("(compression=%s; size=%d)",
619 compressions[i] ? "true" : "false", sizes[i]);
620
621 gpr_log(GPR_DEBUG, "Sending request streaming rpc %s.", log_suffix.c_str());
622
623 ResponseParameters* const response_parameter =
624 request.add_response_parameters();
625 response_parameter->mutable_compressed()->set_value(compressions[i]);
626 response_parameter->set_size(sizes[i]);
627 }
628 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
629 serviceStub_.Get()->StreamingOutputCall(&context, request));
630
631 size_t k = 0;
632 StreamingOutputCallResponse response;
633 while (stream->Read(&response)) {
634 // Payload size checks.
635 GPR_ASSERT(response.payload().body() ==
636 std::string(request.response_parameters(k).size(), '\0'));
637
638 // Compression checks.
639 GPR_ASSERT(request.response_parameters(k).has_compressed());
640 if (request.response_parameters(k).compressed().value()) {
641 GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE);
642 GPR_ASSERT(inspector.WasCompressed());
643 } else {
644 // requested *no* compression.
645 GPR_ASSERT(!(inspector.WasCompressed()));
646 }
647 ++k;
648 }
649
650 if (k < sizes.size()) {
651 // stream->Read() failed before reading all the expected messages. This
652 // is most likely due to a connection failure.
653 gpr_log(GPR_ERROR,
654 "%s(): Responses read (k=%" PRIuPTR
655 ") is less than the expected number of messages (%" PRIuPTR ").",
656 __func__, k, sizes.size());
657 return TransientFailureOrAbort();
658 }
659
660 Status s = stream->Finish();
661 return AssertStatusOk(s, context.debug_error_string());
662 }
663
DoResponseStreamingWithSlowConsumer()664 bool InteropClient::DoResponseStreamingWithSlowConsumer() {
665 gpr_log(GPR_DEBUG, "Receiving response streaming rpc with slow consumer ...");
666
667 ClientContext context;
668 StreamingOutputCallRequest request;
669
670 for (int i = 0; i < kNumResponseMessages; ++i) {
671 ResponseParameters* response_parameter = request.add_response_parameters();
672 response_parameter->set_size(kResponseMessageSize);
673 }
674 StreamingOutputCallResponse response;
675 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
676 serviceStub_.Get()->StreamingOutputCall(&context, request));
677
678 int i = 0;
679 while (stream->Read(&response)) {
680 GPR_ASSERT(response.payload().body() ==
681 std::string(kResponseMessageSize, '\0'));
682 gpr_log(GPR_DEBUG, "received message %d", i);
683 gpr_sleep_until(gpr_time_add(
684 gpr_now(GPR_CLOCK_REALTIME),
685 gpr_time_from_millis(kReceiveDelayMilliSeconds, GPR_TIMESPAN)));
686 ++i;
687 }
688
689 if (i < kNumResponseMessages) {
690 gpr_log(GPR_ERROR,
691 "DoResponseStreamingWithSlowConsumer(): Responses read (i=%d) is "
692 "less than the expected messages (i.e kNumResponseMessages = %d)",
693 i, kNumResponseMessages);
694
695 return TransientFailureOrAbort();
696 }
697
698 Status s = stream->Finish();
699 if (!AssertStatusOk(s, context.debug_error_string())) {
700 return false;
701 }
702
703 gpr_log(GPR_DEBUG, "Response streaming done.");
704 return true;
705 }
706
DoHalfDuplex()707 bool InteropClient::DoHalfDuplex() {
708 gpr_log(GPR_DEBUG, "Sending half-duplex streaming rpc ...");
709
710 ClientContext context;
711 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
712 StreamingOutputCallResponse>>
713 stream(serviceStub_.Get()->HalfDuplexCall(&context));
714
715 StreamingOutputCallRequest request;
716 ResponseParameters* response_parameter = request.add_response_parameters();
717 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
718 response_parameter->set_size(response_stream_sizes[i]);
719
720 if (!stream->Write(request)) {
721 gpr_log(GPR_ERROR, "DoHalfDuplex(): stream->Write() failed. i=%d", i);
722 return TransientFailureOrAbort();
723 }
724 }
725 stream->WritesDone();
726
727 unsigned int i = 0;
728 StreamingOutputCallResponse response;
729 while (stream->Read(&response)) {
730 GPR_ASSERT(response.payload().body() ==
731 std::string(response_stream_sizes[i], '\0'));
732 ++i;
733 }
734
735 if (i < response_stream_sizes.size()) {
736 // stream->Read() failed before reading all the expected messages. This is
737 // most likely due to a connection failure
738 gpr_log(GPR_ERROR,
739 "DoHalfDuplex(): Responses read (i=%d) are less than the expected "
740 "number of messages response_stream_sizes.size() (%" PRIuPTR ")",
741 i, response_stream_sizes.size());
742 return TransientFailureOrAbort();
743 }
744
745 Status s = stream->Finish();
746 if (!AssertStatusOk(s, context.debug_error_string())) {
747 return false;
748 }
749
750 gpr_log(GPR_DEBUG, "Half-duplex streaming rpc done.");
751 return true;
752 }
753
DoPingPong()754 bool InteropClient::DoPingPong() {
755 gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
756
757 ClientContext context;
758 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
759 StreamingOutputCallResponse>>
760 stream(serviceStub_.Get()->FullDuplexCall(&context));
761
762 StreamingOutputCallRequest request;
763 ResponseParameters* response_parameter = request.add_response_parameters();
764 Payload* payload = request.mutable_payload();
765 StreamingOutputCallResponse response;
766
767 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
768 response_parameter->set_size(response_stream_sizes[i]);
769 payload->set_body(std::string(request_stream_sizes[i], '\0'));
770
771 if (!stream->Write(request)) {
772 gpr_log(GPR_ERROR, "DoPingPong(): stream->Write() failed. i: %d", i);
773 return TransientFailureOrAbort();
774 }
775
776 if (!stream->Read(&response)) {
777 gpr_log(GPR_ERROR, "DoPingPong(): stream->Read() failed. i:%d", i);
778 return TransientFailureOrAbort();
779 }
780
781 GPR_ASSERT(response.payload().body() ==
782 std::string(response_stream_sizes[i], '\0'));
783 }
784
785 stream->WritesDone();
786
787 GPR_ASSERT(!stream->Read(&response));
788
789 Status s = stream->Finish();
790 if (!AssertStatusOk(s, context.debug_error_string())) {
791 return false;
792 }
793
794 gpr_log(GPR_DEBUG, "Ping pong streaming done.");
795 return true;
796 }
797
DoCancelAfterBegin()798 bool InteropClient::DoCancelAfterBegin() {
799 gpr_log(GPR_DEBUG, "Sending request streaming rpc ...");
800
801 ClientContext context;
802 StreamingInputCallRequest request;
803 StreamingInputCallResponse response;
804
805 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
806 serviceStub_.Get()->StreamingInputCall(&context, &response));
807
808 gpr_log(GPR_DEBUG, "Trying to cancel...");
809 context.TryCancel();
810 Status s = stream->Finish();
811
812 if (!AssertStatusCode(s, StatusCode::CANCELLED,
813 context.debug_error_string())) {
814 return false;
815 }
816
817 gpr_log(GPR_DEBUG, "Canceling streaming done.");
818 return true;
819 }
820
DoCancelAfterFirstResponse()821 bool InteropClient::DoCancelAfterFirstResponse() {
822 gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
823
824 ClientContext context;
825 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
826 StreamingOutputCallResponse>>
827 stream(serviceStub_.Get()->FullDuplexCall(&context));
828
829 StreamingOutputCallRequest request;
830 ResponseParameters* response_parameter = request.add_response_parameters();
831 response_parameter->set_size(31415);
832 request.mutable_payload()->set_body(std::string(27182, '\0'));
833 StreamingOutputCallResponse response;
834
835 if (!stream->Write(request)) {
836 gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Write() failed");
837 return TransientFailureOrAbort();
838 }
839
840 if (!stream->Read(&response)) {
841 gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Read failed");
842 return TransientFailureOrAbort();
843 }
844 GPR_ASSERT(response.payload().body() == std::string(31415, '\0'));
845
846 gpr_log(GPR_DEBUG, "Trying to cancel...");
847 context.TryCancel();
848
849 Status s = stream->Finish();
850 gpr_log(GPR_DEBUG, "Canceling pingpong streaming done.");
851 return true;
852 }
853
DoTimeoutOnSleepingServer()854 bool InteropClient::DoTimeoutOnSleepingServer() {
855 gpr_log(GPR_DEBUG,
856 "Sending Ping Pong streaming rpc with a short deadline...");
857
858 ClientContext context;
859 std::chrono::system_clock::time_point deadline =
860 std::chrono::system_clock::now() + std::chrono::milliseconds(1);
861 context.set_deadline(deadline);
862 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
863 StreamingOutputCallResponse>>
864 stream(serviceStub_.Get()->FullDuplexCall(&context));
865
866 StreamingOutputCallRequest request;
867 request.mutable_payload()->set_body(std::string(27182, '\0'));
868 stream->Write(request);
869
870 Status s = stream->Finish();
871 if (!AssertStatusCode(s, StatusCode::DEADLINE_EXCEEDED,
872 context.debug_error_string())) {
873 return false;
874 }
875
876 gpr_log(GPR_DEBUG, "Pingpong streaming timeout done.");
877 return true;
878 }
879
DoEmptyStream()880 bool InteropClient::DoEmptyStream() {
881 gpr_log(GPR_DEBUG, "Starting empty_stream.");
882
883 ClientContext context;
884 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
885 StreamingOutputCallResponse>>
886 stream(serviceStub_.Get()->FullDuplexCall(&context));
887 stream->WritesDone();
888 StreamingOutputCallResponse response;
889 GPR_ASSERT(stream->Read(&response) == false);
890
891 Status s = stream->Finish();
892 if (!AssertStatusOk(s, context.debug_error_string())) {
893 return false;
894 }
895
896 gpr_log(GPR_DEBUG, "empty_stream done.");
897 return true;
898 }
899
DoStatusWithMessage()900 bool InteropClient::DoStatusWithMessage() {
901 gpr_log(GPR_DEBUG,
902 "Sending RPC with a request for status code 2 and message");
903
904 const grpc::StatusCode test_code = grpc::StatusCode::UNKNOWN;
905 const std::string test_msg = "This is a test message";
906
907 // Test UnaryCall.
908 ClientContext context;
909 SimpleRequest request;
910 SimpleResponse response;
911 EchoStatus* requested_status = request.mutable_response_status();
912 requested_status->set_code(test_code);
913 requested_status->set_message(test_msg);
914 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
915 if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
916 context.debug_error_string())) {
917 return false;
918 }
919 GPR_ASSERT(s.error_message() == test_msg);
920
921 // Test FullDuplexCall.
922 ClientContext stream_context;
923 std::shared_ptr<ClientReaderWriter<StreamingOutputCallRequest,
924 StreamingOutputCallResponse>>
925 stream(serviceStub_.Get()->FullDuplexCall(&stream_context));
926 StreamingOutputCallRequest streaming_request;
927 requested_status = streaming_request.mutable_response_status();
928 requested_status->set_code(test_code);
929 requested_status->set_message(test_msg);
930 stream->Write(streaming_request);
931 stream->WritesDone();
932 StreamingOutputCallResponse streaming_response;
933 while (stream->Read(&streaming_response)) {
934 }
935 s = stream->Finish();
936 if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
937 context.debug_error_string())) {
938 return false;
939 }
940 GPR_ASSERT(s.error_message() == test_msg);
941
942 gpr_log(GPR_DEBUG, "Done testing Status and Message");
943 return true;
944 }
945
DoSpecialStatusMessage()946 bool InteropClient::DoSpecialStatusMessage() {
947 gpr_log(
948 GPR_DEBUG,
949 "Sending RPC with a request for status code 2 and message - \\t\\ntest "
950 "with whitespace\\r\\nand Unicode BMP ☺ and non-BMP \\t\\n");
951 const grpc::StatusCode test_code = grpc::StatusCode::UNKNOWN;
952 const std::string test_msg =
953 "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP \t\n";
954 ClientContext context;
955 SimpleRequest request;
956 SimpleResponse response;
957 EchoStatus* requested_status = request.mutable_response_status();
958 requested_status->set_code(test_code);
959 requested_status->set_message(test_msg);
960 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
961 if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
962 context.debug_error_string())) {
963 return false;
964 }
965 GPR_ASSERT(s.error_message() == test_msg);
966 gpr_log(GPR_DEBUG, "Done testing Special Status Message");
967 return true;
968 }
969
DoPickFirstUnary()970 bool InteropClient::DoPickFirstUnary() {
971 const int rpcCount = 100;
972 SimpleRequest request;
973 SimpleResponse response;
974 std::string server_id;
975 request.set_fill_server_id(true);
976 for (int i = 0; i < rpcCount; i++) {
977 ClientContext context;
978 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
979 if (!AssertStatusOk(s, context.debug_error_string())) {
980 return false;
981 }
982 if (i == 0) {
983 server_id = response.server_id();
984 continue;
985 }
986 if (response.server_id() != server_id) {
987 gpr_log(GPR_ERROR, "#%d rpc hits server_id %s, expect server_id %s", i,
988 response.server_id().c_str(), server_id.c_str());
989 return false;
990 }
991 }
992 gpr_log(GPR_DEBUG, "pick first unary successfully finished");
993 return true;
994 }
995
DoOrcaPerRpc()996 bool InteropClient::DoOrcaPerRpc() {
997 load_report_tracker_.ResetCollectedLoadReports();
998 grpc_core::CoreConfiguration::RegisterBuilder(RegisterBackendMetricsLbPolicy);
999 gpr_log(GPR_DEBUG, "testing orca per rpc");
1000 SimpleRequest request;
1001 SimpleResponse response;
1002 ClientContext context;
1003 auto orca_report = request.mutable_orca_per_query_report();
1004 orca_report->set_cpu_utilization(0.8210);
1005 orca_report->set_memory_utilization(0.5847);
1006 orca_report->mutable_request_cost()->emplace("cost", 3456.32);
1007 orca_report->mutable_utilization()->emplace("util", 0.30499);
1008 auto status = serviceStub_.Get()->UnaryCall(&context, request, &response);
1009 if (!AssertStatusOk(status, context.debug_error_string())) {
1010 return false;
1011 }
1012 auto report = load_report_tracker_.GetNextLoadReport();
1013 GPR_ASSERT(report.has_value());
1014 GPR_ASSERT(report->has_value());
1015 auto comparison_result = OrcaLoadReportsDiff(report->value(), *orca_report);
1016 if (comparison_result.has_value()) {
1017 gpr_assertion_failed(__FILE__, __LINE__, comparison_result->c_str());
1018 }
1019 GPR_ASSERT(!load_report_tracker_.GetNextLoadReport().has_value());
1020 gpr_log(GPR_DEBUG, "orca per rpc successfully finished");
1021 return true;
1022 }
1023
DoOrcaOob()1024 bool InteropClient::DoOrcaOob() {
1025 static constexpr auto kTimeout = absl::Seconds(10);
1026 gpr_log(GPR_INFO, "testing orca oob");
1027 load_report_tracker_.ResetCollectedLoadReports();
1028 // Make the backup poller poll very frequently in order to pick up
1029 // updates from all the subchannels's FDs.
1030 grpc_core::ConfigVars::Overrides overrides;
1031 overrides.client_channel_backup_poll_interval_ms = 250;
1032 grpc_core::ConfigVars::SetOverrides(overrides);
1033 grpc_core::CoreConfiguration::RegisterBuilder(RegisterBackendMetricsLbPolicy);
1034 ClientContext context;
1035 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
1036 StreamingOutputCallResponse>>
1037 stream(serviceStub_.Get()->FullDuplexCall(&context));
1038 auto stream_cleanup = absl::MakeCleanup([&]() {
1039 GPR_ASSERT(stream->WritesDone());
1040 GPR_ASSERT(stream->Finish().ok());
1041 });
1042 {
1043 StreamingOutputCallRequest request;
1044 request.add_response_parameters()->set_size(1);
1045 TestOrcaReport* orca_report = request.mutable_orca_oob_report();
1046 orca_report->set_cpu_utilization(0.8210);
1047 orca_report->set_memory_utilization(0.5847);
1048 orca_report->mutable_utilization()->emplace("util", 0.30499);
1049 StreamingOutputCallResponse response;
1050 if (!stream->Write(request)) {
1051 gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Write() failed");
1052 return TransientFailureOrAbort();
1053 }
1054 if (!stream->Read(&response)) {
1055 gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Read failed");
1056 return TransientFailureOrAbort();
1057 }
1058 GPR_ASSERT(load_report_tracker_
1059 .WaitForOobLoadReport(
1060 [orca_report](const auto& actual) {
1061 auto value = OrcaLoadReportsDiff(*orca_report, actual);
1062 if (value.has_value()) {
1063 gpr_log(GPR_DEBUG, "Reports mismatch: %s",
1064 value->c_str());
1065 return false;
1066 }
1067 return true;
1068 },
1069 kTimeout, 10)
1070 .has_value());
1071 }
1072 {
1073 StreamingOutputCallRequest request;
1074 request.add_response_parameters()->set_size(1);
1075 TestOrcaReport* orca_report = request.mutable_orca_oob_report();
1076 orca_report->set_cpu_utilization(0.29309);
1077 orca_report->set_memory_utilization(0.2);
1078 orca_report->mutable_utilization()->emplace("util", 0.2039);
1079 StreamingOutputCallResponse response;
1080 if (!stream->Write(request)) {
1081 gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Write() failed");
1082 return TransientFailureOrAbort();
1083 }
1084 if (!stream->Read(&response)) {
1085 gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Read failed");
1086 return TransientFailureOrAbort();
1087 }
1088 GPR_ASSERT(
1089 load_report_tracker_
1090 .WaitForOobLoadReport(
1091 [orca_report](const auto& report) {
1092 return !OrcaLoadReportsDiff(*orca_report, report).has_value();
1093 },
1094 kTimeout, 10)
1095 .has_value());
1096 }
1097 gpr_log(GPR_INFO, "orca oob successfully finished");
1098 return true;
1099 }
1100
DoCustomMetadata()1101 bool InteropClient::DoCustomMetadata() {
1102 const std::string kEchoInitialMetadataKey("x-grpc-test-echo-initial");
1103 const std::string kInitialMetadataValue("test_initial_metadata_value");
1104 const std::string kEchoTrailingBinMetadataKey(
1105 "x-grpc-test-echo-trailing-bin");
1106 const std::string kTrailingBinValue("\x0a\x0b\x0a\x0b\x0a\x0b");
1107
1108 {
1109 gpr_log(GPR_DEBUG, "Sending RPC with custom metadata");
1110 ClientContext context;
1111 context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
1112 context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
1113 SimpleRequest request;
1114 SimpleResponse response;
1115 request.set_response_size(kLargeResponseSize);
1116 std::string payload(kLargeRequestSize, '\0');
1117 request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
1118
1119 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
1120 if (!AssertStatusOk(s, context.debug_error_string())) {
1121 return false;
1122 }
1123
1124 const auto& server_initial_metadata = context.GetServerInitialMetadata();
1125 auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
1126 GPR_ASSERT(iter != server_initial_metadata.end());
1127 GPR_ASSERT(iter->second == kInitialMetadataValue);
1128 const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
1129 iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
1130 GPR_ASSERT(iter != server_trailing_metadata.end());
1131 GPR_ASSERT(std::string(iter->second.begin(), iter->second.end()) ==
1132 kTrailingBinValue);
1133
1134 gpr_log(GPR_DEBUG, "Done testing RPC with custom metadata");
1135 }
1136
1137 {
1138 gpr_log(GPR_DEBUG, "Sending stream with custom metadata");
1139 ClientContext context;
1140 context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
1141 context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
1142 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
1143 StreamingOutputCallResponse>>
1144 stream(serviceStub_.Get()->FullDuplexCall(&context));
1145
1146 StreamingOutputCallRequest request;
1147 ResponseParameters* response_parameter = request.add_response_parameters();
1148 response_parameter->set_size(kLargeResponseSize);
1149 std::string payload(kLargeRequestSize, '\0');
1150 request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
1151 StreamingOutputCallResponse response;
1152
1153 if (!stream->Write(request)) {
1154 gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Write() failed");
1155 return TransientFailureOrAbort();
1156 }
1157
1158 stream->WritesDone();
1159
1160 if (!stream->Read(&response)) {
1161 gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Read() failed");
1162 return TransientFailureOrAbort();
1163 }
1164
1165 GPR_ASSERT(response.payload().body() ==
1166 std::string(kLargeResponseSize, '\0'));
1167
1168 GPR_ASSERT(!stream->Read(&response));
1169
1170 Status s = stream->Finish();
1171 if (!AssertStatusOk(s, context.debug_error_string())) {
1172 return false;
1173 }
1174
1175 const auto& server_initial_metadata = context.GetServerInitialMetadata();
1176 auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
1177 GPR_ASSERT(iter != server_initial_metadata.end());
1178 GPR_ASSERT(iter->second == kInitialMetadataValue);
1179 const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
1180 iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
1181 GPR_ASSERT(iter != server_trailing_metadata.end());
1182 GPR_ASSERT(std::string(iter->second.begin(), iter->second.end()) ==
1183 kTrailingBinValue);
1184
1185 gpr_log(GPR_DEBUG, "Done testing stream with custom metadata");
1186 }
1187
1188 return true;
1189 }
1190
1191 std::tuple<bool, int32_t, std::string, std::string>
PerformOneSoakTestIteration(const bool reset_channel,const int32_t max_acceptable_per_iteration_latency_ms,const int32_t request_size,const int32_t response_size)1192 InteropClient::PerformOneSoakTestIteration(
1193 const bool reset_channel,
1194 const int32_t max_acceptable_per_iteration_latency_ms,
1195 const int32_t request_size, const int32_t response_size) {
1196 gpr_timespec start = gpr_now(GPR_CLOCK_MONOTONIC);
1197 SimpleRequest request;
1198 SimpleResponse response;
1199 // Don't set the deadline on the RPC, and instead just
1200 // record how long the RPC took and compare. This makes
1201 // debugging easier when looking at failure results.
1202 ClientContext context;
1203 InteropClientContextInspector inspector(context);
1204 request.set_response_size(response_size);
1205 std::string payload(request_size, '\0');
1206 request.mutable_payload()->set_body(payload.c_str(), request_size);
1207 if (reset_channel) {
1208 serviceStub_.ResetChannel();
1209 }
1210 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
1211 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1212 int32_t elapsed_ms = gpr_time_to_millis(gpr_time_sub(now, start));
1213 if (!s.ok()) {
1214 return std::make_tuple(false, elapsed_ms, context.debug_error_string(),
1215 context.peer());
1216 } else if (elapsed_ms > max_acceptable_per_iteration_latency_ms) {
1217 std::string debug_string = absl::StrFormat(
1218 "%d ms exceeds max acceptable latency: %d ms, peer: %s", elapsed_ms,
1219 max_acceptable_per_iteration_latency_ms, context.peer());
1220 return std::make_tuple(false, elapsed_ms, std::move(debug_string),
1221 context.peer());
1222 } else {
1223 return std::make_tuple(true, elapsed_ms, "", context.peer());
1224 }
1225 }
1226
PerformSoakTest(const std::string & server_uri,const bool reset_channel_per_iteration,const int32_t soak_iterations,const int32_t max_failures,const int32_t max_acceptable_per_iteration_latency_ms,const int32_t min_time_ms_between_rpcs,const int32_t overall_timeout_seconds,const int32_t request_size,const int32_t response_size)1227 void InteropClient::PerformSoakTest(
1228 const std::string& server_uri, const bool reset_channel_per_iteration,
1229 const int32_t soak_iterations, const int32_t max_failures,
1230 const int32_t max_acceptable_per_iteration_latency_ms,
1231 const int32_t min_time_ms_between_rpcs,
1232 const int32_t overall_timeout_seconds, const int32_t request_size,
1233 const int32_t response_size) {
1234 std::vector<std::tuple<bool, int32_t, std::string, std::string>> results;
1235 grpc_histogram* latencies_ms_histogram = grpc_histogram_create(
1236 1 /* resolution */,
1237 500 * 1e3 /* largest bucket; 500 seconds is unlikely */);
1238 gpr_timespec overall_deadline = gpr_time_add(
1239 gpr_now(GPR_CLOCK_MONOTONIC),
1240 gpr_time_from_seconds(overall_timeout_seconds, GPR_TIMESPAN));
1241 int32_t iterations_ran = 0;
1242 int total_failures = 0;
1243 for (int i = 0;
1244 i < soak_iterations &&
1245 gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), overall_deadline) < 0;
1246 ++i) {
1247 gpr_timespec earliest_next_start = gpr_time_add(
1248 gpr_now(GPR_CLOCK_MONOTONIC),
1249 gpr_time_from_millis(min_time_ms_between_rpcs, GPR_TIMESPAN));
1250 auto result = PerformOneSoakTestIteration(
1251 reset_channel_per_iteration, max_acceptable_per_iteration_latency_ms,
1252 request_size, response_size);
1253 bool success = std::get<0>(result);
1254 int32_t elapsed_ms = std::get<1>(result);
1255 std::string debug_string = std::get<2>(result);
1256 std::string peer = std::get<3>(result);
1257 results.push_back(result);
1258 if (!success) {
1259 gpr_log(GPR_INFO,
1260 "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s "
1261 "failed: %s",
1262 i, elapsed_ms, peer.c_str(), server_uri.c_str(),
1263 debug_string.c_str());
1264 total_failures++;
1265 } else {
1266 gpr_log(
1267 GPR_INFO,
1268 "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s succeeded",
1269 i, elapsed_ms, peer.c_str(), server_uri.c_str());
1270 }
1271 grpc_histogram_add(latencies_ms_histogram, std::get<1>(result));
1272 iterations_ran++;
1273 gpr_sleep_until(earliest_next_start);
1274 }
1275 double latency_ms_median =
1276 grpc_histogram_percentile(latencies_ms_histogram, 50);
1277 double latency_ms_90th =
1278 grpc_histogram_percentile(latencies_ms_histogram, 90);
1279 double latency_ms_worst = grpc_histogram_maximum(latencies_ms_histogram);
1280 grpc_histogram_destroy(latencies_ms_histogram);
1281 if (iterations_ran < soak_iterations) {
1282 gpr_log(
1283 GPR_ERROR,
1284 "(server_uri: %s) soak test consumed all %d seconds of time and quit "
1285 "early, only "
1286 "having ran %d out of desired %d iterations. "
1287 "total_failures: %d. "
1288 "max_failures_threshold: %d. "
1289 "median_soak_iteration_latency: %lf ms. "
1290 "90th_soak_iteration_latency: %lf ms. "
1291 "worst_soak_iteration_latency: %lf ms. "
1292 "Some or all of the iterations that did run were unexpectedly slow. "
1293 "See breakdown above for which iterations succeeded, failed, and "
1294 "why for more info.",
1295 server_uri.c_str(), overall_timeout_seconds, iterations_ran,
1296 soak_iterations, total_failures, max_failures, latency_ms_median,
1297 latency_ms_90th, latency_ms_worst);
1298 GPR_ASSERT(0);
1299 } else if (total_failures > max_failures) {
1300 gpr_log(GPR_ERROR,
1301 "(server_uri: %s) soak test ran: %d iterations. total_failures: %d "
1302 "exceeds "
1303 "max_failures_threshold: %d. "
1304 "median_soak_iteration_latency: %lf ms. "
1305 "90th_soak_iteration_latency: %lf ms. "
1306 "worst_soak_iteration_latency: %lf ms. "
1307 "See breakdown above for which iterations succeeded, failed, and "
1308 "why for more info.",
1309 server_uri.c_str(), soak_iterations, total_failures, max_failures,
1310 latency_ms_median, latency_ms_90th, latency_ms_worst);
1311 GPR_ASSERT(0);
1312 } else {
1313 gpr_log(GPR_INFO,
1314 "(server_uri: %s) soak test ran: %d iterations. total_failures: %d "
1315 "is within "
1316 "max_failures_threshold: %d. "
1317 "median_soak_iteration_latency: %lf ms. "
1318 "90th_soak_iteration_latency: %lf ms. "
1319 "worst_soak_iteration_latency: %lf ms. "
1320 "See breakdown above for which iterations succeeded, failed, and "
1321 "why for more info.",
1322 server_uri.c_str(), soak_iterations, total_failures, max_failures,
1323 latency_ms_median, latency_ms_90th, latency_ms_worst);
1324 }
1325 }
1326
DoRpcSoakTest(const std::string & server_uri,int32_t soak_iterations,int32_t max_failures,int64_t max_acceptable_per_iteration_latency_ms,int32_t soak_min_time_ms_between_rpcs,int32_t overall_timeout_seconds,int32_t request_size,int32_t response_size)1327 bool InteropClient::DoRpcSoakTest(
1328 const std::string& server_uri, int32_t soak_iterations,
1329 int32_t max_failures, int64_t max_acceptable_per_iteration_latency_ms,
1330 int32_t soak_min_time_ms_between_rpcs, int32_t overall_timeout_seconds,
1331 int32_t request_size, int32_t response_size) {
1332 gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
1333 GPR_ASSERT(soak_iterations > 0);
1334 PerformSoakTest(server_uri, false /* reset channel per iteration */,
1335 soak_iterations, max_failures,
1336 max_acceptable_per_iteration_latency_ms,
1337 soak_min_time_ms_between_rpcs, overall_timeout_seconds,
1338 request_size, response_size);
1339 gpr_log(GPR_DEBUG, "rpc_soak test done.");
1340 return true;
1341 }
1342
DoChannelSoakTest(const std::string & server_uri,int32_t soak_iterations,int32_t max_failures,int64_t max_acceptable_per_iteration_latency_ms,int32_t soak_min_time_ms_between_rpcs,int32_t overall_timeout_seconds,int32_t request_size,int32_t response_size)1343 bool InteropClient::DoChannelSoakTest(
1344 const std::string& server_uri, int32_t soak_iterations,
1345 int32_t max_failures, int64_t max_acceptable_per_iteration_latency_ms,
1346 int32_t soak_min_time_ms_between_rpcs, int32_t overall_timeout_seconds,
1347 int32_t request_size, int32_t response_size) {
1348 gpr_log(GPR_DEBUG, "Sending %d RPCs, tearing down the channel each time...",
1349 soak_iterations);
1350 GPR_ASSERT(soak_iterations > 0);
1351 PerformSoakTest(server_uri, true /* reset channel per iteration */,
1352 soak_iterations, max_failures,
1353 max_acceptable_per_iteration_latency_ms,
1354 soak_min_time_ms_between_rpcs, overall_timeout_seconds,
1355 request_size, response_size);
1356 gpr_log(GPR_DEBUG, "channel_soak test done.");
1357 return true;
1358 }
1359
DoLongLivedChannelTest(int32_t soak_iterations,int32_t iteration_interval)1360 bool InteropClient::DoLongLivedChannelTest(int32_t soak_iterations,
1361 int32_t iteration_interval) {
1362 gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
1363 GPR_ASSERT(soak_iterations > 0);
1364 GPR_ASSERT(iteration_interval > 0);
1365 SimpleRequest request;
1366 SimpleResponse response;
1367 int num_failures = 0;
1368 for (int i = 0; i < soak_iterations; ++i) {
1369 gpr_log(GPR_DEBUG, "Sending RPC number %d...", i);
1370 if (!PerformLargeUnary(&request, &response)) {
1371 gpr_log(GPR_ERROR, "Iteration %d failed.", i);
1372 num_failures++;
1373 }
1374 gpr_sleep_until(
1375 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1376 gpr_time_from_seconds(iteration_interval, GPR_TIMESPAN)));
1377 }
1378 if (num_failures == 0) {
1379 gpr_log(GPR_DEBUG, "long_lived_channel test done.");
1380 return true;
1381 } else {
1382 gpr_log(GPR_DEBUG, "long_lived_channel test failed with %d rpc failures.",
1383 num_failures);
1384 return false;
1385 }
1386 }
1387
DoUnimplementedService()1388 bool InteropClient::DoUnimplementedService() {
1389 gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service...");
1390
1391 Empty request;
1392 Empty response;
1393 ClientContext context;
1394
1395 UnimplementedService::Stub* stub = serviceStub_.GetUnimplementedServiceStub();
1396
1397 Status s = stub->UnimplementedCall(&context, request, &response);
1398
1399 if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED,
1400 context.debug_error_string())) {
1401 return false;
1402 }
1403
1404 gpr_log(GPR_DEBUG, "unimplemented service done.");
1405 return true;
1406 }
1407
DoUnimplementedMethod()1408 bool InteropClient::DoUnimplementedMethod() {
1409 gpr_log(GPR_DEBUG, "Sending a request for an unimplemented rpc...");
1410
1411 Empty request;
1412 Empty response;
1413 ClientContext context;
1414
1415 Status s =
1416 serviceStub_.Get()->UnimplementedCall(&context, request, &response);
1417
1418 if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED,
1419 context.debug_error_string())) {
1420 return false;
1421 }
1422
1423 gpr_log(GPR_DEBUG, "unimplemented rpc done.");
1424 return true;
1425 }
1426
1427 } // namespace testing
1428 } // namespace grpc
1429