xref: /aosp_15_r20/external/grpc-grpc/test/cpp/interop/interop_client.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015-2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/cpp/interop/interop_client.h"
20 
21 #include <cinttypes>
22 #include <fstream>
23 #include <memory>
24 #include <string>
25 #include <type_traits>
26 #include <utility>
27 
28 #include "absl/cleanup/cleanup.h"
29 #include "absl/strings/match.h"
30 #include "absl/strings/str_format.h"
31 #include "absl/strings/str_join.h"
32 #include "absl/types/optional.h"
33 
34 #include <grpc/grpc.h>
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/string_util.h>
38 #include <grpc/support/time.h>
39 #include <grpcpp/channel.h>
40 #include <grpcpp/client_context.h>
41 #include <grpcpp/security/credentials.h>
42 
43 #include "src/core/lib/config/config_vars.h"
44 #include "src/core/lib/config/core_configuration.h"
45 #include "src/core/lib/gprpp/crash.h"
46 #include "src/proto/grpc/testing/empty.pb.h"
47 #include "src/proto/grpc/testing/messages.pb.h"
48 #include "src/proto/grpc/testing/test.grpc.pb.h"
49 #include "test/core/util/histogram.h"
50 #include "test/cpp/interop/backend_metrics_lb_policy.h"
51 #include "test/cpp/interop/client_helper.h"
52 
53 namespace grpc {
54 namespace testing {
55 
56 namespace {
// The same value is defined by the Java client.
// Payload sizes (in bytes) used for the messages of the streaming tests.
const std::vector<int> request_stream_sizes{27182, 8, 1828, 45904};
const std::vector<int> response_stream_sizes{31415, 9, 2653, 58979};

// Parameters of the slow-consumer response streaming test: how many
// responses are requested, how large each one is, and how long the client
// sleeps after reading each of them.
constexpr int kNumResponseMessages = 2000;
constexpr int kResponseMessageSize = 1030;
constexpr int kReceiveDelayMilliSeconds = 20;

// Request/response payload sizes (in bytes) used by the large unary call.
constexpr int kLargeRequestSize = 271828;
constexpr int kLargeResponseSize = 314159;
65 
// Checker callback that verifies nothing. It is what the two-argument
// PerformLargeUnary() overload passes as its custom check.
void NoopChecks(const InteropClientContextInspector& /*inspector*/,
                const SimpleRequest* /*request*/,
                const SimpleResponse* /*response*/) {
  // Intentionally empty.
}
69 
UnaryCompressionChecks(const InteropClientContextInspector & inspector,const SimpleRequest * request,const SimpleResponse *)70 void UnaryCompressionChecks(const InteropClientContextInspector& inspector,
71                             const SimpleRequest* request,
72                             const SimpleResponse* /*response*/) {
73   const grpc_compression_algorithm received_compression =
74       inspector.GetCallCompressionAlgorithm();
75   if (request->response_compressed().value()) {
76     if (received_compression == GRPC_COMPRESS_NONE) {
77       // Requested some compression, got NONE. This is an error.
78       grpc_core::Crash(
79           "Failure: Requested compression but got uncompressed response "
80           "from server.");
81     }
82     GPR_ASSERT(inspector.WasCompressed());
83   } else {
84     // Didn't request compression -> make sure the response is uncompressed
85     GPR_ASSERT(!(inspector.WasCompressed()));
86   }
87 }
88 
ValuesDiff(absl::string_view field,double expected,double actual)89 absl::optional<std::string> ValuesDiff(absl::string_view field, double expected,
90                                        double actual) {
91   if (expected != actual) {
92     return absl::StrFormat("%s: expected: %f, actual: %f", field, expected,
93                            actual);
94   }
95   return absl::nullopt;
96 }
97 
98 template <typename Map>
MapsDiff(absl::string_view path,const Map & expected,const Map & actual)99 absl::optional<std::string> MapsDiff(absl::string_view path,
100                                      const Map& expected, const Map& actual) {
101   auto result = ValuesDiff(absl::StrFormat("%s size", path), expected.size(),
102                            actual.size());
103   if (result.has_value()) {
104     return result;
105   }
106   for (const auto& key_value : expected) {
107     auto it = actual.find(key_value.first);
108     if (it == actual.end()) {
109       return absl::StrFormat("In field %s, key %s was not found", path,
110                              key_value.first);
111     }
112     result = ValuesDiff(absl::StrFormat("%s/%s", path, key_value.first),
113                         key_value.second, it->second);
114     if (result.has_value()) {
115       return result;
116     }
117   }
118   return absl::nullopt;
119 }
120 
OrcaLoadReportsDiff(const TestOrcaReport & expected,const TestOrcaReport & actual)121 absl::optional<std::string> OrcaLoadReportsDiff(const TestOrcaReport& expected,
122                                                 const TestOrcaReport& actual) {
123   auto error = ValuesDiff("cpu_utilization", expected.cpu_utilization(),
124                           actual.cpu_utilization());
125   if (error.has_value()) {
126     return error;
127   }
128   error = ValuesDiff("mem_utilization", expected.memory_utilization(),
129                      actual.memory_utilization());
130   if (error.has_value()) {
131     return error;
132   }
133   error =
134       MapsDiff("request_cost", expected.request_cost(), actual.request_cost());
135   if (error.has_value()) {
136     return error;
137   }
138   error = MapsDiff("utilization", expected.utilization(), actual.utilization());
139   if (error.has_value()) {
140     return error;
141   }
142   return absl::nullopt;
143 }
144 }  // namespace
145 
ServiceStub(ChannelCreationFunc channel_creation_func,bool new_stub_every_call)146 InteropClient::ServiceStub::ServiceStub(
147     ChannelCreationFunc channel_creation_func, bool new_stub_every_call)
148     : channel_creation_func_(std::move(channel_creation_func)),
149       new_stub_every_call_(new_stub_every_call) {}
150 
Get()151 TestService::Stub* InteropClient::ServiceStub::Get() {
152   if (new_stub_every_call_ || stub_ == nullptr) {
153     if (channel_ == nullptr) {
154       channel_ = channel_creation_func_();
155     }
156     stub_ = TestService::NewStub(channel_);
157   }
158   return stub_.get();
159 }
160 
161 UnimplementedService::Stub*
GetUnimplementedServiceStub()162 InteropClient::ServiceStub::GetUnimplementedServiceStub() {
163   if (unimplemented_service_stub_ == nullptr) {
164     if (channel_ == nullptr) {
165       channel_ = channel_creation_func_();
166     }
167     unimplemented_service_stub_ = UnimplementedService::NewStub(channel_);
168   }
169   return unimplemented_service_stub_.get();
170 }
171 
ResetChannel()172 void InteropClient::ServiceStub::ResetChannel() {
173   channel_.reset();
174   stub_.reset();
175 }
176 
InteropClient(ChannelCreationFunc channel_creation_func,bool new_stub_every_test_case,bool do_not_abort_on_transient_failures)177 InteropClient::InteropClient(ChannelCreationFunc channel_creation_func,
178                              bool new_stub_every_test_case,
179                              bool do_not_abort_on_transient_failures)
180     : serviceStub_(
181           [channel_creation_func = std::move(channel_creation_func), this]() {
182             return channel_creation_func(
183                 load_report_tracker_.GetChannelArguments());
184           },
185           new_stub_every_test_case),
186       do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
187 
AssertStatusOk(const Status & s,const std::string & optional_debug_string)188 bool InteropClient::AssertStatusOk(const Status& s,
189                                    const std::string& optional_debug_string) {
190   if (s.ok()) {
191     return true;
192   }
193 
194   // Note: At this point, s.error_code is definitely not StatusCode::OK (we
195   // already checked for s.ok() above). So, the following will call abort()
196   // (unless s.error_code() corresponds to a transient failure and
197   // 'do_not_abort_on_transient_failures' is true)
198   return AssertStatusCode(s, StatusCode::OK, optional_debug_string);
199 }
200 
AssertStatusCode(const Status & s,StatusCode expected_code,const std::string & optional_debug_string)201 bool InteropClient::AssertStatusCode(const Status& s, StatusCode expected_code,
202                                      const std::string& optional_debug_string) {
203   if (s.error_code() == expected_code) {
204     return true;
205   }
206 
207   gpr_log(GPR_ERROR,
208           "Error status code: %d (expected: %d), message: %s,"
209           " debug string: %s",
210           s.error_code(), expected_code, s.error_message().c_str(),
211           optional_debug_string.c_str());
212 
213   // In case of transient transient/retryable failures (like a broken
214   // connection) we may or may not abort (see TransientFailureOrAbort())
215   if (s.error_code() == grpc::StatusCode::UNAVAILABLE) {
216     return TransientFailureOrAbort();
217   }
218 
219   abort();
220 }
221 
DoEmpty()222 bool InteropClient::DoEmpty() {
223   gpr_log(GPR_DEBUG, "Sending an empty rpc...");
224 
225   Empty request;
226   Empty response;
227   ClientContext context;
228 
229   Status s = serviceStub_.Get()->EmptyCall(&context, request, &response);
230 
231   if (!AssertStatusOk(s, context.debug_error_string())) {
232     return false;
233   }
234 
235   gpr_log(GPR_DEBUG, "Empty rpc done.");
236   return true;
237 }
238 
PerformLargeUnary(SimpleRequest * request,SimpleResponse * response)239 bool InteropClient::PerformLargeUnary(SimpleRequest* request,
240                                       SimpleResponse* response) {
241   return PerformLargeUnary(request, response, NoopChecks);
242 }
243 
PerformLargeUnary(SimpleRequest * request,SimpleResponse * response,const CheckerFn & custom_checks_fn)244 bool InteropClient::PerformLargeUnary(SimpleRequest* request,
245                                       SimpleResponse* response,
246                                       const CheckerFn& custom_checks_fn) {
247   ClientContext context;
248   InteropClientContextInspector inspector(context);
249   request->set_response_size(kLargeResponseSize);
250   std::string payload(kLargeRequestSize, '\0');
251   request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
252   if (request->has_expect_compressed()) {
253     if (request->expect_compressed().value()) {
254       context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
255     } else {
256       context.set_compression_algorithm(GRPC_COMPRESS_NONE);
257     }
258   }
259 
260   Status s = serviceStub_.Get()->UnaryCall(&context, *request, response);
261   if (!AssertStatusOk(s, context.debug_error_string())) {
262     return false;
263   }
264 
265   custom_checks_fn(inspector, request, response);
266 
267   // Payload related checks.
268   GPR_ASSERT(response->payload().body() ==
269              std::string(kLargeResponseSize, '\0'));
270   return true;
271 }
272 
DoComputeEngineCreds(const std::string & default_service_account,const std::string & oauth_scope)273 bool InteropClient::DoComputeEngineCreds(
274     const std::string& default_service_account,
275     const std::string& oauth_scope) {
276   gpr_log(GPR_DEBUG,
277           "Sending a large unary rpc with compute engine credentials ...");
278   SimpleRequest request;
279   SimpleResponse response;
280   request.set_fill_username(true);
281   request.set_fill_oauth_scope(true);
282 
283   if (!PerformLargeUnary(&request, &response)) {
284     return false;
285   }
286 
287   gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
288   gpr_log(GPR_DEBUG, "Got oauth_scope %s", response.oauth_scope().c_str());
289   GPR_ASSERT(!response.username().empty());
290   GPR_ASSERT(response.username() == default_service_account);
291   GPR_ASSERT(!response.oauth_scope().empty());
292   const char* oauth_scope_str = response.oauth_scope().c_str();
293   GPR_ASSERT(absl::StrContains(oauth_scope, oauth_scope_str));
294   gpr_log(GPR_DEBUG, "Large unary with compute engine creds done.");
295   return true;
296 }
297 
DoOauth2AuthToken(const std::string & username,const std::string & oauth_scope)298 bool InteropClient::DoOauth2AuthToken(const std::string& username,
299                                       const std::string& oauth_scope) {
300   gpr_log(GPR_DEBUG,
301           "Sending a unary rpc with raw oauth2 access token credentials ...");
302   SimpleRequest request;
303   SimpleResponse response;
304   request.set_fill_username(true);
305   request.set_fill_oauth_scope(true);
306 
307   ClientContext context;
308 
309   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
310 
311   if (!AssertStatusOk(s, context.debug_error_string())) {
312     return false;
313   }
314 
315   GPR_ASSERT(!response.username().empty());
316   GPR_ASSERT(!response.oauth_scope().empty());
317   GPR_ASSERT(username == response.username());
318   const char* oauth_scope_str = response.oauth_scope().c_str();
319   GPR_ASSERT(absl::StrContains(oauth_scope, oauth_scope_str));
320   gpr_log(GPR_DEBUG, "Unary with oauth2 access token credentials done.");
321   return true;
322 }
323 
DoPerRpcCreds(const std::string & json_key)324 bool InteropClient::DoPerRpcCreds(const std::string& json_key) {
325   gpr_log(GPR_DEBUG, "Sending a unary rpc with per-rpc JWT access token ...");
326   SimpleRequest request;
327   SimpleResponse response;
328   request.set_fill_username(true);
329 
330   ClientContext context;
331   std::chrono::seconds token_lifetime = std::chrono::hours(1);
332   std::shared_ptr<CallCredentials> creds =
333       ServiceAccountJWTAccessCredentials(json_key, token_lifetime.count());
334 
335   context.set_credentials(creds);
336 
337   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
338 
339   if (!AssertStatusOk(s, context.debug_error_string())) {
340     return false;
341   }
342 
343   GPR_ASSERT(!response.username().empty());
344   GPR_ASSERT(json_key.find(response.username()) != std::string::npos);
345   gpr_log(GPR_DEBUG, "Unary with per-rpc JWT access token done.");
346   return true;
347 }
348 
DoJwtTokenCreds(const std::string & username)349 bool InteropClient::DoJwtTokenCreds(const std::string& username) {
350   gpr_log(GPR_DEBUG,
351           "Sending a large unary rpc with JWT token credentials ...");
352   SimpleRequest request;
353   SimpleResponse response;
354   request.set_fill_username(true);
355 
356   if (!PerformLargeUnary(&request, &response)) {
357     return false;
358   }
359 
360   GPR_ASSERT(!response.username().empty());
361   GPR_ASSERT(username.find(response.username()) != std::string::npos);
362   gpr_log(GPR_DEBUG, "Large unary with JWT token creds done.");
363   return true;
364 }
365 
DoGoogleDefaultCredentials(const std::string & default_service_account)366 bool InteropClient::DoGoogleDefaultCredentials(
367     const std::string& default_service_account) {
368   gpr_log(GPR_DEBUG,
369           "Sending a large unary rpc with GoogleDefaultCredentials...");
370   SimpleRequest request;
371   SimpleResponse response;
372   request.set_fill_username(true);
373 
374   if (!PerformLargeUnary(&request, &response)) {
375     return false;
376   }
377 
378   gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
379   GPR_ASSERT(!response.username().empty());
380   GPR_ASSERT(response.username() == default_service_account);
381   gpr_log(GPR_DEBUG, "Large unary rpc with GoogleDefaultCredentials done.");
382   return true;
383 }
384 
DoLargeUnary()385 bool InteropClient::DoLargeUnary() {
386   gpr_log(GPR_DEBUG, "Sending a large unary rpc...");
387   SimpleRequest request;
388   SimpleResponse response;
389   if (!PerformLargeUnary(&request, &response)) {
390     return false;
391   }
392   gpr_log(GPR_DEBUG, "Large unary done.");
393   return true;
394 }
395 
DoClientCompressedUnary()396 bool InteropClient::DoClientCompressedUnary() {
397   // Probing for compression-checks support.
398   ClientContext probe_context;
399   SimpleRequest probe_req;
400   SimpleResponse probe_res;
401 
402   probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
403   probe_req.mutable_expect_compressed()->set_value(true);  // lies!
404 
405   probe_req.set_response_size(kLargeResponseSize);
406   probe_req.mutable_payload()->set_body(std::string(kLargeRequestSize, '\0'));
407 
408   gpr_log(GPR_DEBUG, "Sending probe for compressed unary request.");
409   const Status s =
410       serviceStub_.Get()->UnaryCall(&probe_context, probe_req, &probe_res);
411   if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
412     // The server isn't able to evaluate incoming compression, making the rest
413     // of this test moot.
414     gpr_log(GPR_DEBUG, "Compressed unary request probe failed");
415     return false;
416   }
417   gpr_log(GPR_DEBUG, "Compressed unary request probe succeeded. Proceeding.");
418 
419   const std::vector<bool> compressions = {true, false};
420   for (size_t i = 0; i < compressions.size(); i++) {
421     std::string log_suffix =
422         absl::StrFormat("(compression=%s)", compressions[i] ? "true" : "false");
423 
424     gpr_log(GPR_DEBUG, "Sending compressed unary request %s.",
425             log_suffix.c_str());
426     SimpleRequest request;
427     SimpleResponse response;
428     request.mutable_expect_compressed()->set_value(compressions[i]);
429     if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
430       gpr_log(GPR_ERROR, "Compressed unary request failed %s",
431               log_suffix.c_str());
432       return false;
433     }
434 
435     gpr_log(GPR_DEBUG, "Compressed unary request failed %s",
436             log_suffix.c_str());
437   }
438 
439   return true;
440 }
441 
DoServerCompressedUnary()442 bool InteropClient::DoServerCompressedUnary() {
443   const std::vector<bool> compressions = {true, false};
444   for (size_t i = 0; i < compressions.size(); i++) {
445     std::string log_suffix =
446         absl::StrFormat("(compression=%s)", compressions[i] ? "true" : "false");
447 
448     gpr_log(GPR_DEBUG, "Sending unary request for compressed response %s.",
449             log_suffix.c_str());
450     SimpleRequest request;
451     SimpleResponse response;
452     request.mutable_response_compressed()->set_value(compressions[i]);
453 
454     if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
455       gpr_log(GPR_ERROR, "Request for compressed unary failed %s",
456               log_suffix.c_str());
457       return false;
458     }
459 
460     gpr_log(GPR_DEBUG, "Request for compressed unary failed %s",
461             log_suffix.c_str());
462   }
463 
464   return true;
465 }
466 
467 // Either abort() (unless do_not_abort_on_transient_failures_ is true) or return
468 // false
TransientFailureOrAbort()469 bool InteropClient::TransientFailureOrAbort() {
470   if (do_not_abort_on_transient_failures_) {
471     return false;
472   }
473 
474   abort();
475 }
476 
DoRequestStreaming()477 bool InteropClient::DoRequestStreaming() {
478   gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
479 
480   ClientContext context;
481   StreamingInputCallRequest request;
482   StreamingInputCallResponse response;
483 
484   std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
485       serviceStub_.Get()->StreamingInputCall(&context, &response));
486 
487   int aggregated_payload_size = 0;
488   for (size_t i = 0; i < request_stream_sizes.size(); ++i) {
489     Payload* payload = request.mutable_payload();
490     payload->set_body(std::string(request_stream_sizes[i], '\0'));
491     if (!stream->Write(request)) {
492       gpr_log(GPR_ERROR, "DoRequestStreaming(): stream->Write() failed");
493       return TransientFailureOrAbort();
494     }
495     aggregated_payload_size += request_stream_sizes[i];
496   }
497   GPR_ASSERT(stream->WritesDone());
498 
499   Status s = stream->Finish();
500   if (!AssertStatusOk(s, context.debug_error_string())) {
501     return false;
502   }
503 
504   GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
505   return true;
506 }
507 
DoResponseStreaming()508 bool InteropClient::DoResponseStreaming() {
509   gpr_log(GPR_DEBUG, "Receiving response streaming rpc ...");
510 
511   ClientContext context;
512   StreamingOutputCallRequest request;
513   for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
514     ResponseParameters* response_parameter = request.add_response_parameters();
515     response_parameter->set_size(response_stream_sizes[i]);
516   }
517   StreamingOutputCallResponse response;
518   std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
519       serviceStub_.Get()->StreamingOutputCall(&context, request));
520 
521   unsigned int i = 0;
522   while (stream->Read(&response)) {
523     GPR_ASSERT(response.payload().body() ==
524                std::string(response_stream_sizes[i], '\0'));
525     ++i;
526   }
527 
528   if (i < response_stream_sizes.size()) {
529     // stream->Read() failed before reading all the expected messages. This is
530     // most likely due to connection failure.
531     gpr_log(GPR_ERROR,
532             "DoResponseStreaming(): Read fewer streams (%d) than "
533             "response_stream_sizes.size() (%" PRIuPTR ")",
534             i, response_stream_sizes.size());
535     return TransientFailureOrAbort();
536   }
537 
538   Status s = stream->Finish();
539   if (!AssertStatusOk(s, context.debug_error_string())) {
540     return false;
541   }
542 
543   gpr_log(GPR_DEBUG, "Response streaming done.");
544   return true;
545 }
546 
DoClientCompressedStreaming()547 bool InteropClient::DoClientCompressedStreaming() {
548   // Probing for compression-checks support.
549   ClientContext probe_context;
550   StreamingInputCallRequest probe_req;
551   StreamingInputCallResponse probe_res;
552 
553   probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
554   probe_req.mutable_expect_compressed()->set_value(true);  // lies!
555   probe_req.mutable_payload()->set_body(std::string(27182, '\0'));
556 
557   gpr_log(GPR_DEBUG, "Sending probe for compressed streaming request.");
558 
559   std::unique_ptr<ClientWriter<StreamingInputCallRequest>> probe_stream(
560       serviceStub_.Get()->StreamingInputCall(&probe_context, &probe_res));
561 
562   if (!probe_stream->Write(probe_req)) {
563     gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
564     return TransientFailureOrAbort();
565   }
566   Status s = probe_stream->Finish();
567   if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
568     // The server isn't able to evaluate incoming compression, making the rest
569     // of this test moot.
570     gpr_log(GPR_DEBUG, "Compressed streaming request probe failed");
571     return false;
572   }
573   gpr_log(GPR_DEBUG,
574           "Compressed streaming request probe succeeded. Proceeding.");
575 
576   ClientContext context;
577   StreamingInputCallRequest request;
578   StreamingInputCallResponse response;
579 
580   context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
581   std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
582       serviceStub_.Get()->StreamingInputCall(&context, &response));
583 
584   request.mutable_payload()->set_body(std::string(27182, '\0'));
585   request.mutable_expect_compressed()->set_value(true);
586   gpr_log(GPR_DEBUG, "Sending streaming request with compression enabled");
587   if (!stream->Write(request)) {
588     gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
589     return TransientFailureOrAbort();
590   }
591 
592   WriteOptions wopts;
593   wopts.set_no_compression();
594   request.mutable_payload()->set_body(std::string(45904, '\0'));
595   request.mutable_expect_compressed()->set_value(false);
596   gpr_log(GPR_DEBUG, "Sending streaming request with compression disabled");
597   if (!stream->Write(request, wopts)) {
598     gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
599     return TransientFailureOrAbort();
600   }
601   GPR_ASSERT(stream->WritesDone());
602 
603   s = stream->Finish();
604   return AssertStatusOk(s, context.debug_error_string());
605 }
606 
DoServerCompressedStreaming()607 bool InteropClient::DoServerCompressedStreaming() {
608   const std::vector<bool> compressions = {true, false};
609   const std::vector<int> sizes = {31415, 92653};
610 
611   ClientContext context;
612   InteropClientContextInspector inspector(context);
613   StreamingOutputCallRequest request;
614 
615   GPR_ASSERT(compressions.size() == sizes.size());
616   for (size_t i = 0; i < sizes.size(); i++) {
617     std::string log_suffix =
618         absl::StrFormat("(compression=%s; size=%d)",
619                         compressions[i] ? "true" : "false", sizes[i]);
620 
621     gpr_log(GPR_DEBUG, "Sending request streaming rpc %s.", log_suffix.c_str());
622 
623     ResponseParameters* const response_parameter =
624         request.add_response_parameters();
625     response_parameter->mutable_compressed()->set_value(compressions[i]);
626     response_parameter->set_size(sizes[i]);
627   }
628   std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
629       serviceStub_.Get()->StreamingOutputCall(&context, request));
630 
631   size_t k = 0;
632   StreamingOutputCallResponse response;
633   while (stream->Read(&response)) {
634     // Payload size checks.
635     GPR_ASSERT(response.payload().body() ==
636                std::string(request.response_parameters(k).size(), '\0'));
637 
638     // Compression checks.
639     GPR_ASSERT(request.response_parameters(k).has_compressed());
640     if (request.response_parameters(k).compressed().value()) {
641       GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE);
642       GPR_ASSERT(inspector.WasCompressed());
643     } else {
644       // requested *no* compression.
645       GPR_ASSERT(!(inspector.WasCompressed()));
646     }
647     ++k;
648   }
649 
650   if (k < sizes.size()) {
651     // stream->Read() failed before reading all the expected messages. This
652     // is most likely due to a connection failure.
653     gpr_log(GPR_ERROR,
654             "%s(): Responses read (k=%" PRIuPTR
655             ") is less than the expected number of  messages (%" PRIuPTR ").",
656             __func__, k, sizes.size());
657     return TransientFailureOrAbort();
658   }
659 
660   Status s = stream->Finish();
661   return AssertStatusOk(s, context.debug_error_string());
662 }
663 
DoResponseStreamingWithSlowConsumer()664 bool InteropClient::DoResponseStreamingWithSlowConsumer() {
665   gpr_log(GPR_DEBUG, "Receiving response streaming rpc with slow consumer ...");
666 
667   ClientContext context;
668   StreamingOutputCallRequest request;
669 
670   for (int i = 0; i < kNumResponseMessages; ++i) {
671     ResponseParameters* response_parameter = request.add_response_parameters();
672     response_parameter->set_size(kResponseMessageSize);
673   }
674   StreamingOutputCallResponse response;
675   std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
676       serviceStub_.Get()->StreamingOutputCall(&context, request));
677 
678   int i = 0;
679   while (stream->Read(&response)) {
680     GPR_ASSERT(response.payload().body() ==
681                std::string(kResponseMessageSize, '\0'));
682     gpr_log(GPR_DEBUG, "received message %d", i);
683     gpr_sleep_until(gpr_time_add(
684         gpr_now(GPR_CLOCK_REALTIME),
685         gpr_time_from_millis(kReceiveDelayMilliSeconds, GPR_TIMESPAN)));
686     ++i;
687   }
688 
689   if (i < kNumResponseMessages) {
690     gpr_log(GPR_ERROR,
691             "DoResponseStreamingWithSlowConsumer(): Responses read (i=%d) is "
692             "less than the expected messages (i.e kNumResponseMessages = %d)",
693             i, kNumResponseMessages);
694 
695     return TransientFailureOrAbort();
696   }
697 
698   Status s = stream->Finish();
699   if (!AssertStatusOk(s, context.debug_error_string())) {
700     return false;
701   }
702 
703   gpr_log(GPR_DEBUG, "Response streaming done.");
704   return true;
705 }
706 
DoHalfDuplex()707 bool InteropClient::DoHalfDuplex() {
708   gpr_log(GPR_DEBUG, "Sending half-duplex streaming rpc ...");
709 
710   ClientContext context;
711   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
712                                      StreamingOutputCallResponse>>
713       stream(serviceStub_.Get()->HalfDuplexCall(&context));
714 
715   StreamingOutputCallRequest request;
716   ResponseParameters* response_parameter = request.add_response_parameters();
717   for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
718     response_parameter->set_size(response_stream_sizes[i]);
719 
720     if (!stream->Write(request)) {
721       gpr_log(GPR_ERROR, "DoHalfDuplex(): stream->Write() failed. i=%d", i);
722       return TransientFailureOrAbort();
723     }
724   }
725   stream->WritesDone();
726 
727   unsigned int i = 0;
728   StreamingOutputCallResponse response;
729   while (stream->Read(&response)) {
730     GPR_ASSERT(response.payload().body() ==
731                std::string(response_stream_sizes[i], '\0'));
732     ++i;
733   }
734 
735   if (i < response_stream_sizes.size()) {
736     // stream->Read() failed before reading all the expected messages. This is
737     // most likely due to a connection failure
738     gpr_log(GPR_ERROR,
739             "DoHalfDuplex(): Responses read (i=%d) are less than the expected "
740             "number of messages response_stream_sizes.size() (%" PRIuPTR ")",
741             i, response_stream_sizes.size());
742     return TransientFailureOrAbort();
743   }
744 
745   Status s = stream->Finish();
746   if (!AssertStatusOk(s, context.debug_error_string())) {
747     return false;
748   }
749 
750   gpr_log(GPR_DEBUG, "Half-duplex streaming rpc done.");
751   return true;
752 }
753 
DoPingPong()754 bool InteropClient::DoPingPong() {
755   gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
756 
757   ClientContext context;
758   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
759                                      StreamingOutputCallResponse>>
760       stream(serviceStub_.Get()->FullDuplexCall(&context));
761 
762   StreamingOutputCallRequest request;
763   ResponseParameters* response_parameter = request.add_response_parameters();
764   Payload* payload = request.mutable_payload();
765   StreamingOutputCallResponse response;
766 
767   for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
768     response_parameter->set_size(response_stream_sizes[i]);
769     payload->set_body(std::string(request_stream_sizes[i], '\0'));
770 
771     if (!stream->Write(request)) {
772       gpr_log(GPR_ERROR, "DoPingPong(): stream->Write() failed. i: %d", i);
773       return TransientFailureOrAbort();
774     }
775 
776     if (!stream->Read(&response)) {
777       gpr_log(GPR_ERROR, "DoPingPong(): stream->Read() failed. i:%d", i);
778       return TransientFailureOrAbort();
779     }
780 
781     GPR_ASSERT(response.payload().body() ==
782                std::string(response_stream_sizes[i], '\0'));
783   }
784 
785   stream->WritesDone();
786 
787   GPR_ASSERT(!stream->Read(&response));
788 
789   Status s = stream->Finish();
790   if (!AssertStatusOk(s, context.debug_error_string())) {
791     return false;
792   }
793 
794   gpr_log(GPR_DEBUG, "Ping pong streaming done.");
795   return true;
796 }
797 
DoCancelAfterBegin()798 bool InteropClient::DoCancelAfterBegin() {
799   gpr_log(GPR_DEBUG, "Sending request streaming rpc ...");
800 
801   ClientContext context;
802   StreamingInputCallRequest request;
803   StreamingInputCallResponse response;
804 
805   std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
806       serviceStub_.Get()->StreamingInputCall(&context, &response));
807 
808   gpr_log(GPR_DEBUG, "Trying to cancel...");
809   context.TryCancel();
810   Status s = stream->Finish();
811 
812   if (!AssertStatusCode(s, StatusCode::CANCELLED,
813                         context.debug_error_string())) {
814     return false;
815   }
816 
817   gpr_log(GPR_DEBUG, "Canceling streaming done.");
818   return true;
819 }
820 
DoCancelAfterFirstResponse()821 bool InteropClient::DoCancelAfterFirstResponse() {
822   gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
823 
824   ClientContext context;
825   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
826                                      StreamingOutputCallResponse>>
827       stream(serviceStub_.Get()->FullDuplexCall(&context));
828 
829   StreamingOutputCallRequest request;
830   ResponseParameters* response_parameter = request.add_response_parameters();
831   response_parameter->set_size(31415);
832   request.mutable_payload()->set_body(std::string(27182, '\0'));
833   StreamingOutputCallResponse response;
834 
835   if (!stream->Write(request)) {
836     gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Write() failed");
837     return TransientFailureOrAbort();
838   }
839 
840   if (!stream->Read(&response)) {
841     gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Read failed");
842     return TransientFailureOrAbort();
843   }
844   GPR_ASSERT(response.payload().body() == std::string(31415, '\0'));
845 
846   gpr_log(GPR_DEBUG, "Trying to cancel...");
847   context.TryCancel();
848 
849   Status s = stream->Finish();
850   gpr_log(GPR_DEBUG, "Canceling pingpong streaming done.");
851   return true;
852 }
853 
DoTimeoutOnSleepingServer()854 bool InteropClient::DoTimeoutOnSleepingServer() {
855   gpr_log(GPR_DEBUG,
856           "Sending Ping Pong streaming rpc with a short deadline...");
857 
858   ClientContext context;
859   std::chrono::system_clock::time_point deadline =
860       std::chrono::system_clock::now() + std::chrono::milliseconds(1);
861   context.set_deadline(deadline);
862   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
863                                      StreamingOutputCallResponse>>
864       stream(serviceStub_.Get()->FullDuplexCall(&context));
865 
866   StreamingOutputCallRequest request;
867   request.mutable_payload()->set_body(std::string(27182, '\0'));
868   stream->Write(request);
869 
870   Status s = stream->Finish();
871   if (!AssertStatusCode(s, StatusCode::DEADLINE_EXCEEDED,
872                         context.debug_error_string())) {
873     return false;
874   }
875 
876   gpr_log(GPR_DEBUG, "Pingpong streaming timeout done.");
877   return true;
878 }
879 
DoEmptyStream()880 bool InteropClient::DoEmptyStream() {
881   gpr_log(GPR_DEBUG, "Starting empty_stream.");
882 
883   ClientContext context;
884   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
885                                      StreamingOutputCallResponse>>
886       stream(serviceStub_.Get()->FullDuplexCall(&context));
887   stream->WritesDone();
888   StreamingOutputCallResponse response;
889   GPR_ASSERT(stream->Read(&response) == false);
890 
891   Status s = stream->Finish();
892   if (!AssertStatusOk(s, context.debug_error_string())) {
893     return false;
894   }
895 
896   gpr_log(GPR_DEBUG, "empty_stream done.");
897   return true;
898 }
899 
DoStatusWithMessage()900 bool InteropClient::DoStatusWithMessage() {
901   gpr_log(GPR_DEBUG,
902           "Sending RPC with a request for status code 2 and message");
903 
904   const grpc::StatusCode test_code = grpc::StatusCode::UNKNOWN;
905   const std::string test_msg = "This is a test message";
906 
907   // Test UnaryCall.
908   ClientContext context;
909   SimpleRequest request;
910   SimpleResponse response;
911   EchoStatus* requested_status = request.mutable_response_status();
912   requested_status->set_code(test_code);
913   requested_status->set_message(test_msg);
914   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
915   if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
916                         context.debug_error_string())) {
917     return false;
918   }
919   GPR_ASSERT(s.error_message() == test_msg);
920 
921   // Test FullDuplexCall.
922   ClientContext stream_context;
923   std::shared_ptr<ClientReaderWriter<StreamingOutputCallRequest,
924                                      StreamingOutputCallResponse>>
925       stream(serviceStub_.Get()->FullDuplexCall(&stream_context));
926   StreamingOutputCallRequest streaming_request;
927   requested_status = streaming_request.mutable_response_status();
928   requested_status->set_code(test_code);
929   requested_status->set_message(test_msg);
930   stream->Write(streaming_request);
931   stream->WritesDone();
932   StreamingOutputCallResponse streaming_response;
933   while (stream->Read(&streaming_response)) {
934   }
935   s = stream->Finish();
936   if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
937                         context.debug_error_string())) {
938     return false;
939   }
940   GPR_ASSERT(s.error_message() == test_msg);
941 
942   gpr_log(GPR_DEBUG, "Done testing Status and Message");
943   return true;
944 }
945 
DoSpecialStatusMessage()946 bool InteropClient::DoSpecialStatusMessage() {
947   gpr_log(
948       GPR_DEBUG,
949       "Sending RPC with a request for status code 2 and message - \\t\\ntest "
950       "with whitespace\\r\\nand Unicode BMP ☺ and non-BMP ��\\t\\n");
951   const grpc::StatusCode test_code = grpc::StatusCode::UNKNOWN;
952   const std::string test_msg =
953       "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP ��\t\n";
954   ClientContext context;
955   SimpleRequest request;
956   SimpleResponse response;
957   EchoStatus* requested_status = request.mutable_response_status();
958   requested_status->set_code(test_code);
959   requested_status->set_message(test_msg);
960   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
961   if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
962                         context.debug_error_string())) {
963     return false;
964   }
965   GPR_ASSERT(s.error_message() == test_msg);
966   gpr_log(GPR_DEBUG, "Done testing Special Status Message");
967   return true;
968 }
969 
DoPickFirstUnary()970 bool InteropClient::DoPickFirstUnary() {
971   const int rpcCount = 100;
972   SimpleRequest request;
973   SimpleResponse response;
974   std::string server_id;
975   request.set_fill_server_id(true);
976   for (int i = 0; i < rpcCount; i++) {
977     ClientContext context;
978     Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
979     if (!AssertStatusOk(s, context.debug_error_string())) {
980       return false;
981     }
982     if (i == 0) {
983       server_id = response.server_id();
984       continue;
985     }
986     if (response.server_id() != server_id) {
987       gpr_log(GPR_ERROR, "#%d rpc hits server_id %s, expect server_id %s", i,
988               response.server_id().c_str(), server_id.c_str());
989       return false;
990     }
991   }
992   gpr_log(GPR_DEBUG, "pick first unary successfully finished");
993   return true;
994 }
995 
DoOrcaPerRpc()996 bool InteropClient::DoOrcaPerRpc() {
997   load_report_tracker_.ResetCollectedLoadReports();
998   grpc_core::CoreConfiguration::RegisterBuilder(RegisterBackendMetricsLbPolicy);
999   gpr_log(GPR_DEBUG, "testing orca per rpc");
1000   SimpleRequest request;
1001   SimpleResponse response;
1002   ClientContext context;
1003   auto orca_report = request.mutable_orca_per_query_report();
1004   orca_report->set_cpu_utilization(0.8210);
1005   orca_report->set_memory_utilization(0.5847);
1006   orca_report->mutable_request_cost()->emplace("cost", 3456.32);
1007   orca_report->mutable_utilization()->emplace("util", 0.30499);
1008   auto status = serviceStub_.Get()->UnaryCall(&context, request, &response);
1009   if (!AssertStatusOk(status, context.debug_error_string())) {
1010     return false;
1011   }
1012   auto report = load_report_tracker_.GetNextLoadReport();
1013   GPR_ASSERT(report.has_value());
1014   GPR_ASSERT(report->has_value());
1015   auto comparison_result = OrcaLoadReportsDiff(report->value(), *orca_report);
1016   if (comparison_result.has_value()) {
1017     gpr_assertion_failed(__FILE__, __LINE__, comparison_result->c_str());
1018   }
1019   GPR_ASSERT(!load_report_tracker_.GetNextLoadReport().has_value());
1020   gpr_log(GPR_DEBUG, "orca per rpc successfully finished");
1021   return true;
1022 }
1023 
DoOrcaOob()1024 bool InteropClient::DoOrcaOob() {
1025   static constexpr auto kTimeout = absl::Seconds(10);
1026   gpr_log(GPR_INFO, "testing orca oob");
1027   load_report_tracker_.ResetCollectedLoadReports();
1028   // Make the backup poller poll very frequently in order to pick up
1029   // updates from all the subchannels's FDs.
1030   grpc_core::ConfigVars::Overrides overrides;
1031   overrides.client_channel_backup_poll_interval_ms = 250;
1032   grpc_core::ConfigVars::SetOverrides(overrides);
1033   grpc_core::CoreConfiguration::RegisterBuilder(RegisterBackendMetricsLbPolicy);
1034   ClientContext context;
1035   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
1036                                      StreamingOutputCallResponse>>
1037       stream(serviceStub_.Get()->FullDuplexCall(&context));
1038   auto stream_cleanup = absl::MakeCleanup([&]() {
1039     GPR_ASSERT(stream->WritesDone());
1040     GPR_ASSERT(stream->Finish().ok());
1041   });
1042   {
1043     StreamingOutputCallRequest request;
1044     request.add_response_parameters()->set_size(1);
1045     TestOrcaReport* orca_report = request.mutable_orca_oob_report();
1046     orca_report->set_cpu_utilization(0.8210);
1047     orca_report->set_memory_utilization(0.5847);
1048     orca_report->mutable_utilization()->emplace("util", 0.30499);
1049     StreamingOutputCallResponse response;
1050     if (!stream->Write(request)) {
1051       gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Write() failed");
1052       return TransientFailureOrAbort();
1053     }
1054     if (!stream->Read(&response)) {
1055       gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Read failed");
1056       return TransientFailureOrAbort();
1057     }
1058     GPR_ASSERT(load_report_tracker_
1059                    .WaitForOobLoadReport(
1060                        [orca_report](const auto& actual) {
1061                          auto value = OrcaLoadReportsDiff(*orca_report, actual);
1062                          if (value.has_value()) {
1063                            gpr_log(GPR_DEBUG, "Reports mismatch: %s",
1064                                    value->c_str());
1065                            return false;
1066                          }
1067                          return true;
1068                        },
1069                        kTimeout, 10)
1070                    .has_value());
1071   }
1072   {
1073     StreamingOutputCallRequest request;
1074     request.add_response_parameters()->set_size(1);
1075     TestOrcaReport* orca_report = request.mutable_orca_oob_report();
1076     orca_report->set_cpu_utilization(0.29309);
1077     orca_report->set_memory_utilization(0.2);
1078     orca_report->mutable_utilization()->emplace("util", 0.2039);
1079     StreamingOutputCallResponse response;
1080     if (!stream->Write(request)) {
1081       gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Write() failed");
1082       return TransientFailureOrAbort();
1083     }
1084     if (!stream->Read(&response)) {
1085       gpr_log(GPR_ERROR, "DoOrcaOob(): stream->Read failed");
1086       return TransientFailureOrAbort();
1087     }
1088     GPR_ASSERT(
1089         load_report_tracker_
1090             .WaitForOobLoadReport(
1091                 [orca_report](const auto& report) {
1092                   return !OrcaLoadReportsDiff(*orca_report, report).has_value();
1093                 },
1094                 kTimeout, 10)
1095             .has_value());
1096   }
1097   gpr_log(GPR_INFO, "orca oob successfully finished");
1098   return true;
1099 }
1100 
DoCustomMetadata()1101 bool InteropClient::DoCustomMetadata() {
1102   const std::string kEchoInitialMetadataKey("x-grpc-test-echo-initial");
1103   const std::string kInitialMetadataValue("test_initial_metadata_value");
1104   const std::string kEchoTrailingBinMetadataKey(
1105       "x-grpc-test-echo-trailing-bin");
1106   const std::string kTrailingBinValue("\x0a\x0b\x0a\x0b\x0a\x0b");
1107 
1108   {
1109     gpr_log(GPR_DEBUG, "Sending RPC with custom metadata");
1110     ClientContext context;
1111     context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
1112     context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
1113     SimpleRequest request;
1114     SimpleResponse response;
1115     request.set_response_size(kLargeResponseSize);
1116     std::string payload(kLargeRequestSize, '\0');
1117     request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
1118 
1119     Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
1120     if (!AssertStatusOk(s, context.debug_error_string())) {
1121       return false;
1122     }
1123 
1124     const auto& server_initial_metadata = context.GetServerInitialMetadata();
1125     auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
1126     GPR_ASSERT(iter != server_initial_metadata.end());
1127     GPR_ASSERT(iter->second == kInitialMetadataValue);
1128     const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
1129     iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
1130     GPR_ASSERT(iter != server_trailing_metadata.end());
1131     GPR_ASSERT(std::string(iter->second.begin(), iter->second.end()) ==
1132                kTrailingBinValue);
1133 
1134     gpr_log(GPR_DEBUG, "Done testing RPC with custom metadata");
1135   }
1136 
1137   {
1138     gpr_log(GPR_DEBUG, "Sending stream with custom metadata");
1139     ClientContext context;
1140     context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
1141     context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
1142     std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
1143                                        StreamingOutputCallResponse>>
1144         stream(serviceStub_.Get()->FullDuplexCall(&context));
1145 
1146     StreamingOutputCallRequest request;
1147     ResponseParameters* response_parameter = request.add_response_parameters();
1148     response_parameter->set_size(kLargeResponseSize);
1149     std::string payload(kLargeRequestSize, '\0');
1150     request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
1151     StreamingOutputCallResponse response;
1152 
1153     if (!stream->Write(request)) {
1154       gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Write() failed");
1155       return TransientFailureOrAbort();
1156     }
1157 
1158     stream->WritesDone();
1159 
1160     if (!stream->Read(&response)) {
1161       gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Read() failed");
1162       return TransientFailureOrAbort();
1163     }
1164 
1165     GPR_ASSERT(response.payload().body() ==
1166                std::string(kLargeResponseSize, '\0'));
1167 
1168     GPR_ASSERT(!stream->Read(&response));
1169 
1170     Status s = stream->Finish();
1171     if (!AssertStatusOk(s, context.debug_error_string())) {
1172       return false;
1173     }
1174 
1175     const auto& server_initial_metadata = context.GetServerInitialMetadata();
1176     auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
1177     GPR_ASSERT(iter != server_initial_metadata.end());
1178     GPR_ASSERT(iter->second == kInitialMetadataValue);
1179     const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
1180     iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
1181     GPR_ASSERT(iter != server_trailing_metadata.end());
1182     GPR_ASSERT(std::string(iter->second.begin(), iter->second.end()) ==
1183                kTrailingBinValue);
1184 
1185     gpr_log(GPR_DEBUG, "Done testing stream with custom metadata");
1186   }
1187 
1188   return true;
1189 }
1190 
1191 std::tuple<bool, int32_t, std::string, std::string>
PerformOneSoakTestIteration(const bool reset_channel,const int32_t max_acceptable_per_iteration_latency_ms,const int32_t request_size,const int32_t response_size)1192 InteropClient::PerformOneSoakTestIteration(
1193     const bool reset_channel,
1194     const int32_t max_acceptable_per_iteration_latency_ms,
1195     const int32_t request_size, const int32_t response_size) {
1196   gpr_timespec start = gpr_now(GPR_CLOCK_MONOTONIC);
1197   SimpleRequest request;
1198   SimpleResponse response;
1199   // Don't set the deadline on the RPC, and instead just
1200   // record how long the RPC took and compare. This makes
1201   // debugging easier when looking at failure results.
1202   ClientContext context;
1203   InteropClientContextInspector inspector(context);
1204   request.set_response_size(response_size);
1205   std::string payload(request_size, '\0');
1206   request.mutable_payload()->set_body(payload.c_str(), request_size);
1207   if (reset_channel) {
1208     serviceStub_.ResetChannel();
1209   }
1210   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
1211   gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1212   int32_t elapsed_ms = gpr_time_to_millis(gpr_time_sub(now, start));
1213   if (!s.ok()) {
1214     return std::make_tuple(false, elapsed_ms, context.debug_error_string(),
1215                            context.peer());
1216   } else if (elapsed_ms > max_acceptable_per_iteration_latency_ms) {
1217     std::string debug_string = absl::StrFormat(
1218         "%d ms exceeds max acceptable latency: %d ms, peer: %s", elapsed_ms,
1219         max_acceptable_per_iteration_latency_ms, context.peer());
1220     return std::make_tuple(false, elapsed_ms, std::move(debug_string),
1221                            context.peer());
1222   } else {
1223     return std::make_tuple(true, elapsed_ms, "", context.peer());
1224   }
1225 }
1226 
PerformSoakTest(const std::string & server_uri,const bool reset_channel_per_iteration,const int32_t soak_iterations,const int32_t max_failures,const int32_t max_acceptable_per_iteration_latency_ms,const int32_t min_time_ms_between_rpcs,const int32_t overall_timeout_seconds,const int32_t request_size,const int32_t response_size)1227 void InteropClient::PerformSoakTest(
1228     const std::string& server_uri, const bool reset_channel_per_iteration,
1229     const int32_t soak_iterations, const int32_t max_failures,
1230     const int32_t max_acceptable_per_iteration_latency_ms,
1231     const int32_t min_time_ms_between_rpcs,
1232     const int32_t overall_timeout_seconds, const int32_t request_size,
1233     const int32_t response_size) {
1234   std::vector<std::tuple<bool, int32_t, std::string, std::string>> results;
1235   grpc_histogram* latencies_ms_histogram = grpc_histogram_create(
1236       1 /* resolution */,
1237       500 * 1e3 /* largest bucket; 500 seconds is unlikely */);
1238   gpr_timespec overall_deadline = gpr_time_add(
1239       gpr_now(GPR_CLOCK_MONOTONIC),
1240       gpr_time_from_seconds(overall_timeout_seconds, GPR_TIMESPAN));
1241   int32_t iterations_ran = 0;
1242   int total_failures = 0;
1243   for (int i = 0;
1244        i < soak_iterations &&
1245        gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), overall_deadline) < 0;
1246        ++i) {
1247     gpr_timespec earliest_next_start = gpr_time_add(
1248         gpr_now(GPR_CLOCK_MONOTONIC),
1249         gpr_time_from_millis(min_time_ms_between_rpcs, GPR_TIMESPAN));
1250     auto result = PerformOneSoakTestIteration(
1251         reset_channel_per_iteration, max_acceptable_per_iteration_latency_ms,
1252         request_size, response_size);
1253     bool success = std::get<0>(result);
1254     int32_t elapsed_ms = std::get<1>(result);
1255     std::string debug_string = std::get<2>(result);
1256     std::string peer = std::get<3>(result);
1257     results.push_back(result);
1258     if (!success) {
1259       gpr_log(GPR_INFO,
1260               "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s "
1261               "failed: %s",
1262               i, elapsed_ms, peer.c_str(), server_uri.c_str(),
1263               debug_string.c_str());
1264       total_failures++;
1265     } else {
1266       gpr_log(
1267           GPR_INFO,
1268           "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s succeeded",
1269           i, elapsed_ms, peer.c_str(), server_uri.c_str());
1270     }
1271     grpc_histogram_add(latencies_ms_histogram, std::get<1>(result));
1272     iterations_ran++;
1273     gpr_sleep_until(earliest_next_start);
1274   }
1275   double latency_ms_median =
1276       grpc_histogram_percentile(latencies_ms_histogram, 50);
1277   double latency_ms_90th =
1278       grpc_histogram_percentile(latencies_ms_histogram, 90);
1279   double latency_ms_worst = grpc_histogram_maximum(latencies_ms_histogram);
1280   grpc_histogram_destroy(latencies_ms_histogram);
1281   if (iterations_ran < soak_iterations) {
1282     gpr_log(
1283         GPR_ERROR,
1284         "(server_uri: %s) soak test consumed all %d seconds of time and quit "
1285         "early, only "
1286         "having ran %d out of desired %d iterations. "
1287         "total_failures: %d. "
1288         "max_failures_threshold: %d. "
1289         "median_soak_iteration_latency: %lf ms. "
1290         "90th_soak_iteration_latency: %lf ms. "
1291         "worst_soak_iteration_latency: %lf ms. "
1292         "Some or all of the iterations that did run were unexpectedly slow. "
1293         "See breakdown above for which iterations succeeded, failed, and "
1294         "why for more info.",
1295         server_uri.c_str(), overall_timeout_seconds, iterations_ran,
1296         soak_iterations, total_failures, max_failures, latency_ms_median,
1297         latency_ms_90th, latency_ms_worst);
1298     GPR_ASSERT(0);
1299   } else if (total_failures > max_failures) {
1300     gpr_log(GPR_ERROR,
1301             "(server_uri: %s) soak test ran: %d iterations. total_failures: %d "
1302             "exceeds "
1303             "max_failures_threshold: %d. "
1304             "median_soak_iteration_latency: %lf ms. "
1305             "90th_soak_iteration_latency: %lf ms. "
1306             "worst_soak_iteration_latency: %lf ms. "
1307             "See breakdown above for which iterations succeeded, failed, and "
1308             "why for more info.",
1309             server_uri.c_str(), soak_iterations, total_failures, max_failures,
1310             latency_ms_median, latency_ms_90th, latency_ms_worst);
1311     GPR_ASSERT(0);
1312   } else {
1313     gpr_log(GPR_INFO,
1314             "(server_uri: %s) soak test ran: %d iterations. total_failures: %d "
1315             "is within "
1316             "max_failures_threshold: %d. "
1317             "median_soak_iteration_latency: %lf ms. "
1318             "90th_soak_iteration_latency: %lf ms. "
1319             "worst_soak_iteration_latency: %lf ms. "
1320             "See breakdown above for which iterations succeeded, failed, and "
1321             "why for more info.",
1322             server_uri.c_str(), soak_iterations, total_failures, max_failures,
1323             latency_ms_median, latency_ms_90th, latency_ms_worst);
1324   }
1325 }
1326 
DoRpcSoakTest(const std::string & server_uri,int32_t soak_iterations,int32_t max_failures,int64_t max_acceptable_per_iteration_latency_ms,int32_t soak_min_time_ms_between_rpcs,int32_t overall_timeout_seconds,int32_t request_size,int32_t response_size)1327 bool InteropClient::DoRpcSoakTest(
1328     const std::string& server_uri, int32_t soak_iterations,
1329     int32_t max_failures, int64_t max_acceptable_per_iteration_latency_ms,
1330     int32_t soak_min_time_ms_between_rpcs, int32_t overall_timeout_seconds,
1331     int32_t request_size, int32_t response_size) {
1332   gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
1333   GPR_ASSERT(soak_iterations > 0);
1334   PerformSoakTest(server_uri, false /* reset channel per iteration */,
1335                   soak_iterations, max_failures,
1336                   max_acceptable_per_iteration_latency_ms,
1337                   soak_min_time_ms_between_rpcs, overall_timeout_seconds,
1338                   request_size, response_size);
1339   gpr_log(GPR_DEBUG, "rpc_soak test done.");
1340   return true;
1341 }
1342 
DoChannelSoakTest(const std::string & server_uri,int32_t soak_iterations,int32_t max_failures,int64_t max_acceptable_per_iteration_latency_ms,int32_t soak_min_time_ms_between_rpcs,int32_t overall_timeout_seconds,int32_t request_size,int32_t response_size)1343 bool InteropClient::DoChannelSoakTest(
1344     const std::string& server_uri, int32_t soak_iterations,
1345     int32_t max_failures, int64_t max_acceptable_per_iteration_latency_ms,
1346     int32_t soak_min_time_ms_between_rpcs, int32_t overall_timeout_seconds,
1347     int32_t request_size, int32_t response_size) {
1348   gpr_log(GPR_DEBUG, "Sending %d RPCs, tearing down the channel each time...",
1349           soak_iterations);
1350   GPR_ASSERT(soak_iterations > 0);
1351   PerformSoakTest(server_uri, true /* reset channel per iteration */,
1352                   soak_iterations, max_failures,
1353                   max_acceptable_per_iteration_latency_ms,
1354                   soak_min_time_ms_between_rpcs, overall_timeout_seconds,
1355                   request_size, response_size);
1356   gpr_log(GPR_DEBUG, "channel_soak test done.");
1357   return true;
1358 }
1359 
DoLongLivedChannelTest(int32_t soak_iterations,int32_t iteration_interval)1360 bool InteropClient::DoLongLivedChannelTest(int32_t soak_iterations,
1361                                            int32_t iteration_interval) {
1362   gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
1363   GPR_ASSERT(soak_iterations > 0);
1364   GPR_ASSERT(iteration_interval > 0);
1365   SimpleRequest request;
1366   SimpleResponse response;
1367   int num_failures = 0;
1368   for (int i = 0; i < soak_iterations; ++i) {
1369     gpr_log(GPR_DEBUG, "Sending RPC number %d...", i);
1370     if (!PerformLargeUnary(&request, &response)) {
1371       gpr_log(GPR_ERROR, "Iteration %d failed.", i);
1372       num_failures++;
1373     }
1374     gpr_sleep_until(
1375         gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1376                      gpr_time_from_seconds(iteration_interval, GPR_TIMESPAN)));
1377   }
1378   if (num_failures == 0) {
1379     gpr_log(GPR_DEBUG, "long_lived_channel test done.");
1380     return true;
1381   } else {
1382     gpr_log(GPR_DEBUG, "long_lived_channel test failed with %d rpc failures.",
1383             num_failures);
1384     return false;
1385   }
1386 }
1387 
DoUnimplementedService()1388 bool InteropClient::DoUnimplementedService() {
1389   gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service...");
1390 
1391   Empty request;
1392   Empty response;
1393   ClientContext context;
1394 
1395   UnimplementedService::Stub* stub = serviceStub_.GetUnimplementedServiceStub();
1396 
1397   Status s = stub->UnimplementedCall(&context, request, &response);
1398 
1399   if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED,
1400                         context.debug_error_string())) {
1401     return false;
1402   }
1403 
1404   gpr_log(GPR_DEBUG, "unimplemented service done.");
1405   return true;
1406 }
1407 
DoUnimplementedMethod()1408 bool InteropClient::DoUnimplementedMethod() {
1409   gpr_log(GPR_DEBUG, "Sending a request for an unimplemented rpc...");
1410 
1411   Empty request;
1412   Empty response;
1413   ClientContext context;
1414 
1415   Status s =
1416       serviceStub_.Get()->UnimplementedCall(&context, request, &response);
1417 
1418   if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED,
1419                         context.debug_error_string())) {
1420     return false;
1421   }
1422 
1423   gpr_log(GPR_DEBUG, "unimplemented rpc done.");
1424   return true;
1425 }
1426 
1427 }  // namespace testing
1428 }  // namespace grpc
1429