xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <mutex>
20 #include <thread>
21 
22 #include "absl/memory/memory.h"
23 #include "absl/strings/ascii.h"
24 #include "absl/strings/match.h"
25 #include "absl/strings/str_format.h"
26 
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/create_channel.h>
34 #include <grpcpp/resource_quota.h>
35 #include <grpcpp/security/auth_metadata_processor.h>
36 #include <grpcpp/security/credentials.h>
37 #include <grpcpp/security/server_credentials.h>
38 #include <grpcpp/server.h>
39 #include <grpcpp/server_builder.h>
40 #include <grpcpp/server_context.h>
41 #include <grpcpp/support/string_ref.h>
42 #include <grpcpp/test/channel_test_peer.h>
43 
44 #include "src/core/client_channel/backup_poller.h"
45 #include "src/core/lib/config/config_vars.h"
46 #include "src/core/lib/gprpp/crash.h"
47 #include "src/core/lib/gprpp/env.h"
48 #include "src/core/lib/iomgr/iomgr.h"
49 #include "src/core/lib/security/credentials/credentials.h"
50 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
51 #include "src/proto/grpc/testing/echo.grpc.pb.h"
52 #include "test/core/util/port.h"
53 #include "test/core/util/test_config.h"
54 #include "test/cpp/end2end/interceptors_util.h"
55 #include "test/cpp/end2end/test_service_impl.h"
56 #include "test/cpp/util/string_ref_helper.h"
57 #include "test/cpp/util/test_credentials_provider.h"
58 
59 #ifdef GRPC_POSIX_SOCKET_EV
60 #include "src/core/lib/iomgr/ev_posix.h"
61 #endif  // GRPC_POSIX_SOCKET_EV
62 
63 #include <gtest/gtest.h>
64 
65 using std::chrono::system_clock;
66 
67 namespace grpc {
68 namespace testing {
69 namespace {
70 
CheckIsLocalhost(const std::string & addr)71 bool CheckIsLocalhost(const std::string& addr) {
72   const std::string kIpv6("ipv6:%5B::1%5D:");
73   const std::string kIpv4MappedIpv6("ipv6:%5B::ffff:127.0.0.1%5D:");
74   const std::string kIpv4("ipv4:127.0.0.1:");
75   return addr.substr(0, kIpv4.size()) == kIpv4 ||
76          addr.substr(0, kIpv4MappedIpv6.size()) == kIpv4MappedIpv6 ||
77          addr.substr(0, kIpv6.size()) == kIpv6;
78 }
79 
80 const int kClientChannelBackupPollIntervalMs = 200;
81 
82 const char kTestCredsPluginErrorMsg[] = "Could not find plugin metadata.";
83 
84 const char kFakeToken[] = "fake_token";
85 const char kFakeSelector[] = "fake_selector";
86 const char kExpectedFakeCredsDebugString[] =
87     "CallCredentials{GoogleIAMCredentials{Token:present,"
88     "AuthoritySelector:fake_selector}}";
89 
90 const char kWrongToken[] = "wrong_token";
91 const char kWrongSelector[] = "wrong_selector";
92 const char kExpectedWrongCredsDebugString[] =
93     "CallCredentials{GoogleIAMCredentials{Token:present,"
94     "AuthoritySelector:wrong_selector}}";
95 
96 const char kFakeToken1[] = "fake_token1";
97 const char kFakeSelector1[] = "fake_selector1";
98 const char kExpectedFakeCreds1DebugString[] =
99     "CallCredentials{GoogleIAMCredentials{Token:present,"
100     "AuthoritySelector:fake_selector1}}";
101 
102 const char kFakeToken2[] = "fake_token2";
103 const char kFakeSelector2[] = "fake_selector2";
104 const char kExpectedFakeCreds2DebugString[] =
105     "CallCredentials{GoogleIAMCredentials{Token:present,"
106     "AuthoritySelector:fake_selector2}}";
107 
108 const char kExpectedAuthMetadataPluginKeyFailureCredsDebugString[] =
109     "CallCredentials{TestMetadataCredentials{key:TestPluginMetadata,"
110     "value:Does not matter, will fail the key is invalid.}}";
111 const char kExpectedAuthMetadataPluginValueFailureCredsDebugString[] =
112     "CallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
113     "value:With illegal \n value.}}";
114 const char kExpectedAuthMetadataPluginWithDeadlineCredsDebugString[] =
115     "CallCredentials{TestMetadataCredentials{key:meta_key,value:Does "
116     "not "
117     "matter}}";
118 const char kExpectedNonBlockingAuthMetadataPluginFailureCredsDebugString[] =
119     "CallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
120     "value:Does not matter, will fail anyway (see 3rd param)}}";
121 const char
122     kExpectedNonBlockingAuthMetadataPluginAndProcessorSuccessCredsDebugString
123         [] = "CallCredentials{TestMetadataCredentials{key:test-plugin-"
124              "metadata,value:Dr Jekyll}}";
125 const char
126     kExpectedNonBlockingAuthMetadataPluginAndProcessorFailureCredsDebugString
127         [] = "CallCredentials{TestMetadataCredentials{key:test-plugin-"
128              "metadata,value:Mr Hyde}}";
129 const char kExpectedBlockingAuthMetadataPluginFailureCredsDebugString[] =
130     "CallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
131     "value:Does not matter, will fail anyway (see 3rd param)}}";
132 const char kExpectedCompositeCallCredsDebugString[] =
133     "CallCredentials{CompositeCallCredentials{TestMetadataCredentials{"
134     "key:call-creds-key1,value:call-creds-val1},TestMetadataCredentials{key:"
135     "call-creds-key2,value:call-creds-val2}}}";
136 
137 class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin {
138  public:
139   static const char kGoodMetadataKey[];
140   static const char kBadMetadataKey[];
141 
TestMetadataCredentialsPlugin(const grpc::string_ref & metadata_key,const grpc::string_ref & metadata_value,bool is_blocking,bool is_successful,int delay_ms)142   TestMetadataCredentialsPlugin(const grpc::string_ref& metadata_key,
143                                 const grpc::string_ref& metadata_value,
144                                 bool is_blocking, bool is_successful,
145                                 int delay_ms)
146       : metadata_key_(metadata_key.data(), metadata_key.length()),
147         metadata_value_(metadata_value.data(), metadata_value.length()),
148         is_blocking_(is_blocking),
149         is_successful_(is_successful),
150         delay_ms_(delay_ms) {}
151 
IsBlocking() const152   bool IsBlocking() const override { return is_blocking_; }
153 
GetMetadata(grpc::string_ref service_url,grpc::string_ref method_name,const grpc::AuthContext & channel_auth_context,std::multimap<std::string,std::string> * metadata)154   Status GetMetadata(
155       grpc::string_ref service_url, grpc::string_ref method_name,
156       const grpc::AuthContext& channel_auth_context,
157       std::multimap<std::string, std::string>* metadata) override {
158     if (delay_ms_ != 0) {
159       gpr_sleep_until(
160           gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
161                        gpr_time_from_millis(delay_ms_, GPR_TIMESPAN)));
162     }
163     EXPECT_GT(service_url.length(), 0UL);
164     EXPECT_GT(method_name.length(), 0UL);
165     EXPECT_TRUE(channel_auth_context.IsPeerAuthenticated());
166     EXPECT_TRUE(metadata != nullptr);
167     if (is_successful_) {
168       metadata->insert(std::make_pair(metadata_key_, metadata_value_));
169       return Status::OK;
170     } else {
171       return Status(StatusCode::NOT_FOUND, kTestCredsPluginErrorMsg);
172     }
173   }
174 
DebugString()175   std::string DebugString() override {
176     return absl::StrFormat("TestMetadataCredentials{key:%s,value:%s}",
177                            metadata_key_.c_str(), metadata_value_.c_str());
178   }
179 
180  private:
181   std::string metadata_key_;
182   std::string metadata_value_;
183   bool is_blocking_;
184   bool is_successful_;
185   int delay_ms_;
186 };
187 
188 const char TestMetadataCredentialsPlugin::kBadMetadataKey[] =
189     "TestPluginMetadata";
190 const char TestMetadataCredentialsPlugin::kGoodMetadataKey[] =
191     "test-plugin-metadata";
192 
193 class TestAuthMetadataProcessor : public AuthMetadataProcessor {
194  public:
195   static const char kGoodGuy[];
196 
TestAuthMetadataProcessor(bool is_blocking)197   explicit TestAuthMetadataProcessor(bool is_blocking)
198       : is_blocking_(is_blocking) {}
199 
GetCompatibleClientCreds()200   std::shared_ptr<CallCredentials> GetCompatibleClientCreds() {
201     return grpc::MetadataCredentialsFromPlugin(
202         std::unique_ptr<MetadataCredentialsPlugin>(
203             new TestMetadataCredentialsPlugin(
204                 TestMetadataCredentialsPlugin::kGoodMetadataKey, kGoodGuy,
205                 is_blocking_, true, 0)));
206   }
207 
GetIncompatibleClientCreds()208   std::shared_ptr<CallCredentials> GetIncompatibleClientCreds() {
209     return grpc::MetadataCredentialsFromPlugin(
210         std::unique_ptr<MetadataCredentialsPlugin>(
211             new TestMetadataCredentialsPlugin(
212                 TestMetadataCredentialsPlugin::kGoodMetadataKey, "Mr Hyde",
213                 is_blocking_, true, 0)));
214   }
215 
216   // Interface implementation
IsBlocking() const217   bool IsBlocking() const override { return is_blocking_; }
218 
Process(const InputMetadata & auth_metadata,AuthContext * context,OutputMetadata * consumed_auth_metadata,OutputMetadata * response_metadata)219   Status Process(const InputMetadata& auth_metadata, AuthContext* context,
220                  OutputMetadata* consumed_auth_metadata,
221                  OutputMetadata* response_metadata) override {
222     EXPECT_TRUE(consumed_auth_metadata != nullptr);
223     EXPECT_TRUE(context != nullptr);
224     EXPECT_TRUE(response_metadata != nullptr);
225     auto auth_md =
226         auth_metadata.find(TestMetadataCredentialsPlugin::kGoodMetadataKey);
227     EXPECT_NE(auth_md, auth_metadata.end());
228     string_ref auth_md_value = auth_md->second;
229     if (auth_md_value == kGoodGuy) {
230       context->AddProperty(kIdentityPropName, kGoodGuy);
231       context->SetPeerIdentityPropertyName(kIdentityPropName);
232       consumed_auth_metadata->insert(std::make_pair(
233           string(auth_md->first.data(), auth_md->first.length()),
234           string(auth_md->second.data(), auth_md->second.length())));
235       return Status::OK;
236     } else {
237       return Status(StatusCode::UNAUTHENTICATED,
238                     string("Invalid principal: ") +
239                         string(auth_md_value.data(), auth_md_value.length()));
240     }
241   }
242 
243  private:
244   static const char kIdentityPropName[];
245   bool is_blocking_;
246 };
247 
248 const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll";
249 const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity";
250 
251 class Proxy : public grpc::testing::EchoTestService::Service {
252  public:
Proxy(const std::shared_ptr<Channel> & channel)253   explicit Proxy(const std::shared_ptr<Channel>& channel)
254       : stub_(grpc::testing::EchoTestService::NewStub(channel)) {}
255 
Echo(ServerContext * server_context,const EchoRequest * request,EchoResponse * response)256   Status Echo(ServerContext* server_context, const EchoRequest* request,
257               EchoResponse* response) override {
258     std::unique_ptr<ClientContext> client_context =
259         ClientContext::FromServerContext(*server_context);
260     return stub_->Echo(client_context.get(), *request, response);
261   }
262 
263  private:
264   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
265 };
266 
267 class TestServiceImplDupPkg
268     : public grpc::testing::duplicate::EchoTestService::Service {
269  public:
Echo(ServerContext *,const EchoRequest *,EchoResponse * response)270   Status Echo(ServerContext* /*context*/, const EchoRequest* /*request*/,
271               EchoResponse* response) override {
272     response->set_message("no package");
273     return Status::OK;
274   }
275 };
276 
277 class TestScenario {
278  public:
TestScenario(bool use_interceptors,bool use_proxy,bool inproc,const std::string & credentials_type,bool callback_server)279   TestScenario(bool use_interceptors, bool use_proxy, bool inproc,
280                const std::string& credentials_type, bool callback_server)
281       : use_interceptors_(use_interceptors),
282         use_proxy_(use_proxy),
283         inproc_(inproc),
284         credentials_type_(credentials_type),
285         callback_server_(callback_server) {}
286 
use_interceptors() const287   bool use_interceptors() const { return use_interceptors_; }
use_proxy() const288   bool use_proxy() const { return use_proxy_; }
inproc() const289   bool inproc() const { return inproc_; }
credentials_type() const290   const std::string& credentials_type() const { return credentials_type_; }
callback_server() const291   bool callback_server() const { return callback_server_; }
292 
293   std::string AsString() const;
294 
Name(const::testing::TestParamInfo<TestScenario> & info)295   static std::string Name(const ::testing::TestParamInfo<TestScenario>& info) {
296     return info.param.AsString();
297   }
298 
299  private:
300   bool use_interceptors_;
301   bool use_proxy_;
302   bool inproc_;
303   const std::string credentials_type_;
304   bool callback_server_;
305 };
306 
AsString() const307 std::string TestScenario::AsString() const {
308   std::string retval = use_interceptors_ ? "Interceptor" : "";
309   if (use_proxy_) retval += "Proxy";
310   if (inproc_) retval += "Inproc";
311   if (callback_server_) retval += "CallbackServer";
312   if (credentials_type_ == kInsecureCredentialsType) {
313     retval += "Insecure";
314   } else {
315     std::string creds_type = absl::AsciiStrToLower(credentials_type_);
316     if (!creds_type.empty()) creds_type[0] = absl::ascii_toupper(creds_type[0]);
317     retval += creds_type;
318   }
319   return retval;
320 }
321 
322 class End2endTest : public ::testing::TestWithParam<TestScenario> {
323  protected:
SetUpTestSuite()324   static void SetUpTestSuite() { grpc_init(); }
TearDownTestSuite()325   static void TearDownTestSuite() { grpc_shutdown(); }
End2endTest()326   End2endTest()
327       : is_server_started_(false),
328         kMaxMessageSize_(8192),
329         special_service_("special"),
330         first_picked_port_(0) {}
331 
TearDown()332   void TearDown() override {
333     if (is_server_started_) {
334       server_->Shutdown();
335       if (proxy_server_) proxy_server_->Shutdown();
336     }
337     if (first_picked_port_ > 0) {
338       grpc_recycle_unused_port(first_picked_port_);
339     }
340   }
341 
StartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)342   void StartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
343     int port = grpc_pick_unused_port_or_die();
344     first_picked_port_ = port;
345     server_address_ << "localhost:" << port;
346     // Setup server
347     BuildAndStartServer(processor);
348   }
349 
RestartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)350   void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
351     if (is_server_started_) {
352       server_->Shutdown();
353       BuildAndStartServer(processor);
354     }
355   }
356 
BuildAndStartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)357   void BuildAndStartServer(
358       const std::shared_ptr<AuthMetadataProcessor>& processor) {
359     ServerBuilder builder;
360     ConfigureServerBuilder(&builder);
361     auto server_creds = GetCredentialsProvider()->GetServerCredentials(
362         GetParam().credentials_type());
363     if (GetParam().credentials_type() != kInsecureCredentialsType) {
364       server_creds->SetAuthMetadataProcessor(processor);
365     }
366     if (GetParam().use_interceptors()) {
367       std::vector<
368           std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
369           creators;
370       // Add 20 phony server interceptors
371       creators.reserve(20);
372       for (auto i = 0; i < 20; i++) {
373         creators.push_back(std::make_unique<PhonyInterceptorFactory>());
374       }
375       builder.experimental().SetInterceptorCreators(std::move(creators));
376     }
377     builder.AddListeningPort(server_address_.str(), server_creds);
378     if (!GetParam().callback_server()) {
379       builder.RegisterService(&service_);
380     } else {
381       builder.RegisterService(&callback_service_);
382     }
383     builder.RegisterService("foo.test.youtube.com", &special_service_);
384     builder.RegisterService(&dup_pkg_service_);
385 
386     builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
387     builder.SetSyncServerOption(
388         ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
389 
390     server_ = builder.BuildAndStart();
391     is_server_started_ = true;
392   }
393 
ConfigureServerBuilder(ServerBuilder * builder)394   virtual void ConfigureServerBuilder(ServerBuilder* builder) {
395     builder->SetMaxMessageSize(
396         kMaxMessageSize_);  // For testing max message size.
397   }
398 
ResetChannel(std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators={})399   void ResetChannel(
400       std::vector<
401           std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
402           interceptor_creators = {}) {
403     if (!is_server_started_) {
404       StartServer(std::shared_ptr<AuthMetadataProcessor>());
405     }
406     EXPECT_TRUE(is_server_started_);
407     ChannelArguments args;
408     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
409         GetParam().credentials_type(), &args);
410     if (!user_agent_prefix_.empty()) {
411       args.SetUserAgentPrefix(user_agent_prefix_);
412     }
413     args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
414 
415     if (!GetParam().inproc()) {
416       if (!GetParam().use_interceptors()) {
417         channel_ = grpc::CreateCustomChannel(server_address_.str(),
418                                              channel_creds, args);
419       } else {
420         channel_ = CreateCustomChannelWithInterceptors(
421             server_address_.str(), channel_creds, args,
422             interceptor_creators.empty() ? CreatePhonyClientInterceptors()
423                                          : std::move(interceptor_creators));
424       }
425     } else {
426       if (!GetParam().use_interceptors()) {
427         channel_ = server_->InProcessChannel(args);
428       } else {
429         channel_ = server_->experimental().InProcessChannelWithInterceptors(
430             args, interceptor_creators.empty()
431                       ? CreatePhonyClientInterceptors()
432                       : std::move(interceptor_creators));
433       }
434     }
435   }
436 
ResetStub(std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators={})437   void ResetStub(
438       std::vector<
439           std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
440           interceptor_creators = {}) {
441     ResetChannel(std::move(interceptor_creators));
442     if (GetParam().use_proxy()) {
443       proxy_service_ = std::make_unique<Proxy>(channel_);
444       int port = grpc_pick_unused_port_or_die();
445       std::ostringstream proxyaddr;
446       proxyaddr << "localhost:" << port;
447       ServerBuilder builder;
448       builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
449       builder.RegisterService(proxy_service_.get());
450 
451       builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
452       builder.SetSyncServerOption(
453           ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
454 
455       proxy_server_ = builder.BuildAndStart();
456 
457       channel_ =
458           grpc::CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
459     }
460 
461     stub_ = grpc::testing::EchoTestService::NewStub(channel_);
462     PhonyInterceptor::Reset();
463   }
464 
465   bool is_server_started_;
466   std::shared_ptr<Channel> channel_;
467   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
468   std::unique_ptr<Server> server_;
469   std::unique_ptr<Server> proxy_server_;
470   std::unique_ptr<Proxy> proxy_service_;
471   std::ostringstream server_address_;
472   const int kMaxMessageSize_;
473   TestServiceImpl service_;
474   CallbackTestServiceImpl callback_service_;
475   TestServiceImpl special_service_;
476   TestServiceImplDupPkg dup_pkg_service_;
477   std::string user_agent_prefix_;
478   int first_picked_port_;
479 };
480 
SendRpc(grpc::testing::EchoTestService::Stub * stub,int num_rpcs,bool with_binary_metadata)481 void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
482              bool with_binary_metadata) {
483   EchoRequest request;
484   EchoResponse response;
485   request.set_message("Hello hello hello hello");
486 
487   for (int i = 0; i < num_rpcs; ++i) {
488     ClientContext context;
489     if (with_binary_metadata) {
490       char bytes[8] = {'\0', '\1', '\2', '\3',
491                        '\4', '\5', '\6', static_cast<char>(i)};
492       context.AddMetadata("custom-bin", std::string(bytes, 8));
493     }
494     context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
495     Status s = stub->Echo(&context, request, &response);
496     EXPECT_EQ(response.message(), request.message());
497     EXPECT_TRUE(s.ok());
498   }
499 }
500 
501 // This class is for testing scenarios where RPCs are cancelled on the server
502 // by calling ServerContext::TryCancel()
503 class End2endServerTryCancelTest : public End2endTest {
504  protected:
505   // Helper for testing client-streaming RPCs which are cancelled on the server.
506   // Depending on the value of server_try_cancel parameter, this will test one
507   // of the following three scenarios:
508   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
509   //   any messages from the client
510   //
511   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
512   //   messages from the client
513   //
514   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
515   //   the messages from the client
516   //
517   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestRequestStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_msgs_to_send)518   void TestRequestStreamServerCancel(
519       ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) {
520     RestartServer(std::shared_ptr<AuthMetadataProcessor>());
521     ResetStub();
522     EchoRequest request;
523     EchoResponse response;
524     ClientContext context;
525 
526     // Send server_try_cancel value in the client metadata
527     context.AddMetadata(kServerTryCancelRequest,
528                         std::to_string(server_try_cancel));
529 
530     auto stream = stub_->RequestStream(&context, &response);
531 
532     int num_msgs_sent = 0;
533     while (num_msgs_sent < num_msgs_to_send) {
534       request.set_message("hello");
535       if (!stream->Write(request)) {
536         break;
537       }
538       num_msgs_sent++;
539     }
540     gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
541 
542     stream->WritesDone();
543     Status s = stream->Finish();
544 
545     // At this point, we know for sure that RPC was cancelled by the server
546     // since we passed server_try_cancel value in the metadata. Depending on the
547     // value of server_try_cancel, the RPC might have been cancelled by the
548     // server at different stages. The following validates our expectations of
549     // number of messages sent in various cancellation scenarios:
550 
551     switch (server_try_cancel) {
552       case CANCEL_BEFORE_PROCESSING:
553       case CANCEL_DURING_PROCESSING:
554         // If the RPC is cancelled by server before / during messages from the
555         // client, it means that the client most likely did not get a chance to
556         // send all the messages it wanted to send. i.e num_msgs_sent <=
557         // num_msgs_to_send
558         EXPECT_LE(num_msgs_sent, num_msgs_to_send);
559         break;
560 
561       case CANCEL_AFTER_PROCESSING:
562         // If the RPC was cancelled after all messages were read by the server,
563         // the client did get a chance to send all its messages
564         EXPECT_EQ(num_msgs_sent, num_msgs_to_send);
565         break;
566 
567       default:
568         gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
569                 server_try_cancel);
570         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
571                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
572         break;
573     }
574 
575     EXPECT_FALSE(s.ok());
576     EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
577     // Make sure that the server interceptors were notified
578     if (GetParam().use_interceptors()) {
579       EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
580     }
581   }
582 
583   // Helper for testing server-streaming RPCs which are cancelled on the server.
584   // Depending on the value of server_try_cancel parameter, this will test one
585   // of the following three scenarios:
586   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing
587   //   any messages to the client
588   //
589   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing
590   //   messages to the client
591   //
592   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all
593   //   the messages to the client
594   //
595   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestResponseStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel)596   void TestResponseStreamServerCancel(
597       ServerTryCancelRequestPhase server_try_cancel) {
598     RestartServer(std::shared_ptr<AuthMetadataProcessor>());
599     ResetStub();
600     EchoRequest request;
601     EchoResponse response;
602     ClientContext context;
603 
604     // Send server_try_cancel in the client metadata
605     context.AddMetadata(kServerTryCancelRequest,
606                         std::to_string(server_try_cancel));
607 
608     request.set_message("hello");
609     auto stream = stub_->ResponseStream(&context, request);
610 
611     int num_msgs_read = 0;
612     while (num_msgs_read < kServerDefaultResponseStreamsToSend) {
613       if (!stream->Read(&response)) {
614         break;
615       }
616       EXPECT_EQ(response.message(),
617                 request.message() + std::to_string(num_msgs_read));
618       num_msgs_read++;
619     }
620     gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
621 
622     Status s = stream->Finish();
623 
624     // Depending on the value of server_try_cancel, the RPC might have been
625     // cancelled by the server at different stages. The following validates our
626     // expectations of number of messages read in various cancellation
627     // scenarios:
628     switch (server_try_cancel) {
629       case CANCEL_BEFORE_PROCESSING:
630         // Server cancelled before sending any messages. Which means the client
631         // wouldn't have read any
632         EXPECT_EQ(num_msgs_read, 0);
633         break;
634 
635       case CANCEL_DURING_PROCESSING:
636         // Server cancelled while writing messages. Client must have read less
637         // than or equal to the expected number of messages
638         EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
639         break;
640 
641       case CANCEL_AFTER_PROCESSING:
642         // Even though the Server cancelled after writing all messages, the RPC
643         // may be cancelled before the Client got a chance to read all the
644         // messages.
645         EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
646         break;
647 
648       default: {
649         gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
650                 server_try_cancel);
651         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
652                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
653         break;
654       }
655     }
656 
657     EXPECT_FALSE(s.ok());
658     // Make sure that the server interceptors were notified
659     if (GetParam().use_interceptors()) {
660       EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
661     }
662   }
663 
664   // Helper for testing bidirectional-streaming RPCs which are cancelled on the
665   // server. Depending on the value of server_try_cancel parameter, this will
666   // test one of the following three scenarios:
667   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
668   //   writing any messages from/to the client
669   //
670   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/
671   //   writing messages from/to the client
672   //
673   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing
674   //   all the messages from/to the client
675   //
676   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_messages)677   void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,
678                                   int num_messages) {
679     RestartServer(std::shared_ptr<AuthMetadataProcessor>());
680     ResetStub();
681     EchoRequest request;
682     EchoResponse response;
683     ClientContext context;
684 
685     // Send server_try_cancel in the client metadata
686     context.AddMetadata(kServerTryCancelRequest,
687                         std::to_string(server_try_cancel));
688 
689     auto stream = stub_->BidiStream(&context);
690 
691     int num_msgs_read = 0;
692     int num_msgs_sent = 0;
693     while (num_msgs_sent < num_messages) {
694       request.set_message("hello " + std::to_string(num_msgs_sent));
695       if (!stream->Write(request)) {
696         break;
697       }
698       num_msgs_sent++;
699 
700       if (!stream->Read(&response)) {
701         break;
702       }
703       num_msgs_read++;
704 
705       EXPECT_EQ(response.message(), request.message());
706     }
707     gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
708     gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
709 
710     stream->WritesDone();
711     Status s = stream->Finish();
712 
713     // Depending on the value of server_try_cancel, the RPC might have been
714     // cancelled by the server at different stages. The following validates our
715     // expectations of number of messages read in various cancellation
716     // scenarios:
717     switch (server_try_cancel) {
718       case CANCEL_BEFORE_PROCESSING:
719         EXPECT_EQ(num_msgs_read, 0);
720         break;
721 
722       case CANCEL_DURING_PROCESSING:
723         EXPECT_LE(num_msgs_sent, num_messages);
724         EXPECT_LE(num_msgs_read, num_msgs_sent);
725         break;
726 
727       case CANCEL_AFTER_PROCESSING:
728         EXPECT_EQ(num_msgs_sent, num_messages);
729 
730         // The Server cancelled after reading the last message and after writing
731         // the message to the client. However, the RPC cancellation might have
732         // taken effect before the client actually read the response.
733         EXPECT_LE(num_msgs_read, num_msgs_sent);
734         break;
735 
736       default:
737         gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
738                 server_try_cancel);
739         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
740                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
741         break;
742     }
743 
744     EXPECT_FALSE(s.ok());
745     EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
746     // Make sure that the server interceptors were notified
747     if (GetParam().use_interceptors()) {
748       EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
749     }
750   }
751 };
752 
TEST_P(End2endServerTryCancelTest,RequestEchoServerCancel)753 TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) {
754   ResetStub();
755   EchoRequest request;
756   EchoResponse response;
757   ClientContext context;
758 
759   context.AddMetadata(kServerTryCancelRequest,
760                       std::to_string(CANCEL_BEFORE_PROCESSING));
761   Status s = stub_->Echo(&context, request, &response);
762   EXPECT_FALSE(s.ok());
763   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
764 }
765 
766 // Server to cancel before doing reading the request
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelBeforeReads)767 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) {
768   TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1);
769 }
770 
771 // Server to cancel while reading a request from the stream in parallel
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelDuringRead)772 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) {
773   TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
774 }
775 
776 // Server to cancel after reading all the requests but before returning to the
777 // client
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelAfterReads)778 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) {
779   TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4);
780 }
781 
782 // Server to cancel before sending any response messages
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelBefore)783 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) {
784   TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING);
785 }
786 
787 // Server to cancel while writing a response to the stream in parallel
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelDuring)788 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) {
789   TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING);
790 }
791 
792 // Server to cancel after writing all the respones to the stream but before
793 // returning to the client
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelAfter)794 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) {
795   TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING);
796 }
797 
798 // Server to cancel before reading/writing any requests/responses on the stream
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelBefore)799 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) {
800   TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2);
801 }
802 
803 // Server to cancel while reading/writing requests/responses on the stream in
804 // parallel
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelDuring)805 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) {
806   TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
807 }
808 
809 // Server to cancel after reading/writing all requests/responses on the stream
810 // but before returning to the client
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelAfter)811 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) {
812   TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5);
813 }
814 
TEST_P(End2endTest,SimpleRpcWithCustomUserAgentPrefix)815 TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) {
816   // User-Agent is an HTTP header for HTTP transports only
817   if (GetParam().inproc()) {
818     return;
819   }
820   user_agent_prefix_ = "custom_prefix";
821   ResetStub();
822   EchoRequest request;
823   EchoResponse response;
824   request.set_message("Hello hello hello hello");
825   request.mutable_param()->set_echo_metadata(true);
826 
827   ClientContext context;
828   Status s = stub_->Echo(&context, request, &response);
829   EXPECT_EQ(response.message(), request.message());
830   EXPECT_TRUE(s.ok());
831   const auto& trailing_metadata = context.GetServerTrailingMetadata();
832   auto iter = trailing_metadata.find("user-agent");
833   EXPECT_TRUE(iter != trailing_metadata.end());
834   std::string expected_prefix = user_agent_prefix_ + " grpc-c++/";
835   EXPECT_TRUE(iter->second.starts_with(expected_prefix)) << iter->second;
836 }
837 
TEST_P(End2endTest,MultipleRpcsWithVariedBinaryMetadataValue)838 TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
839   ResetStub();
840   std::vector<std::thread> threads;
841   threads.reserve(10);
842   for (int i = 0; i < 10; ++i) {
843     threads.emplace_back(SendRpc, stub_.get(), 10, true);
844   }
845   for (int i = 0; i < 10; ++i) {
846     threads[i].join();
847   }
848 }
849 
TEST_P(End2endTest,MultipleRpcs)850 TEST_P(End2endTest, MultipleRpcs) {
851   ResetStub();
852   std::vector<std::thread> threads;
853   threads.reserve(10);
854   for (int i = 0; i < 10; ++i) {
855     threads.emplace_back(SendRpc, stub_.get(), 10, false);
856   }
857   for (int i = 0; i < 10; ++i) {
858     threads[i].join();
859   }
860 }
861 
TEST_P(End2endTest,ManyStubs)862 TEST_P(End2endTest, ManyStubs) {
863   ResetStub();
864   ChannelTestPeer peer(channel_.get());
865   int registered_calls_pre = peer.registered_calls();
866   for (int i = 0; i < 1000; ++i) {
867     grpc::testing::EchoTestService::NewStub(channel_);
868   }
869   EXPECT_EQ(peer.registered_calls(), registered_calls_pre);
870 }
871 
TEST_P(End2endTest,EmptyBinaryMetadata)872 TEST_P(End2endTest, EmptyBinaryMetadata) {
873   ResetStub();
874   EchoRequest request;
875   EchoResponse response;
876   request.set_message("Hello hello hello hello");
877   ClientContext context;
878   context.AddMetadata("custom-bin", "");
879   Status s = stub_->Echo(&context, request, &response);
880   EXPECT_EQ(response.message(), request.message());
881   EXPECT_TRUE(s.ok());
882 }
883 
TEST_P(End2endTest,AuthoritySeenOnServerSide)884 TEST_P(End2endTest, AuthoritySeenOnServerSide) {
885   ResetStub();
886   EchoRequest request;
887   request.mutable_param()->set_echo_host_from_authority_header(true);
888   EchoResponse response;
889   request.set_message("Live long and prosper.");
890   ClientContext context;
891   Status s = stub_->Echo(&context, request, &response);
892   EXPECT_EQ(response.message(), request.message());
893   if (GetParam().credentials_type() == kTlsCredentialsType) {
894     // SSL creds overrides the authority.
895     EXPECT_EQ("foo.test.google.fr", response.param().host());
896   } else if (GetParam().inproc()) {
897     EXPECT_EQ("inproc", response.param().host());
898   } else {
899     EXPECT_EQ(server_address_.str(), response.param().host());
900   }
901   EXPECT_TRUE(s.ok());
902 }
903 
TEST_P(End2endTest,ReconnectChannel)904 TEST_P(End2endTest, ReconnectChannel) {
905   if (GetParam().inproc()) {
906     return;
907   }
908   int poller_slowdown_factor = 1;
909   // It needs 2 pollset_works to reconnect the channel with polling engine
910   // "poll"
911 #ifdef GRPC_POSIX_SOCKET_EV
912   if (grpc_core::ConfigVars::Get().PollStrategy() == "poll") {
913     poller_slowdown_factor = 2;
914   }
915 #endif  // GRPC_POSIX_SOCKET_EV
916   ResetStub();
917   SendRpc(stub_.get(), 1, false);
918   RestartServer(std::shared_ptr<AuthMetadataProcessor>());
919   // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
920   // reconnect the channel. Make it a factor of 5x
921   gpr_sleep_until(
922       gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
923                    gpr_time_from_millis(kClientChannelBackupPollIntervalMs * 5 *
924                                             poller_slowdown_factor *
925                                             grpc_test_slowdown_factor(),
926                                         GPR_TIMESPAN)));
927   SendRpc(stub_.get(), 1, false);
928 }
929 
TEST_P(End2endTest,RequestStreamOneRequest)930 TEST_P(End2endTest, RequestStreamOneRequest) {
931   ResetStub();
932   EchoRequest request;
933   EchoResponse response;
934   ClientContext context;
935 
936   auto stream = stub_->RequestStream(&context, &response);
937   request.set_message("hello");
938   EXPECT_TRUE(stream->Write(request));
939   stream->WritesDone();
940   Status s = stream->Finish();
941   EXPECT_EQ(response.message(), request.message());
942   EXPECT_TRUE(s.ok());
943   EXPECT_TRUE(context.debug_error_string().empty());
944 }
945 
TEST_P(End2endTest,RequestStreamOneRequestWithCoalescingApi)946 TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) {
947   ResetStub();
948   EchoRequest request;
949   EchoResponse response;
950   ClientContext context;
951 
952   context.set_initial_metadata_corked(true);
953   auto stream = stub_->RequestStream(&context, &response);
954   request.set_message("hello");
955   stream->WriteLast(request, WriteOptions());
956   Status s = stream->Finish();
957   EXPECT_EQ(response.message(), request.message());
958   EXPECT_TRUE(s.ok());
959 }
960 
TEST_P(End2endTest,RequestStreamTwoRequests)961 TEST_P(End2endTest, RequestStreamTwoRequests) {
962   ResetStub();
963   EchoRequest request;
964   EchoResponse response;
965   ClientContext context;
966 
967   auto stream = stub_->RequestStream(&context, &response);
968   request.set_message("hello");
969   EXPECT_TRUE(stream->Write(request));
970   EXPECT_TRUE(stream->Write(request));
971   stream->WritesDone();
972   Status s = stream->Finish();
973   EXPECT_EQ(response.message(), "hellohello");
974   EXPECT_TRUE(s.ok());
975 }
976 
TEST_P(End2endTest,RequestStreamTwoRequestsWithWriteThrough)977 TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) {
978   ResetStub();
979   EchoRequest request;
980   EchoResponse response;
981   ClientContext context;
982 
983   auto stream = stub_->RequestStream(&context, &response);
984   request.set_message("hello");
985   EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
986   EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
987   stream->WritesDone();
988   Status s = stream->Finish();
989   EXPECT_EQ(response.message(), "hellohello");
990   EXPECT_TRUE(s.ok());
991 }
992 
TEST_P(End2endTest,RequestStreamTwoRequestsWithCoalescingApi)993 TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
994   ResetStub();
995   EchoRequest request;
996   EchoResponse response;
997   ClientContext context;
998 
999   context.set_initial_metadata_corked(true);
1000   auto stream = stub_->RequestStream(&context, &response);
1001   request.set_message("hello");
1002   EXPECT_TRUE(stream->Write(request));
1003   stream->WriteLast(request, WriteOptions());
1004   Status s = stream->Finish();
1005   EXPECT_EQ(response.message(), "hellohello");
1006   EXPECT_TRUE(s.ok());
1007 }
1008 
TEST_P(End2endTest,ResponseStream)1009 TEST_P(End2endTest, ResponseStream) {
1010   ResetStub();
1011   EchoRequest request;
1012   EchoResponse response;
1013   ClientContext context;
1014   request.set_message("hello");
1015 
1016   auto stream = stub_->ResponseStream(&context, request);
1017   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
1018     EXPECT_TRUE(stream->Read(&response));
1019     EXPECT_EQ(response.message(), request.message() + std::to_string(i));
1020   }
1021   EXPECT_FALSE(stream->Read(&response));
1022 
1023   Status s = stream->Finish();
1024   EXPECT_TRUE(s.ok());
1025 }
1026 
TEST_P(End2endTest,ResponseStreamWithCoalescingApi)1027 TEST_P(End2endTest, ResponseStreamWithCoalescingApi) {
1028   ResetStub();
1029   EchoRequest request;
1030   EchoResponse response;
1031   ClientContext context;
1032   request.set_message("hello");
1033   context.AddMetadata(kServerUseCoalescingApi, "1");
1034 
1035   auto stream = stub_->ResponseStream(&context, request);
1036   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
1037     EXPECT_TRUE(stream->Read(&response));
1038     EXPECT_EQ(response.message(), request.message() + std::to_string(i));
1039   }
1040   EXPECT_FALSE(stream->Read(&response));
1041 
1042   Status s = stream->Finish();
1043   EXPECT_TRUE(s.ok());
1044 }
1045 
1046 // This was added to prevent regression from issue:
1047 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest,ResponseStreamWithEverythingCoalesced)1048 TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) {
1049   ResetStub();
1050   EchoRequest request;
1051   EchoResponse response;
1052   ClientContext context;
1053   request.set_message("hello");
1054   context.AddMetadata(kServerUseCoalescingApi, "1");
1055   // We will only send one message, forcing everything (init metadata, message,
1056   // trailing) to be coalesced together.
1057   context.AddMetadata(kServerResponseStreamsToSend, "1");
1058 
1059   auto stream = stub_->ResponseStream(&context, request);
1060   EXPECT_TRUE(stream->Read(&response));
1061   EXPECT_EQ(response.message(), request.message() + "0");
1062 
1063   EXPECT_FALSE(stream->Read(&response));
1064 
1065   Status s = stream->Finish();
1066   EXPECT_TRUE(s.ok());
1067 }
1068 
TEST_P(End2endTest,BidiStream)1069 TEST_P(End2endTest, BidiStream) {
1070   ResetStub();
1071   EchoRequest request;
1072   EchoResponse response;
1073   ClientContext context;
1074   std::string msg("hello");
1075 
1076   auto stream = stub_->BidiStream(&context);
1077 
1078   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
1079     request.set_message(msg + std::to_string(i));
1080     EXPECT_TRUE(stream->Write(request));
1081     EXPECT_TRUE(stream->Read(&response));
1082     EXPECT_EQ(response.message(), request.message());
1083   }
1084 
1085   stream->WritesDone();
1086   EXPECT_FALSE(stream->Read(&response));
1087   EXPECT_FALSE(stream->Read(&response));
1088 
1089   Status s = stream->Finish();
1090   EXPECT_TRUE(s.ok());
1091 }
1092 
TEST_P(End2endTest,BidiStreamWithCoalescingApi)1093 TEST_P(End2endTest, BidiStreamWithCoalescingApi) {
1094   ResetStub();
1095   EchoRequest request;
1096   EchoResponse response;
1097   ClientContext context;
1098   context.AddMetadata(kServerFinishAfterNReads, "3");
1099   context.set_initial_metadata_corked(true);
1100   std::string msg("hello");
1101 
1102   auto stream = stub_->BidiStream(&context);
1103 
1104   request.set_message(msg + "0");
1105   EXPECT_TRUE(stream->Write(request));
1106   EXPECT_TRUE(stream->Read(&response));
1107   EXPECT_EQ(response.message(), request.message());
1108 
1109   request.set_message(msg + "1");
1110   EXPECT_TRUE(stream->Write(request));
1111   EXPECT_TRUE(stream->Read(&response));
1112   EXPECT_EQ(response.message(), request.message());
1113 
1114   request.set_message(msg + "2");
1115   stream->WriteLast(request, WriteOptions());
1116   EXPECT_TRUE(stream->Read(&response));
1117   EXPECT_EQ(response.message(), request.message());
1118 
1119   EXPECT_FALSE(stream->Read(&response));
1120   EXPECT_FALSE(stream->Read(&response));
1121 
1122   Status s = stream->Finish();
1123   EXPECT_TRUE(s.ok());
1124 }
1125 
1126 // This was added to prevent regression from issue:
1127 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest,BidiStreamWithEverythingCoalesced)1128 TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) {
1129   ResetStub();
1130   EchoRequest request;
1131   EchoResponse response;
1132   ClientContext context;
1133   context.AddMetadata(kServerFinishAfterNReads, "1");
1134   context.set_initial_metadata_corked(true);
1135   std::string msg("hello");
1136 
1137   auto stream = stub_->BidiStream(&context);
1138 
1139   request.set_message(msg + "0");
1140   stream->WriteLast(request, WriteOptions());
1141   EXPECT_TRUE(stream->Read(&response));
1142   EXPECT_EQ(response.message(), request.message());
1143 
1144   EXPECT_FALSE(stream->Read(&response));
1145   EXPECT_FALSE(stream->Read(&response));
1146 
1147   Status s = stream->Finish();
1148   EXPECT_TRUE(s.ok());
1149 }
1150 
1151 // Talk to the two services with the same name but different package names.
1152 // The two stubs are created on the same channel.
TEST_P(End2endTest,DiffPackageServices)1153 TEST_P(End2endTest, DiffPackageServices) {
1154   ResetStub();
1155   EchoRequest request;
1156   EchoResponse response;
1157   request.set_message("Hello");
1158 
1159   ClientContext context;
1160   Status s = stub_->Echo(&context, request, &response);
1161   EXPECT_EQ(response.message(), request.message());
1162   EXPECT_TRUE(s.ok());
1163 
1164   std::unique_ptr<grpc::testing::duplicate::EchoTestService::Stub> dup_pkg_stub(
1165       grpc::testing::duplicate::EchoTestService::NewStub(channel_));
1166   ClientContext context2;
1167   s = dup_pkg_stub->Echo(&context2, request, &response);
1168   EXPECT_EQ("no package", response.message());
1169   EXPECT_TRUE(s.ok());
1170 }
1171 
1172 template <class ServiceType>
CancelRpc(ClientContext * context,int delay_us,ServiceType * service)1173 void CancelRpc(ClientContext* context, int delay_us, ServiceType* service) {
1174   gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1175                                gpr_time_from_micros(delay_us, GPR_TIMESPAN)));
1176   while (!service->signal_client()) {
1177   }
1178   context->TryCancel();
1179 }
1180 
TEST_P(End2endTest,CancelRpcBeforeStart)1181 TEST_P(End2endTest, CancelRpcBeforeStart) {
1182   ResetStub();
1183   EchoRequest request;
1184   EchoResponse response;
1185   ClientContext context;
1186   request.set_message("hello");
1187   context.TryCancel();
1188   Status s = stub_->Echo(&context, request, &response);
1189   EXPECT_EQ("", response.message());
1190   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1191   if (GetParam().use_interceptors()) {
1192     EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1193   }
1194 }
1195 
TEST_P(End2endTest,CancelRpcAfterStart)1196 TEST_P(End2endTest, CancelRpcAfterStart) {
1197   for (int i = 0; i < 10; i++) {
1198     ResetStub();
1199     EchoRequest request;
1200     EchoResponse response;
1201     ClientContext context;
1202     request.set_message("hello");
1203     request.mutable_param()->set_server_notify_client_when_started(true);
1204     request.mutable_param()->set_skip_cancelled_check(true);
1205     Status s;
1206     std::thread echo_thread([this, &s, &context, &request, &response] {
1207       s = stub_->Echo(&context, request, &response);
1208     });
1209     if (!GetParam().callback_server()) {
1210       EXPECT_EQ(service_.ClientWaitUntilNRpcsStarted(1), 1);
1211     } else {
1212       EXPECT_EQ(callback_service_.ClientWaitUntilNRpcsStarted(1), 1);
1213     }
1214 
1215     context.TryCancel();
1216 
1217     if (!GetParam().callback_server()) {
1218       service_.SignalServerToContinue();
1219     } else {
1220       callback_service_.SignalServerToContinue();
1221     }
1222 
1223     echo_thread.join();
1224     // TODO(ctiller): improve test to not be flaky
1225     //
1226     // TryCancel is best effort, and it can happen that the cancellation is not
1227     // acted upon before the server wakes up, sends a response, and the client
1228     // reads that.
1229     // For this reason, we try a few times here to see the cancellation result.
1230     if (s.ok()) continue;
1231     EXPECT_EQ("", response.message());
1232     EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1233     if (GetParam().use_interceptors()) {
1234       EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1235     }
1236     return;
1237   }
1238   GTEST_FAIL() << "Failed to get cancellation";
1239 }
1240 
1241 // Client cancels request stream after sending two messages
TEST_P(End2endTest,ClientCancelsRequestStream)1242 TEST_P(End2endTest, ClientCancelsRequestStream) {
1243   ResetStub();
1244   EchoRequest request;
1245   EchoResponse response;
1246   ClientContext context;
1247   request.set_message("hello");
1248 
1249   auto stream = stub_->RequestStream(&context, &response);
1250   EXPECT_TRUE(stream->Write(request));
1251   EXPECT_TRUE(stream->Write(request));
1252 
1253   context.TryCancel();
1254 
1255   Status s = stream->Finish();
1256   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1257 
1258   EXPECT_EQ(response.message(), "");
1259   if (GetParam().use_interceptors()) {
1260     EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1261   }
1262 }
1263 
1264 // Client cancels server stream after sending some messages
TEST_P(End2endTest,ClientCancelsResponseStream)1265 TEST_P(End2endTest, ClientCancelsResponseStream) {
1266   ResetStub();
1267   EchoRequest request;
1268   EchoResponse response;
1269   ClientContext context;
1270   request.set_message("hello");
1271 
1272   auto stream = stub_->ResponseStream(&context, request);
1273 
1274   EXPECT_TRUE(stream->Read(&response));
1275   EXPECT_EQ(response.message(), request.message() + "0");
1276   EXPECT_TRUE(stream->Read(&response));
1277   EXPECT_EQ(response.message(), request.message() + "1");
1278 
1279   context.TryCancel();
1280 
1281   // The cancellation races with responses, so there might be zero or
1282   // one responses pending, read till failure
1283 
1284   if (stream->Read(&response)) {
1285     EXPECT_EQ(response.message(), request.message() + "2");
1286     // Since we have cancelled, we expect the next attempt to read to fail
1287     EXPECT_FALSE(stream->Read(&response));
1288   }
1289 
1290   Status s = stream->Finish();
1291   // The final status could be either of CANCELLED or OK depending on
1292   // who won the race.
1293   EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code());
1294   if (GetParam().use_interceptors()) {
1295     EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1296   }
1297 }
1298 
1299 // Client cancels bidi stream after sending some messages
TEST_P(End2endTest,ClientCancelsBidi)1300 TEST_P(End2endTest, ClientCancelsBidi) {
1301   ResetStub();
1302   EchoRequest request;
1303   EchoResponse response;
1304   ClientContext context;
1305   std::string msg("hello");
1306 
1307   // Send server_try_cancel value in the client metadata
1308   context.AddMetadata(kClientTryCancelRequest, std::to_string(1));
1309 
1310   auto stream = stub_->BidiStream(&context);
1311 
1312   request.set_message(msg + "0");
1313   EXPECT_TRUE(stream->Write(request));
1314   EXPECT_TRUE(stream->Read(&response));
1315   EXPECT_EQ(response.message(), request.message());
1316 
1317   request.set_message(msg + "1");
1318   EXPECT_TRUE(stream->Write(request));
1319 
1320   context.TryCancel();
1321 
1322   // The cancellation races with responses, so there might be zero or
1323   // one responses pending, read till failure
1324 
1325   if (stream->Read(&response)) {
1326     EXPECT_EQ(response.message(), request.message());
1327     // Since we have cancelled, we expect the next attempt to read to fail
1328     EXPECT_FALSE(stream->Read(&response));
1329   }
1330 
1331   Status s = stream->Finish();
1332   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1333   if (GetParam().use_interceptors()) {
1334     EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1335   }
1336 }
1337 
TEST_P(End2endTest,RpcMaxMessageSize)1338 TEST_P(End2endTest, RpcMaxMessageSize) {
1339   ResetStub();
1340   EchoRequest request;
1341   EchoResponse response;
1342   request.set_message(string(kMaxMessageSize_ * 2, 'a'));
1343   request.mutable_param()->set_server_die(true);
1344 
1345   ClientContext context;
1346   Status s = stub_->Echo(&context, request, &response);
1347   EXPECT_FALSE(s.ok());
1348 }
1349 
ReaderThreadFunc(ClientReaderWriter<EchoRequest,EchoResponse> * stream,gpr_event * ev)1350 void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
1351                       gpr_event* ev) {
1352   EchoResponse resp;
1353   gpr_event_set(ev, reinterpret_cast<void*>(1));
1354   while (stream->Read(&resp)) {
1355     gpr_log(GPR_INFO, "Read message");
1356   }
1357 }
1358 
1359 // Run a Read and a WritesDone simultaneously.
TEST_P(End2endTest,SimultaneousReadWritesDone)1360 TEST_P(End2endTest, SimultaneousReadWritesDone) {
1361   ResetStub();
1362   ClientContext context;
1363   gpr_event ev;
1364   gpr_event_init(&ev);
1365   auto stream = stub_->BidiStream(&context);
1366   std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev);
1367   gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
1368   stream->WritesDone();
1369   reader_thread.join();
1370   Status s = stream->Finish();
1371   EXPECT_TRUE(s.ok());
1372 }
1373 
TEST_P(End2endTest,ChannelState)1374 TEST_P(End2endTest, ChannelState) {
1375   if (GetParam().inproc()) {
1376     return;
1377   }
1378 
1379   ResetStub();
1380   // Start IDLE
1381   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
1382 
1383   // Did not ask to connect, no state change.
1384   CompletionQueue cq;
1385   std::chrono::system_clock::time_point deadline =
1386       std::chrono::system_clock::now() + std::chrono::milliseconds(10);
1387   channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, nullptr);
1388   void* tag;
1389   bool ok = true;
1390   cq.Next(&tag, &ok);
1391   EXPECT_FALSE(ok);
1392 
1393   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true));
1394   EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE,
1395                                            gpr_inf_future(GPR_CLOCK_REALTIME)));
1396   auto state = channel_->GetState(false);
1397   EXPECT_TRUE(state == GRPC_CHANNEL_CONNECTING || state == GRPC_CHANNEL_READY);
1398 }
1399 
1400 // Takes 10s.
TEST_P(End2endTest,ChannelStateTimeout)1401 TEST_P(End2endTest, ChannelStateTimeout) {
1402   if ((GetParam().credentials_type() != kInsecureCredentialsType) ||
1403       GetParam().inproc()) {
1404     return;
1405   }
1406   int port = grpc_pick_unused_port_or_die();
1407   std::ostringstream server_address;
1408   server_address << "localhost:" << port;
1409   // Channel to non-existing server
1410   auto channel =
1411       grpc::CreateChannel(server_address.str(), InsecureChannelCredentials());
1412   // Start IDLE
1413   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true));
1414 
1415   auto state = GRPC_CHANNEL_IDLE;
1416   for (int i = 0; i < 10; i++) {
1417     channel->WaitForStateChange(
1418         state, std::chrono::system_clock::now() + std::chrono::seconds(1));
1419     state = channel->GetState(false);
1420   }
1421 }
1422 
TEST_P(End2endTest,ChannelStateOnLameChannel)1423 TEST_P(End2endTest, ChannelStateOnLameChannel) {
1424   if ((GetParam().credentials_type() != kInsecureCredentialsType) ||
1425       GetParam().inproc()) {
1426     return;
1427   }
1428   // Channel using invalid target URI.  This creates a lame channel.
1429   auto channel = grpc::CreateChannel("dns:///", InsecureChannelCredentials());
1430   // Channel should immediately report TRANSIENT_FAILURE.
1431   EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel->GetState(true));
1432   // And state will never change.
1433   auto state = GRPC_CHANNEL_TRANSIENT_FAILURE;
1434   for (int i = 0; i < 10; ++i) {
1435     channel->WaitForStateChange(
1436         state, std::chrono::system_clock::now() + std::chrono::seconds(1));
1437     state = channel->GetState(false);
1438   }
1439 }
1440 
1441 // Talking to a non-existing service.
TEST_P(End2endTest,NonExistingService)1442 TEST_P(End2endTest, NonExistingService) {
1443   ResetChannel();
1444   std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1445   stub = grpc::testing::UnimplementedEchoService::NewStub(channel_);
1446 
1447   EchoRequest request;
1448   EchoResponse response;
1449   request.set_message("Hello");
1450 
1451   ClientContext context;
1452   Status s = stub->Unimplemented(&context, request, &response);
1453   EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code());
1454   EXPECT_EQ("", s.error_message());
1455 }
1456 
1457 // Ask the server to send back a serialized proto in trailer.
1458 // This is an example of setting error details.
TEST_P(End2endTest,BinaryTrailerTest)1459 TEST_P(End2endTest, BinaryTrailerTest) {
1460   ResetStub();
1461   EchoRequest request;
1462   EchoResponse response;
1463   ClientContext context;
1464 
1465   request.mutable_param()->set_echo_metadata(true);
1466   DebugInfo* info = request.mutable_param()->mutable_debug_info();
1467   info->add_stack_entries("stack_entry_1");
1468   info->add_stack_entries("stack_entry_2");
1469   info->add_stack_entries("stack_entry_3");
1470   info->set_detail("detailed debug info");
1471   std::string expected_string = info->SerializeAsString();
1472   request.set_message("Hello");
1473 
1474   Status s = stub_->Echo(&context, request, &response);
1475   EXPECT_FALSE(s.ok());
1476   auto trailers = context.GetServerTrailingMetadata();
1477   EXPECT_EQ(1u, trailers.count(kDebugInfoTrailerKey));
1478   auto iter = trailers.find(kDebugInfoTrailerKey);
1479   EXPECT_EQ(expected_string, iter->second);
1480   // Parse the returned trailer into a DebugInfo proto.
1481   DebugInfo returned_info;
1482   EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second)));
1483 }
1484 
TEST_P(End2endTest,ExpectErrorTest)1485 TEST_P(End2endTest, ExpectErrorTest) {
1486   ResetStub();
1487 
1488   std::vector<ErrorStatus> expected_status;
1489   expected_status.emplace_back();
1490   expected_status.back().set_code(13);  // INTERNAL
1491   // No Error message or details
1492 
1493   expected_status.emplace_back();
1494   expected_status.back().set_code(13);  // INTERNAL
1495   expected_status.back().set_error_message("text error message");
1496   expected_status.back().set_binary_error_details("text error details");
1497 
1498   expected_status.emplace_back();
1499   expected_status.back().set_code(13);  // INTERNAL
1500   expected_status.back().set_error_message("text error message");
1501   expected_status.back().set_binary_error_details(
1502       "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB");
1503 
1504   for (auto iter = expected_status.begin(); iter != expected_status.end();
1505        ++iter) {
1506     EchoRequest request;
1507     EchoResponse response;
1508     ClientContext context;
1509     request.set_message("Hello");
1510     auto* error = request.mutable_param()->mutable_expected_error();
1511     error->set_code(iter->code());
1512     error->set_error_message(iter->error_message());
1513     error->set_binary_error_details(iter->binary_error_details());
1514 
1515     Status s = stub_->Echo(&context, request, &response);
1516     EXPECT_FALSE(s.ok());
1517     EXPECT_EQ(iter->code(), s.error_code());
1518     EXPECT_EQ(iter->error_message(), s.error_message());
1519     EXPECT_EQ(iter->binary_error_details(), s.error_details());
1520     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "created"));
1521 #ifndef NDEBUG
1522     // grpc_core::StatusIntProperty::kFileLine is for debug only
1523     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "file"));
1524     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "line"));
1525 #endif
1526     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "status"));
1527     EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "13"));
1528   }
1529 }
1530 
1531 //////////////////////////////////////////////////////////////////////////
1532 // Test with and without a proxy.
1533 class ProxyEnd2endTest : public End2endTest {
1534  protected:
1535 };
1536 
TEST_P(ProxyEnd2endTest,SimpleRpc)1537 TEST_P(ProxyEnd2endTest, SimpleRpc) {
1538   ResetStub();
1539   SendRpc(stub_.get(), 1, false);
1540 }
1541 
TEST_P(ProxyEnd2endTest,SimpleRpcWithEmptyMessages)1542 TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) {
1543   ResetStub();
1544   EchoRequest request;
1545   EchoResponse response;
1546 
1547   ClientContext context;
1548   Status s = stub_->Echo(&context, request, &response);
1549   EXPECT_TRUE(s.ok());
1550 }
1551 
TEST_P(ProxyEnd2endTest,MultipleRpcs)1552 TEST_P(ProxyEnd2endTest, MultipleRpcs) {
1553   ResetStub();
1554   std::vector<std::thread> threads;
1555   threads.reserve(10);
1556   for (int i = 0; i < 10; ++i) {
1557     threads.emplace_back(SendRpc, stub_.get(), 10, false);
1558   }
1559   for (int i = 0; i < 10; ++i) {
1560     threads[i].join();
1561   }
1562 }
1563 
1564 // Set a 10us deadline and make sure proper error is returned.
TEST_P(ProxyEnd2endTest,RpcDeadlineExpires)1565 TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) {
1566   ResetStub();
1567   EchoRequest request;
1568   EchoResponse response;
1569   request.set_message("Hello");
1570   request.mutable_param()->set_skip_cancelled_check(true);
1571   // Let server sleep for 4 secs first to guarantee expiry.
1572   // 4 secs might seem a bit extreme but the timer manager would have been just
1573   // initialized (when ResetStub() was called) and there are some warmup costs
1574   // i.e the timer thread many not have even started. There might also be other
1575   // delays in the timer manager thread (in acquiring locks, timer data
1576   // structure manipulations, starting backup timer threads) that add to the
1577   // delays. 4 secs might be still not enough in some cases but this
1578   // significantly reduces the test flakes
1579   request.mutable_param()->set_server_sleep_us(4 * 1000 * 1000);
1580 
1581   ClientContext context;
1582   std::chrono::system_clock::time_point deadline =
1583       std::chrono::system_clock::now() + std::chrono::milliseconds(1);
1584   context.set_deadline(deadline);
1585   Status s = stub_->Echo(&context, request, &response);
1586   EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code());
1587 }
1588 
1589 // Set a long but finite deadline.
TEST_P(ProxyEnd2endTest,RpcLongDeadline)1590 TEST_P(ProxyEnd2endTest, RpcLongDeadline) {
1591   ResetStub();
1592   EchoRequest request;
1593   EchoResponse response;
1594   request.set_message("Hello");
1595 
1596   ClientContext context;
1597   std::chrono::system_clock::time_point deadline =
1598       std::chrono::system_clock::now() + std::chrono::hours(1);
1599   context.set_deadline(deadline);
1600   Status s = stub_->Echo(&context, request, &response);
1601   EXPECT_EQ(response.message(), request.message());
1602   EXPECT_TRUE(s.ok());
1603 }
1604 
1605 // Ask server to echo back the deadline it sees.
TEST_P(ProxyEnd2endTest,EchoDeadline)1606 TEST_P(ProxyEnd2endTest, EchoDeadline) {
1607   ResetStub();
1608   EchoRequest request;
1609   EchoResponse response;
1610   request.set_message("Hello");
1611   request.mutable_param()->set_echo_deadline(true);
1612 
1613   ClientContext context;
1614   std::chrono::system_clock::time_point deadline =
1615       std::chrono::system_clock::now() + std::chrono::seconds(100);
1616   context.set_deadline(deadline);
1617   Status s = stub_->Echo(&context, request, &response);
1618   EXPECT_EQ(response.message(), request.message());
1619   EXPECT_TRUE(s.ok());
1620   gpr_timespec sent_deadline;
1621   Timepoint2Timespec(deadline, &sent_deadline);
1622   // We want to allow some reasonable error given:
1623   // - request_deadline() only has 1sec resolution so the best we can do is +-1
1624   // - if sent_deadline.tv_nsec is very close to the next second's boundary we
1625   // can end up being off by 2 in one direction.
1626   EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 2);
1627   EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1);
1628 }
1629 
1630 // Ask server to echo back the deadline it sees. The rpc has no deadline.
TEST_P(ProxyEnd2endTest,EchoDeadlineForNoDeadlineRpc)1631 TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) {
1632   ResetStub();
1633   EchoRequest request;
1634   EchoResponse response;
1635   request.set_message("Hello");
1636   request.mutable_param()->set_echo_deadline(true);
1637 
1638   ClientContext context;
1639   Status s = stub_->Echo(&context, request, &response);
1640   EXPECT_EQ(response.message(), request.message());
1641   EXPECT_TRUE(s.ok());
1642   EXPECT_EQ(response.param().request_deadline(),
1643             gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec);
1644 }
1645 
TEST_P(ProxyEnd2endTest,UnimplementedRpc)1646 TEST_P(ProxyEnd2endTest, UnimplementedRpc) {
1647   ResetStub();
1648   EchoRequest request;
1649   EchoResponse response;
1650   request.set_message("Hello");
1651 
1652   ClientContext context;
1653   Status s = stub_->Unimplemented(&context, request, &response);
1654   EXPECT_FALSE(s.ok());
1655   EXPECT_EQ(s.error_code(), grpc::StatusCode::UNIMPLEMENTED);
1656   EXPECT_EQ(s.error_message(), "");
1657   EXPECT_EQ(response.message(), "");
1658 }
1659 
1660 // Client cancels rpc after 10ms
TEST_P(ProxyEnd2endTest,ClientCancelsRpc)1661 TEST_P(ProxyEnd2endTest, ClientCancelsRpc) {
1662   ResetStub();
1663   EchoRequest request;
1664   EchoResponse response;
1665   request.set_message("Hello");
1666   const int kCancelDelayUs = 10 * 1000;
1667   request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs);
1668 
1669   ClientContext context;
1670   std::thread cancel_thread;
1671   if (!GetParam().callback_server()) {
1672     cancel_thread = std::thread(
1673         [&context, this](int delay) { CancelRpc(&context, delay, &service_); },
1674         kCancelDelayUs);
1675     // Note: the unusual pattern above (and below) is caused by a conflict
1676     // between two sets of compiler expectations. clang allows const to be
1677     // captured without mention, so there is no need to capture kCancelDelayUs
1678     // (and indeed clang-tidy complains if you do so). OTOH, a Windows compiler
1679     // in our tests requires an explicit capture even for const. We square this
1680     // circle by passing the const value in as an argument to the lambda.
1681   } else {
1682     cancel_thread = std::thread(
1683         [&context, this](int delay) {
1684           CancelRpc(&context, delay, &callback_service_);
1685         },
1686         kCancelDelayUs);
1687   }
1688   Status s = stub_->Echo(&context, request, &response);
1689   cancel_thread.join();
1690   EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1691   EXPECT_EQ(s.error_message(), "CANCELLED");
1692 }
1693 
1694 // Server cancels rpc after 1ms
TEST_P(ProxyEnd2endTest,ServerCancelsRpc)1695 TEST_P(ProxyEnd2endTest, ServerCancelsRpc) {
1696   ResetStub();
1697   EchoRequest request;
1698   EchoResponse response;
1699   request.set_message("Hello");
1700   request.mutable_param()->set_server_cancel_after_us(1000);
1701 
1702   ClientContext context;
1703   Status s = stub_->Echo(&context, request, &response);
1704   EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1705   EXPECT_TRUE(s.error_message().empty());
1706 }
1707 
1708 // Make the response larger than the flow control window.
TEST_P(ProxyEnd2endTest,HugeResponse)1709 TEST_P(ProxyEnd2endTest, HugeResponse) {
1710   ResetStub();
1711   EchoRequest request;
1712   EchoResponse response;
1713   request.set_message("huge response");
1714   const size_t kResponseSize = 1024 * (1024 + 10);
1715   request.mutable_param()->set_response_message_length(kResponseSize);
1716 
1717   ClientContext context;
1718   std::chrono::system_clock::time_point deadline =
1719       std::chrono::system_clock::now() + std::chrono::seconds(20);
1720   context.set_deadline(deadline);
1721   Status s = stub_->Echo(&context, request, &response);
1722   EXPECT_EQ(kResponseSize, response.message().size());
1723   EXPECT_TRUE(s.ok());
1724 }
1725 
TEST_P(ProxyEnd2endTest,Peer)1726 TEST_P(ProxyEnd2endTest, Peer) {
1727   // Peer is not meaningful for inproc
1728   if (GetParam().inproc()) {
1729     return;
1730   }
1731   ResetStub();
1732   EchoRequest request;
1733   EchoResponse response;
1734   request.set_message("hello");
1735   request.mutable_param()->set_echo_peer(true);
1736 
1737   ClientContext context;
1738   Status s = stub_->Echo(&context, request, &response);
1739   EXPECT_EQ(response.message(), request.message());
1740   EXPECT_TRUE(s.ok());
1741   EXPECT_TRUE(CheckIsLocalhost(response.param().peer()));
1742   EXPECT_TRUE(CheckIsLocalhost(context.peer()));
1743 }
1744 
1745 //////////////////////////////////////////////////////////////////////////
1746 class SecureEnd2endTest : public End2endTest {
1747  protected:
SecureEnd2endTest()1748   SecureEnd2endTest() {
1749     GPR_ASSERT(!GetParam().use_proxy());
1750     GPR_ASSERT(GetParam().credentials_type() != kInsecureCredentialsType);
1751   }
1752 };
1753 
TEST_P(SecureEnd2endTest,SimpleRpcWithHost)1754 TEST_P(SecureEnd2endTest, SimpleRpcWithHost) {
1755   ResetStub();
1756 
1757   EchoRequest request;
1758   EchoResponse response;
1759   request.set_message("Hello");
1760 
1761   ClientContext context;
1762   context.set_authority("foo.test.youtube.com");
1763   Status s = stub_->Echo(&context, request, &response);
1764   EXPECT_EQ(response.message(), request.message());
1765   EXPECT_TRUE(response.has_param());
1766   EXPECT_EQ("special", response.param().host());
1767   EXPECT_TRUE(s.ok());
1768 }
1769 
MetadataContains(const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,const std::string & key,const std::string & value)1770 bool MetadataContains(
1771     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
1772     const std::string& key, const std::string& value) {
1773   int count = 0;
1774 
1775   for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter =
1776            metadata.begin();
1777        iter != metadata.end(); ++iter) {
1778     if (ToString(iter->first) == key && ToString(iter->second) == value) {
1779       count++;
1780     }
1781   }
1782   return count == 1;
1783 }
1784 
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginAndProcessorSuccess)1785 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) {
1786   auto* processor = new TestAuthMetadataProcessor(true);
1787   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1788   ResetStub();
1789   EchoRequest request;
1790   EchoResponse response;
1791   ClientContext context;
1792   context.set_credentials(processor->GetCompatibleClientCreds());
1793   request.set_message("Hello");
1794   request.mutable_param()->set_echo_metadata(true);
1795   request.mutable_param()->set_expected_client_identity(
1796       TestAuthMetadataProcessor::kGoodGuy);
1797   request.mutable_param()->set_expected_transport_security_type(
1798       GetParam().credentials_type());
1799 
1800   Status s = stub_->Echo(&context, request, &response);
1801   EXPECT_EQ(request.message(), response.message());
1802   EXPECT_TRUE(s.ok());
1803 
1804   // Metadata should have been consumed by the processor.
1805   EXPECT_FALSE(MetadataContains(
1806       context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
1807       std::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
1808 }
1809 
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginAndProcessorFailure)1810 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) {
1811   auto* processor = new TestAuthMetadataProcessor(true);
1812   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1813   ResetStub();
1814   EchoRequest request;
1815   EchoResponse response;
1816   ClientContext context;
1817   context.set_credentials(processor->GetIncompatibleClientCreds());
1818   request.set_message("Hello");
1819 
1820   Status s = stub_->Echo(&context, request, &response);
1821   EXPECT_FALSE(s.ok());
1822   EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
1823 }
1824 
TEST_P(SecureEnd2endTest,SetPerCallCredentials)1825 TEST_P(SecureEnd2endTest, SetPerCallCredentials) {
1826   ResetStub();
1827   EchoRequest request;
1828   EchoResponse response;
1829   ClientContext context;
1830   std::shared_ptr<CallCredentials> creds =
1831       GoogleIAMCredentials(kFakeToken, kFakeSelector);
1832   context.set_credentials(creds);
1833   request.set_message("Hello");
1834   request.mutable_param()->set_echo_metadata(true);
1835 
1836   Status s = stub_->Echo(&context, request, &response);
1837   EXPECT_EQ(request.message(), response.message());
1838   EXPECT_TRUE(s.ok());
1839   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1840                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1841                                kFakeToken));
1842   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1843                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1844                                kFakeSelector));
1845   EXPECT_EQ(context.credentials()->DebugString(),
1846             kExpectedFakeCredsDebugString);
1847 }
1848 
1849 class CredentialsInterceptor : public experimental::Interceptor {
1850  public:
CredentialsInterceptor(experimental::ClientRpcInfo * info)1851   explicit CredentialsInterceptor(experimental::ClientRpcInfo* info)
1852       : info_(info) {}
1853 
Intercept(experimental::InterceptorBatchMethods * methods)1854   void Intercept(experimental::InterceptorBatchMethods* methods) override {
1855     if (methods->QueryInterceptionHookPoint(
1856             experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
1857       std::shared_ptr<CallCredentials> creds =
1858           GoogleIAMCredentials(kFakeToken, kFakeSelector);
1859       info_->client_context()->set_credentials(creds);
1860     }
1861     methods->Proceed();
1862   }
1863 
1864  private:
1865   experimental::ClientRpcInfo* info_ = nullptr;
1866 };
1867 
1868 class CredentialsInterceptorFactory
1869     : public experimental::ClientInterceptorFactoryInterface {
CreateClientInterceptor(experimental::ClientRpcInfo * info)1870   CredentialsInterceptor* CreateClientInterceptor(
1871       experimental::ClientRpcInfo* info) override {
1872     return new CredentialsInterceptor(info);
1873   }
1874 };
1875 
TEST_P(SecureEnd2endTest,CallCredentialsInterception)1876 TEST_P(SecureEnd2endTest, CallCredentialsInterception) {
1877   if (!GetParam().use_interceptors()) {
1878     return;
1879   }
1880   std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
1881       interceptor_creators;
1882   interceptor_creators.push_back(
1883       std::make_unique<CredentialsInterceptorFactory>());
1884   ResetStub(std::move(interceptor_creators));
1885   EchoRequest request;
1886   EchoResponse response;
1887   ClientContext context;
1888 
1889   request.set_message("Hello");
1890   request.mutable_param()->set_echo_metadata(true);
1891 
1892   Status s = stub_->Echo(&context, request, &response);
1893   EXPECT_EQ(request.message(), response.message());
1894   EXPECT_TRUE(s.ok());
1895   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1896                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1897                                kFakeToken));
1898   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1899                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1900                                kFakeSelector));
1901   EXPECT_EQ(context.credentials()->DebugString(),
1902             kExpectedFakeCredsDebugString);
1903 }
1904 
TEST_P(SecureEnd2endTest,CallCredentialsInterceptionWithSetCredentials)1905 TEST_P(SecureEnd2endTest, CallCredentialsInterceptionWithSetCredentials) {
1906   if (!GetParam().use_interceptors()) {
1907     return;
1908   }
1909   std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
1910       interceptor_creators;
1911   interceptor_creators.push_back(
1912       std::make_unique<CredentialsInterceptorFactory>());
1913   ResetStub(std::move(interceptor_creators));
1914   EchoRequest request;
1915   EchoResponse response;
1916   ClientContext context;
1917   std::shared_ptr<CallCredentials> creds1 =
1918       GoogleIAMCredentials(kWrongToken, kWrongSelector);
1919   context.set_credentials(creds1);
1920   EXPECT_EQ(context.credentials(), creds1);
1921   EXPECT_EQ(context.credentials()->DebugString(),
1922             kExpectedWrongCredsDebugString);
1923   request.set_message("Hello");
1924   request.mutable_param()->set_echo_metadata(true);
1925 
1926   Status s = stub_->Echo(&context, request, &response);
1927   EXPECT_EQ(request.message(), response.message());
1928   EXPECT_TRUE(s.ok());
1929   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1930                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1931                                kFakeToken));
1932   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1933                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1934                                kFakeSelector));
1935   EXPECT_EQ(context.credentials()->DebugString(),
1936             kExpectedFakeCredsDebugString);
1937 }
1938 
TEST_P(SecureEnd2endTest,OverridePerCallCredentials)1939 TEST_P(SecureEnd2endTest, OverridePerCallCredentials) {
1940   ResetStub();
1941   EchoRequest request;
1942   EchoResponse response;
1943   ClientContext context;
1944   std::shared_ptr<CallCredentials> creds1 =
1945       GoogleIAMCredentials(kFakeToken1, kFakeSelector1);
1946   context.set_credentials(creds1);
1947   EXPECT_EQ(context.credentials(), creds1);
1948   EXPECT_EQ(context.credentials()->DebugString(),
1949             kExpectedFakeCreds1DebugString);
1950   std::shared_ptr<CallCredentials> creds2 =
1951       GoogleIAMCredentials(kFakeToken2, kFakeSelector2);
1952   context.set_credentials(creds2);
1953   EXPECT_EQ(context.credentials(), creds2);
1954   request.set_message("Hello");
1955   request.mutable_param()->set_echo_metadata(true);
1956 
1957   Status s = stub_->Echo(&context, request, &response);
1958   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1959                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1960                                kFakeToken2));
1961   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1962                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1963                                kFakeSelector2));
1964   EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
1965                                 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1966                                 kFakeToken1));
1967   EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
1968                                 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1969                                 kFakeSelector1));
1970   EXPECT_EQ(context.credentials()->DebugString(),
1971             kExpectedFakeCreds2DebugString);
1972   EXPECT_EQ(request.message(), response.message());
1973   EXPECT_TRUE(s.ok());
1974 }
1975 
TEST_P(SecureEnd2endTest,AuthMetadataPluginKeyFailure)1976 TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) {
1977   ResetStub();
1978   EchoRequest request;
1979   EchoResponse response;
1980   ClientContext context;
1981   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
1982       std::unique_ptr<MetadataCredentialsPlugin>(
1983           new TestMetadataCredentialsPlugin(
1984               TestMetadataCredentialsPlugin::kBadMetadataKey,
1985               "Does not matter, will fail the key is invalid.", false, true,
1986               0))));
1987   request.set_message("Hello");
1988 
1989   Status s = stub_->Echo(&context, request, &response);
1990   EXPECT_FALSE(s.ok());
1991   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1992   EXPECT_EQ(context.credentials()->DebugString(),
1993             kExpectedAuthMetadataPluginKeyFailureCredsDebugString);
1994 }
1995 
TEST_P(SecureEnd2endTest,AuthMetadataPluginValueFailure)1996 TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) {
1997   ResetStub();
1998   EchoRequest request;
1999   EchoResponse response;
2000   ClientContext context;
2001   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2002       std::unique_ptr<MetadataCredentialsPlugin>(
2003           new TestMetadataCredentialsPlugin(
2004               TestMetadataCredentialsPlugin::kGoodMetadataKey,
2005               "With illegal \n value.", false, true, 0))));
2006   request.set_message("Hello");
2007 
2008   Status s = stub_->Echo(&context, request, &response);
2009   EXPECT_FALSE(s.ok());
2010   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
2011   EXPECT_EQ(context.credentials()->DebugString(),
2012             kExpectedAuthMetadataPluginValueFailureCredsDebugString);
2013 }
2014 
TEST_P(SecureEnd2endTest,AuthMetadataPluginWithDeadline)2015 TEST_P(SecureEnd2endTest, AuthMetadataPluginWithDeadline) {
2016   ResetStub();
2017   EchoRequest request;
2018   request.mutable_param()->set_skip_cancelled_check(true);
2019   EchoResponse response;
2020   ClientContext context;
2021   const int delay = 100;
2022   std::chrono::system_clock::time_point deadline =
2023       std::chrono::system_clock::now() + std::chrono::milliseconds(delay);
2024   context.set_deadline(deadline);
2025   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2026       std::unique_ptr<MetadataCredentialsPlugin>(
2027           new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true,
2028                                             true, delay))));
2029   request.set_message("Hello");
2030 
2031   Status s = stub_->Echo(&context, request, &response);
2032   if (!s.ok()) {
2033     EXPECT_TRUE(s.error_code() == StatusCode::DEADLINE_EXCEEDED ||
2034                 s.error_code() == StatusCode::UNAVAILABLE);
2035   }
2036   EXPECT_EQ(context.credentials()->DebugString(),
2037             kExpectedAuthMetadataPluginWithDeadlineCredsDebugString);
2038 }
2039 
TEST_P(SecureEnd2endTest,AuthMetadataPluginWithCancel)2040 TEST_P(SecureEnd2endTest, AuthMetadataPluginWithCancel) {
2041   ResetStub();
2042   EchoRequest request;
2043   request.mutable_param()->set_skip_cancelled_check(true);
2044   EchoResponse response;
2045   ClientContext context;
2046   const int delay = 100;
2047   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2048       std::unique_ptr<MetadataCredentialsPlugin>(
2049           new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true,
2050                                             true, delay))));
2051   request.set_message("Hello");
2052 
2053   std::thread cancel_thread([&] {
2054     gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
2055                                  gpr_time_from_millis(delay, GPR_TIMESPAN)));
2056     context.TryCancel();
2057   });
2058   Status s = stub_->Echo(&context, request, &response);
2059   if (!s.ok()) {
2060     EXPECT_TRUE(s.error_code() == StatusCode::CANCELLED ||
2061                 s.error_code() == StatusCode::UNAVAILABLE);
2062   }
2063   cancel_thread.join();
2064   EXPECT_EQ(context.credentials()->DebugString(),
2065             kExpectedAuthMetadataPluginWithDeadlineCredsDebugString);
2066 }
2067 
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginFailure)2068 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) {
2069   ResetStub();
2070   EchoRequest request;
2071   EchoResponse response;
2072   ClientContext context;
2073   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2074       std::unique_ptr<MetadataCredentialsPlugin>(
2075           new TestMetadataCredentialsPlugin(
2076               TestMetadataCredentialsPlugin::kGoodMetadataKey,
2077               "Does not matter, will fail anyway (see 3rd param)", false, false,
2078               0))));
2079   request.set_message("Hello");
2080 
2081   Status s = stub_->Echo(&context, request, &response);
2082   EXPECT_FALSE(s.ok());
2083   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
2084   EXPECT_EQ(s.error_message(),
2085             std::string("Getting metadata from plugin failed with error: ") +
2086                 kTestCredsPluginErrorMsg);
2087   EXPECT_EQ(context.credentials()->DebugString(),
2088             kExpectedNonBlockingAuthMetadataPluginFailureCredsDebugString);
2089 }
2090 
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginAndProcessorSuccess)2091 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
2092   auto* processor = new TestAuthMetadataProcessor(false);
2093   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
2094   ResetStub();
2095   EchoRequest request;
2096   EchoResponse response;
2097   ClientContext context;
2098   context.set_credentials(processor->GetCompatibleClientCreds());
2099   request.set_message("Hello");
2100   request.mutable_param()->set_echo_metadata(true);
2101   request.mutable_param()->set_expected_client_identity(
2102       TestAuthMetadataProcessor::kGoodGuy);
2103   request.mutable_param()->set_expected_transport_security_type(
2104       GetParam().credentials_type());
2105 
2106   Status s = stub_->Echo(&context, request, &response);
2107   EXPECT_EQ(request.message(), response.message());
2108   EXPECT_TRUE(s.ok());
2109 
2110   // Metadata should have been consumed by the processor.
2111   EXPECT_FALSE(MetadataContains(
2112       context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
2113       std::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
2114   EXPECT_EQ(
2115       context.credentials()->DebugString(),
2116       kExpectedNonBlockingAuthMetadataPluginAndProcessorSuccessCredsDebugString);
2117 }
2118 
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginAndProcessorFailure)2119 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) {
2120   auto* processor = new TestAuthMetadataProcessor(false);
2121   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
2122   ResetStub();
2123   EchoRequest request;
2124   EchoResponse response;
2125   ClientContext context;
2126   context.set_credentials(processor->GetIncompatibleClientCreds());
2127   request.set_message("Hello");
2128 
2129   Status s = stub_->Echo(&context, request, &response);
2130   EXPECT_FALSE(s.ok());
2131   EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
2132   EXPECT_EQ(
2133       context.credentials()->DebugString(),
2134       kExpectedNonBlockingAuthMetadataPluginAndProcessorFailureCredsDebugString);
2135 }
2136 
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginFailure)2137 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) {
2138   ResetStub();
2139   EchoRequest request;
2140   EchoResponse response;
2141   ClientContext context;
2142   context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2143       std::unique_ptr<MetadataCredentialsPlugin>(
2144           new TestMetadataCredentialsPlugin(
2145               TestMetadataCredentialsPlugin::kGoodMetadataKey,
2146               "Does not matter, will fail anyway (see 3rd param)", true, false,
2147               0))));
2148   request.set_message("Hello");
2149 
2150   Status s = stub_->Echo(&context, request, &response);
2151   EXPECT_FALSE(s.ok());
2152   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
2153   EXPECT_EQ(s.error_message(),
2154             std::string("Getting metadata from plugin failed with error: ") +
2155                 kTestCredsPluginErrorMsg);
2156   EXPECT_EQ(context.credentials()->DebugString(),
2157             kExpectedBlockingAuthMetadataPluginFailureCredsDebugString);
2158 }
2159 
TEST_P(SecureEnd2endTest,CompositeCallCreds)2160 TEST_P(SecureEnd2endTest, CompositeCallCreds) {
2161   ResetStub();
2162   EchoRequest request;
2163   EchoResponse response;
2164   ClientContext context;
2165   const char kMetadataKey1[] = "call-creds-key1";
2166   const char kMetadataKey2[] = "call-creds-key2";
2167   const char kMetadataVal1[] = "call-creds-val1";
2168   const char kMetadataVal2[] = "call-creds-val2";
2169 
2170   context.set_credentials(grpc::CompositeCallCredentials(
2171       grpc::MetadataCredentialsFromPlugin(
2172           std::unique_ptr<MetadataCredentialsPlugin>(
2173               new TestMetadataCredentialsPlugin(kMetadataKey1, kMetadataVal1,
2174                                                 true, true, 0))),
2175       grpc::MetadataCredentialsFromPlugin(
2176           std::unique_ptr<MetadataCredentialsPlugin>(
2177               new TestMetadataCredentialsPlugin(kMetadataKey2, kMetadataVal2,
2178                                                 true, true, 0)))));
2179   request.set_message("Hello");
2180   request.mutable_param()->set_echo_metadata(true);
2181 
2182   Status s = stub_->Echo(&context, request, &response);
2183   EXPECT_TRUE(s.ok());
2184   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
2185                                kMetadataKey1, kMetadataVal1));
2186   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
2187                                kMetadataKey2, kMetadataVal2));
2188   EXPECT_EQ(context.credentials()->DebugString(),
2189             kExpectedCompositeCallCredsDebugString);
2190 }
2191 
TEST_P(SecureEnd2endTest,ClientAuthContext)2192 TEST_P(SecureEnd2endTest, ClientAuthContext) {
2193   ResetStub();
2194   EchoRequest request;
2195   EchoResponse response;
2196   request.set_message("Hello");
2197   request.mutable_param()->set_check_auth_context(
2198       GetParam().credentials_type() == kTlsCredentialsType);
2199   request.mutable_param()->set_expected_transport_security_type(
2200       GetParam().credentials_type());
2201   ClientContext context;
2202   Status s = stub_->Echo(&context, request, &response);
2203   EXPECT_EQ(response.message(), request.message());
2204   EXPECT_TRUE(s.ok());
2205 
2206   std::shared_ptr<const AuthContext> auth_ctx = context.auth_context();
2207   std::vector<grpc::string_ref> tst =
2208       auth_ctx->FindPropertyValues("transport_security_type");
2209   ASSERT_EQ(1u, tst.size());
2210   EXPECT_EQ(GetParam().credentials_type(), ToString(tst[0]));
2211   if (GetParam().credentials_type() == kTlsCredentialsType) {
2212     EXPECT_EQ("x509_subject_alternative_name",
2213               auth_ctx->GetPeerIdentityPropertyName());
2214     EXPECT_EQ(4u, auth_ctx->GetPeerIdentity().size());
2215     EXPECT_EQ("*.test.google.fr", ToString(auth_ctx->GetPeerIdentity()[0]));
2216     EXPECT_EQ("waterzooi.test.google.be",
2217               ToString(auth_ctx->GetPeerIdentity()[1]));
2218     EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2]));
2219     EXPECT_EQ("192.168.1.3", ToString(auth_ctx->GetPeerIdentity()[3]));
2220   }
2221 }
2222 
2223 class ResourceQuotaEnd2endTest : public End2endTest {
2224  public:
ResourceQuotaEnd2endTest()2225   ResourceQuotaEnd2endTest()
2226       : server_resource_quota_("server_resource_quota") {}
2227 
ConfigureServerBuilder(ServerBuilder * builder)2228   void ConfigureServerBuilder(ServerBuilder* builder) override {
2229     builder->SetResourceQuota(server_resource_quota_);
2230   }
2231 
2232  private:
2233   ResourceQuota server_resource_quota_;
2234 };
2235 
TEST_P(ResourceQuotaEnd2endTest,SimpleRequest)2236 TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) {
2237   ResetStub();
2238 
2239   EchoRequest request;
2240   EchoResponse response;
2241   request.set_message("Hello");
2242 
2243   ClientContext context;
2244   Status s = stub_->Echo(&context, request, &response);
2245   EXPECT_EQ(response.message(), request.message());
2246   EXPECT_TRUE(s.ok());
2247 }
2248 
2249 // TODO(vjpai): refactor arguments into a struct if it makes sense
CreateTestScenarios(bool use_proxy,bool test_insecure,bool test_secure,bool test_inproc,bool test_callback_server)2250 std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
2251                                               bool test_insecure,
2252                                               bool test_secure,
2253                                               bool test_inproc,
2254                                               bool test_callback_server) {
2255   std::vector<TestScenario> scenarios;
2256   std::vector<std::string> credentials_types;
2257 
2258   grpc_core::ConfigVars::Overrides overrides;
2259   overrides.client_channel_backup_poll_interval_ms =
2260       kClientChannelBackupPollIntervalMs;
2261   grpc_core::ConfigVars::SetOverrides(overrides);
2262 #if TARGET_OS_IPHONE
2263   // Workaround Apple CFStream bug
2264   grpc_core::SetEnv("grpc_cfstream", "0");
2265 #endif
2266 
2267   if (test_secure) {
2268     credentials_types =
2269         GetCredentialsProvider()->GetSecureCredentialsTypeList();
2270   }
2271   auto insec_ok = [] {
2272     // Only allow insecure credentials type when it is registered with the
2273     // provider. User may create providers that do not have insecure.
2274     return GetCredentialsProvider()->GetChannelCredentials(
2275                kInsecureCredentialsType, nullptr) != nullptr;
2276   };
2277   if (test_insecure && insec_ok()) {
2278     credentials_types.push_back(kInsecureCredentialsType);
2279   }
2280 
2281   // Test callback with inproc or if the event-engine allows it
2282   GPR_ASSERT(!credentials_types.empty());
2283   for (const auto& cred : credentials_types) {
2284     scenarios.emplace_back(false, false, false, cred, false);
2285     scenarios.emplace_back(true, false, false, cred, false);
2286     if (test_callback_server) {
2287       // Note that these scenarios will be dynamically disabled if the event
2288       // engine doesn't run in the background
2289       scenarios.emplace_back(false, false, false, cred, true);
2290       scenarios.emplace_back(true, false, false, cred, true);
2291     }
2292     if (use_proxy) {
2293       scenarios.emplace_back(false, true, false, cred, false);
2294       scenarios.emplace_back(true, true, false, cred, false);
2295     }
2296   }
2297   if (test_inproc && insec_ok()) {
2298     scenarios.emplace_back(false, false, true, kInsecureCredentialsType, false);
2299     scenarios.emplace_back(true, false, true, kInsecureCredentialsType, false);
2300     if (test_callback_server) {
2301       scenarios.emplace_back(false, false, true, kInsecureCredentialsType,
2302                              true);
2303       scenarios.emplace_back(true, false, true, kInsecureCredentialsType, true);
2304     }
2305   }
2306   return scenarios;
2307 }
2308 
2309 INSTANTIATE_TEST_SUITE_P(
2310     End2end, End2endTest,
2311     ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
2312     &TestScenario::Name);
2313 
2314 INSTANTIATE_TEST_SUITE_P(
2315     End2endServerTryCancel, End2endServerTryCancelTest,
2316     ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
2317     &TestScenario::Name);
2318 
2319 INSTANTIATE_TEST_SUITE_P(
2320     ProxyEnd2end, ProxyEnd2endTest,
2321     ::testing::ValuesIn(CreateTestScenarios(true, true, true, true, true)),
2322     &TestScenario::Name);
2323 
2324 INSTANTIATE_TEST_SUITE_P(
2325     SecureEnd2end, SecureEnd2endTest,
2326     ::testing::ValuesIn(CreateTestScenarios(false, false, true, false, true)),
2327     &TestScenario::Name);
2328 
2329 INSTANTIATE_TEST_SUITE_P(
2330     ResourceQuotaEnd2end, ResourceQuotaEnd2endTest,
2331     ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
2332     &TestScenario::Name);
2333 
2334 }  // namespace
2335 }  // namespace testing
2336 }  // namespace grpc
2337 
main(int argc,char ** argv)2338 int main(int argc, char** argv) {
2339   grpc::testing::TestEnvironment env(&argc, argv);
2340   ::testing::InitGoogleTest(&argc, argv);
2341   int ret = RUN_ALL_TESTS();
2342   return ret;
2343 }
2344