1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <mutex>
20 #include <thread>
21
22 #include "absl/memory/memory.h"
23 #include "absl/strings/ascii.h"
24 #include "absl/strings/match.h"
25 #include "absl/strings/str_format.h"
26
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/create_channel.h>
34 #include <grpcpp/resource_quota.h>
35 #include <grpcpp/security/auth_metadata_processor.h>
36 #include <grpcpp/security/credentials.h>
37 #include <grpcpp/security/server_credentials.h>
38 #include <grpcpp/server.h>
39 #include <grpcpp/server_builder.h>
40 #include <grpcpp/server_context.h>
41 #include <grpcpp/support/string_ref.h>
42 #include <grpcpp/test/channel_test_peer.h>
43
44 #include "src/core/client_channel/backup_poller.h"
45 #include "src/core/lib/config/config_vars.h"
46 #include "src/core/lib/gprpp/crash.h"
47 #include "src/core/lib/gprpp/env.h"
48 #include "src/core/lib/iomgr/iomgr.h"
49 #include "src/core/lib/security/credentials/credentials.h"
50 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
51 #include "src/proto/grpc/testing/echo.grpc.pb.h"
52 #include "test/core/util/port.h"
53 #include "test/core/util/test_config.h"
54 #include "test/cpp/end2end/interceptors_util.h"
55 #include "test/cpp/end2end/test_service_impl.h"
56 #include "test/cpp/util/string_ref_helper.h"
57 #include "test/cpp/util/test_credentials_provider.h"
58
59 #ifdef GRPC_POSIX_SOCKET_EV
60 #include "src/core/lib/iomgr/ev_posix.h"
61 #endif // GRPC_POSIX_SOCKET_EV
62
63 #include <gtest/gtest.h>
64
65 using std::chrono::system_clock;
66
67 namespace grpc {
68 namespace testing {
69 namespace {
70
CheckIsLocalhost(const std::string & addr)71 bool CheckIsLocalhost(const std::string& addr) {
72 const std::string kIpv6("ipv6:%5B::1%5D:");
73 const std::string kIpv4MappedIpv6("ipv6:%5B::ffff:127.0.0.1%5D:");
74 const std::string kIpv4("ipv4:127.0.0.1:");
75 return addr.substr(0, kIpv4.size()) == kIpv4 ||
76 addr.substr(0, kIpv4MappedIpv6.size()) == kIpv4MappedIpv6 ||
77 addr.substr(0, kIpv6.size()) == kIpv6;
78 }
79
80 const int kClientChannelBackupPollIntervalMs = 200;
81
82 const char kTestCredsPluginErrorMsg[] = "Could not find plugin metadata.";
83
84 const char kFakeToken[] = "fake_token";
85 const char kFakeSelector[] = "fake_selector";
86 const char kExpectedFakeCredsDebugString[] =
87 "CallCredentials{GoogleIAMCredentials{Token:present,"
88 "AuthoritySelector:fake_selector}}";
89
90 const char kWrongToken[] = "wrong_token";
91 const char kWrongSelector[] = "wrong_selector";
92 const char kExpectedWrongCredsDebugString[] =
93 "CallCredentials{GoogleIAMCredentials{Token:present,"
94 "AuthoritySelector:wrong_selector}}";
95
96 const char kFakeToken1[] = "fake_token1";
97 const char kFakeSelector1[] = "fake_selector1";
98 const char kExpectedFakeCreds1DebugString[] =
99 "CallCredentials{GoogleIAMCredentials{Token:present,"
100 "AuthoritySelector:fake_selector1}}";
101
102 const char kFakeToken2[] = "fake_token2";
103 const char kFakeSelector2[] = "fake_selector2";
104 const char kExpectedFakeCreds2DebugString[] =
105 "CallCredentials{GoogleIAMCredentials{Token:present,"
106 "AuthoritySelector:fake_selector2}}";
107
108 const char kExpectedAuthMetadataPluginKeyFailureCredsDebugString[] =
109 "CallCredentials{TestMetadataCredentials{key:TestPluginMetadata,"
110 "value:Does not matter, will fail the key is invalid.}}";
111 const char kExpectedAuthMetadataPluginValueFailureCredsDebugString[] =
112 "CallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
113 "value:With illegal \n value.}}";
114 const char kExpectedAuthMetadataPluginWithDeadlineCredsDebugString[] =
115 "CallCredentials{TestMetadataCredentials{key:meta_key,value:Does "
116 "not "
117 "matter}}";
118 const char kExpectedNonBlockingAuthMetadataPluginFailureCredsDebugString[] =
119 "CallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
120 "value:Does not matter, will fail anyway (see 3rd param)}}";
121 const char
122 kExpectedNonBlockingAuthMetadataPluginAndProcessorSuccessCredsDebugString
123 [] = "CallCredentials{TestMetadataCredentials{key:test-plugin-"
124 "metadata,value:Dr Jekyll}}";
125 const char
126 kExpectedNonBlockingAuthMetadataPluginAndProcessorFailureCredsDebugString
127 [] = "CallCredentials{TestMetadataCredentials{key:test-plugin-"
128 "metadata,value:Mr Hyde}}";
129 const char kExpectedBlockingAuthMetadataPluginFailureCredsDebugString[] =
130 "CallCredentials{TestMetadataCredentials{key:test-plugin-metadata,"
131 "value:Does not matter, will fail anyway (see 3rd param)}}";
132 const char kExpectedCompositeCallCredsDebugString[] =
133 "CallCredentials{CompositeCallCredentials{TestMetadataCredentials{"
134 "key:call-creds-key1,value:call-creds-val1},TestMetadataCredentials{key:"
135 "call-creds-key2,value:call-creds-val2}}}";
136
137 class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin {
138 public:
139 static const char kGoodMetadataKey[];
140 static const char kBadMetadataKey[];
141
TestMetadataCredentialsPlugin(const grpc::string_ref & metadata_key,const grpc::string_ref & metadata_value,bool is_blocking,bool is_successful,int delay_ms)142 TestMetadataCredentialsPlugin(const grpc::string_ref& metadata_key,
143 const grpc::string_ref& metadata_value,
144 bool is_blocking, bool is_successful,
145 int delay_ms)
146 : metadata_key_(metadata_key.data(), metadata_key.length()),
147 metadata_value_(metadata_value.data(), metadata_value.length()),
148 is_blocking_(is_blocking),
149 is_successful_(is_successful),
150 delay_ms_(delay_ms) {}
151
IsBlocking() const152 bool IsBlocking() const override { return is_blocking_; }
153
GetMetadata(grpc::string_ref service_url,grpc::string_ref method_name,const grpc::AuthContext & channel_auth_context,std::multimap<std::string,std::string> * metadata)154 Status GetMetadata(
155 grpc::string_ref service_url, grpc::string_ref method_name,
156 const grpc::AuthContext& channel_auth_context,
157 std::multimap<std::string, std::string>* metadata) override {
158 if (delay_ms_ != 0) {
159 gpr_sleep_until(
160 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
161 gpr_time_from_millis(delay_ms_, GPR_TIMESPAN)));
162 }
163 EXPECT_GT(service_url.length(), 0UL);
164 EXPECT_GT(method_name.length(), 0UL);
165 EXPECT_TRUE(channel_auth_context.IsPeerAuthenticated());
166 EXPECT_TRUE(metadata != nullptr);
167 if (is_successful_) {
168 metadata->insert(std::make_pair(metadata_key_, metadata_value_));
169 return Status::OK;
170 } else {
171 return Status(StatusCode::NOT_FOUND, kTestCredsPluginErrorMsg);
172 }
173 }
174
DebugString()175 std::string DebugString() override {
176 return absl::StrFormat("TestMetadataCredentials{key:%s,value:%s}",
177 metadata_key_.c_str(), metadata_value_.c_str());
178 }
179
180 private:
181 std::string metadata_key_;
182 std::string metadata_value_;
183 bool is_blocking_;
184 bool is_successful_;
185 int delay_ms_;
186 };
187
188 const char TestMetadataCredentialsPlugin::kBadMetadataKey[] =
189 "TestPluginMetadata";
190 const char TestMetadataCredentialsPlugin::kGoodMetadataKey[] =
191 "test-plugin-metadata";
192
193 class TestAuthMetadataProcessor : public AuthMetadataProcessor {
194 public:
195 static const char kGoodGuy[];
196
TestAuthMetadataProcessor(bool is_blocking)197 explicit TestAuthMetadataProcessor(bool is_blocking)
198 : is_blocking_(is_blocking) {}
199
GetCompatibleClientCreds()200 std::shared_ptr<CallCredentials> GetCompatibleClientCreds() {
201 return grpc::MetadataCredentialsFromPlugin(
202 std::unique_ptr<MetadataCredentialsPlugin>(
203 new TestMetadataCredentialsPlugin(
204 TestMetadataCredentialsPlugin::kGoodMetadataKey, kGoodGuy,
205 is_blocking_, true, 0)));
206 }
207
GetIncompatibleClientCreds()208 std::shared_ptr<CallCredentials> GetIncompatibleClientCreds() {
209 return grpc::MetadataCredentialsFromPlugin(
210 std::unique_ptr<MetadataCredentialsPlugin>(
211 new TestMetadataCredentialsPlugin(
212 TestMetadataCredentialsPlugin::kGoodMetadataKey, "Mr Hyde",
213 is_blocking_, true, 0)));
214 }
215
216 // Interface implementation
IsBlocking() const217 bool IsBlocking() const override { return is_blocking_; }
218
Process(const InputMetadata & auth_metadata,AuthContext * context,OutputMetadata * consumed_auth_metadata,OutputMetadata * response_metadata)219 Status Process(const InputMetadata& auth_metadata, AuthContext* context,
220 OutputMetadata* consumed_auth_metadata,
221 OutputMetadata* response_metadata) override {
222 EXPECT_TRUE(consumed_auth_metadata != nullptr);
223 EXPECT_TRUE(context != nullptr);
224 EXPECT_TRUE(response_metadata != nullptr);
225 auto auth_md =
226 auth_metadata.find(TestMetadataCredentialsPlugin::kGoodMetadataKey);
227 EXPECT_NE(auth_md, auth_metadata.end());
228 string_ref auth_md_value = auth_md->second;
229 if (auth_md_value == kGoodGuy) {
230 context->AddProperty(kIdentityPropName, kGoodGuy);
231 context->SetPeerIdentityPropertyName(kIdentityPropName);
232 consumed_auth_metadata->insert(std::make_pair(
233 string(auth_md->first.data(), auth_md->first.length()),
234 string(auth_md->second.data(), auth_md->second.length())));
235 return Status::OK;
236 } else {
237 return Status(StatusCode::UNAUTHENTICATED,
238 string("Invalid principal: ") +
239 string(auth_md_value.data(), auth_md_value.length()));
240 }
241 }
242
243 private:
244 static const char kIdentityPropName[];
245 bool is_blocking_;
246 };
247
248 const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll";
249 const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity";
250
251 class Proxy : public grpc::testing::EchoTestService::Service {
252 public:
Proxy(const std::shared_ptr<Channel> & channel)253 explicit Proxy(const std::shared_ptr<Channel>& channel)
254 : stub_(grpc::testing::EchoTestService::NewStub(channel)) {}
255
Echo(ServerContext * server_context,const EchoRequest * request,EchoResponse * response)256 Status Echo(ServerContext* server_context, const EchoRequest* request,
257 EchoResponse* response) override {
258 std::unique_ptr<ClientContext> client_context =
259 ClientContext::FromServerContext(*server_context);
260 return stub_->Echo(client_context.get(), *request, response);
261 }
262
263 private:
264 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
265 };
266
267 class TestServiceImplDupPkg
268 : public grpc::testing::duplicate::EchoTestService::Service {
269 public:
Echo(ServerContext *,const EchoRequest *,EchoResponse * response)270 Status Echo(ServerContext* /*context*/, const EchoRequest* /*request*/,
271 EchoResponse* response) override {
272 response->set_message("no package");
273 return Status::OK;
274 }
275 };
276
277 class TestScenario {
278 public:
TestScenario(bool use_interceptors,bool use_proxy,bool inproc,const std::string & credentials_type,bool callback_server)279 TestScenario(bool use_interceptors, bool use_proxy, bool inproc,
280 const std::string& credentials_type, bool callback_server)
281 : use_interceptors_(use_interceptors),
282 use_proxy_(use_proxy),
283 inproc_(inproc),
284 credentials_type_(credentials_type),
285 callback_server_(callback_server) {}
286
use_interceptors() const287 bool use_interceptors() const { return use_interceptors_; }
use_proxy() const288 bool use_proxy() const { return use_proxy_; }
inproc() const289 bool inproc() const { return inproc_; }
credentials_type() const290 const std::string& credentials_type() const { return credentials_type_; }
callback_server() const291 bool callback_server() const { return callback_server_; }
292
293 std::string AsString() const;
294
Name(const::testing::TestParamInfo<TestScenario> & info)295 static std::string Name(const ::testing::TestParamInfo<TestScenario>& info) {
296 return info.param.AsString();
297 }
298
299 private:
300 bool use_interceptors_;
301 bool use_proxy_;
302 bool inproc_;
303 const std::string credentials_type_;
304 bool callback_server_;
305 };
306
AsString() const307 std::string TestScenario::AsString() const {
308 std::string retval = use_interceptors_ ? "Interceptor" : "";
309 if (use_proxy_) retval += "Proxy";
310 if (inproc_) retval += "Inproc";
311 if (callback_server_) retval += "CallbackServer";
312 if (credentials_type_ == kInsecureCredentialsType) {
313 retval += "Insecure";
314 } else {
315 std::string creds_type = absl::AsciiStrToLower(credentials_type_);
316 if (!creds_type.empty()) creds_type[0] = absl::ascii_toupper(creds_type[0]);
317 retval += creds_type;
318 }
319 return retval;
320 }
321
322 class End2endTest : public ::testing::TestWithParam<TestScenario> {
323 protected:
SetUpTestSuite()324 static void SetUpTestSuite() { grpc_init(); }
TearDownTestSuite()325 static void TearDownTestSuite() { grpc_shutdown(); }
End2endTest()326 End2endTest()
327 : is_server_started_(false),
328 kMaxMessageSize_(8192),
329 special_service_("special"),
330 first_picked_port_(0) {}
331
TearDown()332 void TearDown() override {
333 if (is_server_started_) {
334 server_->Shutdown();
335 if (proxy_server_) proxy_server_->Shutdown();
336 }
337 if (first_picked_port_ > 0) {
338 grpc_recycle_unused_port(first_picked_port_);
339 }
340 }
341
StartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)342 void StartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
343 int port = grpc_pick_unused_port_or_die();
344 first_picked_port_ = port;
345 server_address_ << "localhost:" << port;
346 // Setup server
347 BuildAndStartServer(processor);
348 }
349
RestartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)350 void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
351 if (is_server_started_) {
352 server_->Shutdown();
353 BuildAndStartServer(processor);
354 }
355 }
356
BuildAndStartServer(const std::shared_ptr<AuthMetadataProcessor> & processor)357 void BuildAndStartServer(
358 const std::shared_ptr<AuthMetadataProcessor>& processor) {
359 ServerBuilder builder;
360 ConfigureServerBuilder(&builder);
361 auto server_creds = GetCredentialsProvider()->GetServerCredentials(
362 GetParam().credentials_type());
363 if (GetParam().credentials_type() != kInsecureCredentialsType) {
364 server_creds->SetAuthMetadataProcessor(processor);
365 }
366 if (GetParam().use_interceptors()) {
367 std::vector<
368 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
369 creators;
370 // Add 20 phony server interceptors
371 creators.reserve(20);
372 for (auto i = 0; i < 20; i++) {
373 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
374 }
375 builder.experimental().SetInterceptorCreators(std::move(creators));
376 }
377 builder.AddListeningPort(server_address_.str(), server_creds);
378 if (!GetParam().callback_server()) {
379 builder.RegisterService(&service_);
380 } else {
381 builder.RegisterService(&callback_service_);
382 }
383 builder.RegisterService("foo.test.youtube.com", &special_service_);
384 builder.RegisterService(&dup_pkg_service_);
385
386 builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
387 builder.SetSyncServerOption(
388 ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
389
390 server_ = builder.BuildAndStart();
391 is_server_started_ = true;
392 }
393
ConfigureServerBuilder(ServerBuilder * builder)394 virtual void ConfigureServerBuilder(ServerBuilder* builder) {
395 builder->SetMaxMessageSize(
396 kMaxMessageSize_); // For testing max message size.
397 }
398
ResetChannel(std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators={})399 void ResetChannel(
400 std::vector<
401 std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
402 interceptor_creators = {}) {
403 if (!is_server_started_) {
404 StartServer(std::shared_ptr<AuthMetadataProcessor>());
405 }
406 EXPECT_TRUE(is_server_started_);
407 ChannelArguments args;
408 auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
409 GetParam().credentials_type(), &args);
410 if (!user_agent_prefix_.empty()) {
411 args.SetUserAgentPrefix(user_agent_prefix_);
412 }
413 args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
414
415 if (!GetParam().inproc()) {
416 if (!GetParam().use_interceptors()) {
417 channel_ = grpc::CreateCustomChannel(server_address_.str(),
418 channel_creds, args);
419 } else {
420 channel_ = CreateCustomChannelWithInterceptors(
421 server_address_.str(), channel_creds, args,
422 interceptor_creators.empty() ? CreatePhonyClientInterceptors()
423 : std::move(interceptor_creators));
424 }
425 } else {
426 if (!GetParam().use_interceptors()) {
427 channel_ = server_->InProcessChannel(args);
428 } else {
429 channel_ = server_->experimental().InProcessChannelWithInterceptors(
430 args, interceptor_creators.empty()
431 ? CreatePhonyClientInterceptors()
432 : std::move(interceptor_creators));
433 }
434 }
435 }
436
ResetStub(std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators={})437 void ResetStub(
438 std::vector<
439 std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
440 interceptor_creators = {}) {
441 ResetChannel(std::move(interceptor_creators));
442 if (GetParam().use_proxy()) {
443 proxy_service_ = std::make_unique<Proxy>(channel_);
444 int port = grpc_pick_unused_port_or_die();
445 std::ostringstream proxyaddr;
446 proxyaddr << "localhost:" << port;
447 ServerBuilder builder;
448 builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
449 builder.RegisterService(proxy_service_.get());
450
451 builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
452 builder.SetSyncServerOption(
453 ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
454
455 proxy_server_ = builder.BuildAndStart();
456
457 channel_ =
458 grpc::CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
459 }
460
461 stub_ = grpc::testing::EchoTestService::NewStub(channel_);
462 PhonyInterceptor::Reset();
463 }
464
465 bool is_server_started_;
466 std::shared_ptr<Channel> channel_;
467 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
468 std::unique_ptr<Server> server_;
469 std::unique_ptr<Server> proxy_server_;
470 std::unique_ptr<Proxy> proxy_service_;
471 std::ostringstream server_address_;
472 const int kMaxMessageSize_;
473 TestServiceImpl service_;
474 CallbackTestServiceImpl callback_service_;
475 TestServiceImpl special_service_;
476 TestServiceImplDupPkg dup_pkg_service_;
477 std::string user_agent_prefix_;
478 int first_picked_port_;
479 };
480
SendRpc(grpc::testing::EchoTestService::Stub * stub,int num_rpcs,bool with_binary_metadata)481 void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
482 bool with_binary_metadata) {
483 EchoRequest request;
484 EchoResponse response;
485 request.set_message("Hello hello hello hello");
486
487 for (int i = 0; i < num_rpcs; ++i) {
488 ClientContext context;
489 if (with_binary_metadata) {
490 char bytes[8] = {'\0', '\1', '\2', '\3',
491 '\4', '\5', '\6', static_cast<char>(i)};
492 context.AddMetadata("custom-bin", std::string(bytes, 8));
493 }
494 context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
495 Status s = stub->Echo(&context, request, &response);
496 EXPECT_EQ(response.message(), request.message());
497 EXPECT_TRUE(s.ok());
498 }
499 }
500
501 // This class is for testing scenarios where RPCs are cancelled on the server
502 // by calling ServerContext::TryCancel()
503 class End2endServerTryCancelTest : public End2endTest {
504 protected:
505 // Helper for testing client-streaming RPCs which are cancelled on the server.
506 // Depending on the value of server_try_cancel parameter, this will test one
507 // of the following three scenarios:
508 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
509 // any messages from the client
510 //
511 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
512 // messages from the client
513 //
514 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
515 // the messages from the client
516 //
517 // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestRequestStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_msgs_to_send)518 void TestRequestStreamServerCancel(
519 ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) {
520 RestartServer(std::shared_ptr<AuthMetadataProcessor>());
521 ResetStub();
522 EchoRequest request;
523 EchoResponse response;
524 ClientContext context;
525
526 // Send server_try_cancel value in the client metadata
527 context.AddMetadata(kServerTryCancelRequest,
528 std::to_string(server_try_cancel));
529
530 auto stream = stub_->RequestStream(&context, &response);
531
532 int num_msgs_sent = 0;
533 while (num_msgs_sent < num_msgs_to_send) {
534 request.set_message("hello");
535 if (!stream->Write(request)) {
536 break;
537 }
538 num_msgs_sent++;
539 }
540 gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
541
542 stream->WritesDone();
543 Status s = stream->Finish();
544
545 // At this point, we know for sure that RPC was cancelled by the server
546 // since we passed server_try_cancel value in the metadata. Depending on the
547 // value of server_try_cancel, the RPC might have been cancelled by the
548 // server at different stages. The following validates our expectations of
549 // number of messages sent in various cancellation scenarios:
550
551 switch (server_try_cancel) {
552 case CANCEL_BEFORE_PROCESSING:
553 case CANCEL_DURING_PROCESSING:
554 // If the RPC is cancelled by server before / during messages from the
555 // client, it means that the client most likely did not get a chance to
556 // send all the messages it wanted to send. i.e num_msgs_sent <=
557 // num_msgs_to_send
558 EXPECT_LE(num_msgs_sent, num_msgs_to_send);
559 break;
560
561 case CANCEL_AFTER_PROCESSING:
562 // If the RPC was cancelled after all messages were read by the server,
563 // the client did get a chance to send all its messages
564 EXPECT_EQ(num_msgs_sent, num_msgs_to_send);
565 break;
566
567 default:
568 gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
569 server_try_cancel);
570 EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
571 server_try_cancel <= CANCEL_AFTER_PROCESSING);
572 break;
573 }
574
575 EXPECT_FALSE(s.ok());
576 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
577 // Make sure that the server interceptors were notified
578 if (GetParam().use_interceptors()) {
579 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
580 }
581 }
582
583 // Helper for testing server-streaming RPCs which are cancelled on the server.
584 // Depending on the value of server_try_cancel parameter, this will test one
585 // of the following three scenarios:
586 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing
587 // any messages to the client
588 //
589 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing
590 // messages to the client
591 //
592 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all
593 // the messages to the client
594 //
595 // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestResponseStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel)596 void TestResponseStreamServerCancel(
597 ServerTryCancelRequestPhase server_try_cancel) {
598 RestartServer(std::shared_ptr<AuthMetadataProcessor>());
599 ResetStub();
600 EchoRequest request;
601 EchoResponse response;
602 ClientContext context;
603
604 // Send server_try_cancel in the client metadata
605 context.AddMetadata(kServerTryCancelRequest,
606 std::to_string(server_try_cancel));
607
608 request.set_message("hello");
609 auto stream = stub_->ResponseStream(&context, request);
610
611 int num_msgs_read = 0;
612 while (num_msgs_read < kServerDefaultResponseStreamsToSend) {
613 if (!stream->Read(&response)) {
614 break;
615 }
616 EXPECT_EQ(response.message(),
617 request.message() + std::to_string(num_msgs_read));
618 num_msgs_read++;
619 }
620 gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
621
622 Status s = stream->Finish();
623
624 // Depending on the value of server_try_cancel, the RPC might have been
625 // cancelled by the server at different stages. The following validates our
626 // expectations of number of messages read in various cancellation
627 // scenarios:
628 switch (server_try_cancel) {
629 case CANCEL_BEFORE_PROCESSING:
630 // Server cancelled before sending any messages. Which means the client
631 // wouldn't have read any
632 EXPECT_EQ(num_msgs_read, 0);
633 break;
634
635 case CANCEL_DURING_PROCESSING:
636 // Server cancelled while writing messages. Client must have read less
637 // than or equal to the expected number of messages
638 EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
639 break;
640
641 case CANCEL_AFTER_PROCESSING:
642 // Even though the Server cancelled after writing all messages, the RPC
643 // may be cancelled before the Client got a chance to read all the
644 // messages.
645 EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
646 break;
647
648 default: {
649 gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
650 server_try_cancel);
651 EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
652 server_try_cancel <= CANCEL_AFTER_PROCESSING);
653 break;
654 }
655 }
656
657 EXPECT_FALSE(s.ok());
658 // Make sure that the server interceptors were notified
659 if (GetParam().use_interceptors()) {
660 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
661 }
662 }
663
664 // Helper for testing bidirectional-streaming RPCs which are cancelled on the
665 // server. Depending on the value of server_try_cancel parameter, this will
666 // test one of the following three scenarios:
667 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
668 // writing any messages from/to the client
669 //
670 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/
671 // writing messages from/to the client
672 //
673 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing
674 // all the messages from/to the client
675 //
676 // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,int num_messages)677 void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,
678 int num_messages) {
679 RestartServer(std::shared_ptr<AuthMetadataProcessor>());
680 ResetStub();
681 EchoRequest request;
682 EchoResponse response;
683 ClientContext context;
684
685 // Send server_try_cancel in the client metadata
686 context.AddMetadata(kServerTryCancelRequest,
687 std::to_string(server_try_cancel));
688
689 auto stream = stub_->BidiStream(&context);
690
691 int num_msgs_read = 0;
692 int num_msgs_sent = 0;
693 while (num_msgs_sent < num_messages) {
694 request.set_message("hello " + std::to_string(num_msgs_sent));
695 if (!stream->Write(request)) {
696 break;
697 }
698 num_msgs_sent++;
699
700 if (!stream->Read(&response)) {
701 break;
702 }
703 num_msgs_read++;
704
705 EXPECT_EQ(response.message(), request.message());
706 }
707 gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
708 gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
709
710 stream->WritesDone();
711 Status s = stream->Finish();
712
713 // Depending on the value of server_try_cancel, the RPC might have been
714 // cancelled by the server at different stages. The following validates our
715 // expectations of number of messages read in various cancellation
716 // scenarios:
717 switch (server_try_cancel) {
718 case CANCEL_BEFORE_PROCESSING:
719 EXPECT_EQ(num_msgs_read, 0);
720 break;
721
722 case CANCEL_DURING_PROCESSING:
723 EXPECT_LE(num_msgs_sent, num_messages);
724 EXPECT_LE(num_msgs_read, num_msgs_sent);
725 break;
726
727 case CANCEL_AFTER_PROCESSING:
728 EXPECT_EQ(num_msgs_sent, num_messages);
729
730 // The Server cancelled after reading the last message and after writing
731 // the message to the client. However, the RPC cancellation might have
732 // taken effect before the client actually read the response.
733 EXPECT_LE(num_msgs_read, num_msgs_sent);
734 break;
735
736 default:
737 gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
738 server_try_cancel);
739 EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
740 server_try_cancel <= CANCEL_AFTER_PROCESSING);
741 break;
742 }
743
744 EXPECT_FALSE(s.ok());
745 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
746 // Make sure that the server interceptors were notified
747 if (GetParam().use_interceptors()) {
748 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
749 }
750 }
751 };
752
TEST_P(End2endServerTryCancelTest,RequestEchoServerCancel)753 TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) {
754 ResetStub();
755 EchoRequest request;
756 EchoResponse response;
757 ClientContext context;
758
759 context.AddMetadata(kServerTryCancelRequest,
760 std::to_string(CANCEL_BEFORE_PROCESSING));
761 Status s = stub_->Echo(&context, request, &response);
762 EXPECT_FALSE(s.ok());
763 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
764 }
765
766 // Server to cancel before doing reading the request
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelBeforeReads)767 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) {
768 TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1);
769 }
770
771 // Server to cancel while reading a request from the stream in parallel
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelDuringRead)772 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) {
773 TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
774 }
775
776 // Server to cancel after reading all the requests but before returning to the
777 // client
TEST_P(End2endServerTryCancelTest,RequestStreamServerCancelAfterReads)778 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) {
779 TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4);
780 }
781
782 // Server to cancel before sending any response messages
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelBefore)783 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) {
784 TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING);
785 }
786
787 // Server to cancel while writing a response to the stream in parallel
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelDuring)788 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) {
789 TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING);
790 }
791
792 // Server to cancel after writing all the respones to the stream but before
793 // returning to the client
TEST_P(End2endServerTryCancelTest,ResponseStreamServerCancelAfter)794 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) {
795 TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING);
796 }
797
798 // Server to cancel before reading/writing any requests/responses on the stream
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelBefore)799 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) {
800 TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2);
801 }
802
803 // Server to cancel while reading/writing requests/responses on the stream in
804 // parallel
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelDuring)805 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) {
806 TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
807 }
808
809 // Server to cancel after reading/writing all requests/responses on the stream
810 // but before returning to the client
TEST_P(End2endServerTryCancelTest,BidiStreamServerCancelAfter)811 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) {
812 TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5);
813 }
814
TEST_P(End2endTest,SimpleRpcWithCustomUserAgentPrefix)815 TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) {
816 // User-Agent is an HTTP header for HTTP transports only
817 if (GetParam().inproc()) {
818 return;
819 }
820 user_agent_prefix_ = "custom_prefix";
821 ResetStub();
822 EchoRequest request;
823 EchoResponse response;
824 request.set_message("Hello hello hello hello");
825 request.mutable_param()->set_echo_metadata(true);
826
827 ClientContext context;
828 Status s = stub_->Echo(&context, request, &response);
829 EXPECT_EQ(response.message(), request.message());
830 EXPECT_TRUE(s.ok());
831 const auto& trailing_metadata = context.GetServerTrailingMetadata();
832 auto iter = trailing_metadata.find("user-agent");
833 EXPECT_TRUE(iter != trailing_metadata.end());
834 std::string expected_prefix = user_agent_prefix_ + " grpc-c++/";
835 EXPECT_TRUE(iter->second.starts_with(expected_prefix)) << iter->second;
836 }
837
TEST_P(End2endTest,MultipleRpcsWithVariedBinaryMetadataValue)838 TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
839 ResetStub();
840 std::vector<std::thread> threads;
841 threads.reserve(10);
842 for (int i = 0; i < 10; ++i) {
843 threads.emplace_back(SendRpc, stub_.get(), 10, true);
844 }
845 for (int i = 0; i < 10; ++i) {
846 threads[i].join();
847 }
848 }
849
TEST_P(End2endTest,MultipleRpcs)850 TEST_P(End2endTest, MultipleRpcs) {
851 ResetStub();
852 std::vector<std::thread> threads;
853 threads.reserve(10);
854 for (int i = 0; i < 10; ++i) {
855 threads.emplace_back(SendRpc, stub_.get(), 10, false);
856 }
857 for (int i = 0; i < 10; ++i) {
858 threads[i].join();
859 }
860 }
861
TEST_P(End2endTest,ManyStubs)862 TEST_P(End2endTest, ManyStubs) {
863 ResetStub();
864 ChannelTestPeer peer(channel_.get());
865 int registered_calls_pre = peer.registered_calls();
866 for (int i = 0; i < 1000; ++i) {
867 grpc::testing::EchoTestService::NewStub(channel_);
868 }
869 EXPECT_EQ(peer.registered_calls(), registered_calls_pre);
870 }
871
TEST_P(End2endTest,EmptyBinaryMetadata)872 TEST_P(End2endTest, EmptyBinaryMetadata) {
873 ResetStub();
874 EchoRequest request;
875 EchoResponse response;
876 request.set_message("Hello hello hello hello");
877 ClientContext context;
878 context.AddMetadata("custom-bin", "");
879 Status s = stub_->Echo(&context, request, &response);
880 EXPECT_EQ(response.message(), request.message());
881 EXPECT_TRUE(s.ok());
882 }
883
TEST_P(End2endTest,AuthoritySeenOnServerSide)884 TEST_P(End2endTest, AuthoritySeenOnServerSide) {
885 ResetStub();
886 EchoRequest request;
887 request.mutable_param()->set_echo_host_from_authority_header(true);
888 EchoResponse response;
889 request.set_message("Live long and prosper.");
890 ClientContext context;
891 Status s = stub_->Echo(&context, request, &response);
892 EXPECT_EQ(response.message(), request.message());
893 if (GetParam().credentials_type() == kTlsCredentialsType) {
894 // SSL creds overrides the authority.
895 EXPECT_EQ("foo.test.google.fr", response.param().host());
896 } else if (GetParam().inproc()) {
897 EXPECT_EQ("inproc", response.param().host());
898 } else {
899 EXPECT_EQ(server_address_.str(), response.param().host());
900 }
901 EXPECT_TRUE(s.ok());
902 }
903
TEST_P(End2endTest,ReconnectChannel)904 TEST_P(End2endTest, ReconnectChannel) {
905 if (GetParam().inproc()) {
906 return;
907 }
908 int poller_slowdown_factor = 1;
909 // It needs 2 pollset_works to reconnect the channel with polling engine
910 // "poll"
911 #ifdef GRPC_POSIX_SOCKET_EV
912 if (grpc_core::ConfigVars::Get().PollStrategy() == "poll") {
913 poller_slowdown_factor = 2;
914 }
915 #endif // GRPC_POSIX_SOCKET_EV
916 ResetStub();
917 SendRpc(stub_.get(), 1, false);
918 RestartServer(std::shared_ptr<AuthMetadataProcessor>());
919 // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
920 // reconnect the channel. Make it a factor of 5x
921 gpr_sleep_until(
922 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
923 gpr_time_from_millis(kClientChannelBackupPollIntervalMs * 5 *
924 poller_slowdown_factor *
925 grpc_test_slowdown_factor(),
926 GPR_TIMESPAN)));
927 SendRpc(stub_.get(), 1, false);
928 }
929
TEST_P(End2endTest,RequestStreamOneRequest)930 TEST_P(End2endTest, RequestStreamOneRequest) {
931 ResetStub();
932 EchoRequest request;
933 EchoResponse response;
934 ClientContext context;
935
936 auto stream = stub_->RequestStream(&context, &response);
937 request.set_message("hello");
938 EXPECT_TRUE(stream->Write(request));
939 stream->WritesDone();
940 Status s = stream->Finish();
941 EXPECT_EQ(response.message(), request.message());
942 EXPECT_TRUE(s.ok());
943 EXPECT_TRUE(context.debug_error_string().empty());
944 }
945
TEST_P(End2endTest,RequestStreamOneRequestWithCoalescingApi)946 TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) {
947 ResetStub();
948 EchoRequest request;
949 EchoResponse response;
950 ClientContext context;
951
952 context.set_initial_metadata_corked(true);
953 auto stream = stub_->RequestStream(&context, &response);
954 request.set_message("hello");
955 stream->WriteLast(request, WriteOptions());
956 Status s = stream->Finish();
957 EXPECT_EQ(response.message(), request.message());
958 EXPECT_TRUE(s.ok());
959 }
960
TEST_P(End2endTest,RequestStreamTwoRequests)961 TEST_P(End2endTest, RequestStreamTwoRequests) {
962 ResetStub();
963 EchoRequest request;
964 EchoResponse response;
965 ClientContext context;
966
967 auto stream = stub_->RequestStream(&context, &response);
968 request.set_message("hello");
969 EXPECT_TRUE(stream->Write(request));
970 EXPECT_TRUE(stream->Write(request));
971 stream->WritesDone();
972 Status s = stream->Finish();
973 EXPECT_EQ(response.message(), "hellohello");
974 EXPECT_TRUE(s.ok());
975 }
976
TEST_P(End2endTest,RequestStreamTwoRequestsWithWriteThrough)977 TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) {
978 ResetStub();
979 EchoRequest request;
980 EchoResponse response;
981 ClientContext context;
982
983 auto stream = stub_->RequestStream(&context, &response);
984 request.set_message("hello");
985 EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
986 EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
987 stream->WritesDone();
988 Status s = stream->Finish();
989 EXPECT_EQ(response.message(), "hellohello");
990 EXPECT_TRUE(s.ok());
991 }
992
TEST_P(End2endTest,RequestStreamTwoRequestsWithCoalescingApi)993 TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
994 ResetStub();
995 EchoRequest request;
996 EchoResponse response;
997 ClientContext context;
998
999 context.set_initial_metadata_corked(true);
1000 auto stream = stub_->RequestStream(&context, &response);
1001 request.set_message("hello");
1002 EXPECT_TRUE(stream->Write(request));
1003 stream->WriteLast(request, WriteOptions());
1004 Status s = stream->Finish();
1005 EXPECT_EQ(response.message(), "hellohello");
1006 EXPECT_TRUE(s.ok());
1007 }
1008
TEST_P(End2endTest,ResponseStream)1009 TEST_P(End2endTest, ResponseStream) {
1010 ResetStub();
1011 EchoRequest request;
1012 EchoResponse response;
1013 ClientContext context;
1014 request.set_message("hello");
1015
1016 auto stream = stub_->ResponseStream(&context, request);
1017 for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
1018 EXPECT_TRUE(stream->Read(&response));
1019 EXPECT_EQ(response.message(), request.message() + std::to_string(i));
1020 }
1021 EXPECT_FALSE(stream->Read(&response));
1022
1023 Status s = stream->Finish();
1024 EXPECT_TRUE(s.ok());
1025 }
1026
TEST_P(End2endTest,ResponseStreamWithCoalescingApi)1027 TEST_P(End2endTest, ResponseStreamWithCoalescingApi) {
1028 ResetStub();
1029 EchoRequest request;
1030 EchoResponse response;
1031 ClientContext context;
1032 request.set_message("hello");
1033 context.AddMetadata(kServerUseCoalescingApi, "1");
1034
1035 auto stream = stub_->ResponseStream(&context, request);
1036 for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
1037 EXPECT_TRUE(stream->Read(&response));
1038 EXPECT_EQ(response.message(), request.message() + std::to_string(i));
1039 }
1040 EXPECT_FALSE(stream->Read(&response));
1041
1042 Status s = stream->Finish();
1043 EXPECT_TRUE(s.ok());
1044 }
1045
1046 // This was added to prevent regression from issue:
1047 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest,ResponseStreamWithEverythingCoalesced)1048 TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) {
1049 ResetStub();
1050 EchoRequest request;
1051 EchoResponse response;
1052 ClientContext context;
1053 request.set_message("hello");
1054 context.AddMetadata(kServerUseCoalescingApi, "1");
1055 // We will only send one message, forcing everything (init metadata, message,
1056 // trailing) to be coalesced together.
1057 context.AddMetadata(kServerResponseStreamsToSend, "1");
1058
1059 auto stream = stub_->ResponseStream(&context, request);
1060 EXPECT_TRUE(stream->Read(&response));
1061 EXPECT_EQ(response.message(), request.message() + "0");
1062
1063 EXPECT_FALSE(stream->Read(&response));
1064
1065 Status s = stream->Finish();
1066 EXPECT_TRUE(s.ok());
1067 }
1068
TEST_P(End2endTest,BidiStream)1069 TEST_P(End2endTest, BidiStream) {
1070 ResetStub();
1071 EchoRequest request;
1072 EchoResponse response;
1073 ClientContext context;
1074 std::string msg("hello");
1075
1076 auto stream = stub_->BidiStream(&context);
1077
1078 for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
1079 request.set_message(msg + std::to_string(i));
1080 EXPECT_TRUE(stream->Write(request));
1081 EXPECT_TRUE(stream->Read(&response));
1082 EXPECT_EQ(response.message(), request.message());
1083 }
1084
1085 stream->WritesDone();
1086 EXPECT_FALSE(stream->Read(&response));
1087 EXPECT_FALSE(stream->Read(&response));
1088
1089 Status s = stream->Finish();
1090 EXPECT_TRUE(s.ok());
1091 }
1092
TEST_P(End2endTest,BidiStreamWithCoalescingApi)1093 TEST_P(End2endTest, BidiStreamWithCoalescingApi) {
1094 ResetStub();
1095 EchoRequest request;
1096 EchoResponse response;
1097 ClientContext context;
1098 context.AddMetadata(kServerFinishAfterNReads, "3");
1099 context.set_initial_metadata_corked(true);
1100 std::string msg("hello");
1101
1102 auto stream = stub_->BidiStream(&context);
1103
1104 request.set_message(msg + "0");
1105 EXPECT_TRUE(stream->Write(request));
1106 EXPECT_TRUE(stream->Read(&response));
1107 EXPECT_EQ(response.message(), request.message());
1108
1109 request.set_message(msg + "1");
1110 EXPECT_TRUE(stream->Write(request));
1111 EXPECT_TRUE(stream->Read(&response));
1112 EXPECT_EQ(response.message(), request.message());
1113
1114 request.set_message(msg + "2");
1115 stream->WriteLast(request, WriteOptions());
1116 EXPECT_TRUE(stream->Read(&response));
1117 EXPECT_EQ(response.message(), request.message());
1118
1119 EXPECT_FALSE(stream->Read(&response));
1120 EXPECT_FALSE(stream->Read(&response));
1121
1122 Status s = stream->Finish();
1123 EXPECT_TRUE(s.ok());
1124 }
1125
1126 // This was added to prevent regression from issue:
1127 // https://github.com/grpc/grpc/issues/11546
TEST_P(End2endTest,BidiStreamWithEverythingCoalesced)1128 TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) {
1129 ResetStub();
1130 EchoRequest request;
1131 EchoResponse response;
1132 ClientContext context;
1133 context.AddMetadata(kServerFinishAfterNReads, "1");
1134 context.set_initial_metadata_corked(true);
1135 std::string msg("hello");
1136
1137 auto stream = stub_->BidiStream(&context);
1138
1139 request.set_message(msg + "0");
1140 stream->WriteLast(request, WriteOptions());
1141 EXPECT_TRUE(stream->Read(&response));
1142 EXPECT_EQ(response.message(), request.message());
1143
1144 EXPECT_FALSE(stream->Read(&response));
1145 EXPECT_FALSE(stream->Read(&response));
1146
1147 Status s = stream->Finish();
1148 EXPECT_TRUE(s.ok());
1149 }
1150
1151 // Talk to the two services with the same name but different package names.
1152 // The two stubs are created on the same channel.
TEST_P(End2endTest,DiffPackageServices)1153 TEST_P(End2endTest, DiffPackageServices) {
1154 ResetStub();
1155 EchoRequest request;
1156 EchoResponse response;
1157 request.set_message("Hello");
1158
1159 ClientContext context;
1160 Status s = stub_->Echo(&context, request, &response);
1161 EXPECT_EQ(response.message(), request.message());
1162 EXPECT_TRUE(s.ok());
1163
1164 std::unique_ptr<grpc::testing::duplicate::EchoTestService::Stub> dup_pkg_stub(
1165 grpc::testing::duplicate::EchoTestService::NewStub(channel_));
1166 ClientContext context2;
1167 s = dup_pkg_stub->Echo(&context2, request, &response);
1168 EXPECT_EQ("no package", response.message());
1169 EXPECT_TRUE(s.ok());
1170 }
1171
1172 template <class ServiceType>
CancelRpc(ClientContext * context,int delay_us,ServiceType * service)1173 void CancelRpc(ClientContext* context, int delay_us, ServiceType* service) {
1174 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1175 gpr_time_from_micros(delay_us, GPR_TIMESPAN)));
1176 while (!service->signal_client()) {
1177 }
1178 context->TryCancel();
1179 }
1180
TEST_P(End2endTest,CancelRpcBeforeStart)1181 TEST_P(End2endTest, CancelRpcBeforeStart) {
1182 ResetStub();
1183 EchoRequest request;
1184 EchoResponse response;
1185 ClientContext context;
1186 request.set_message("hello");
1187 context.TryCancel();
1188 Status s = stub_->Echo(&context, request, &response);
1189 EXPECT_EQ("", response.message());
1190 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1191 if (GetParam().use_interceptors()) {
1192 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1193 }
1194 }
1195
TEST_P(End2endTest,CancelRpcAfterStart)1196 TEST_P(End2endTest, CancelRpcAfterStart) {
1197 for (int i = 0; i < 10; i++) {
1198 ResetStub();
1199 EchoRequest request;
1200 EchoResponse response;
1201 ClientContext context;
1202 request.set_message("hello");
1203 request.mutable_param()->set_server_notify_client_when_started(true);
1204 request.mutable_param()->set_skip_cancelled_check(true);
1205 Status s;
1206 std::thread echo_thread([this, &s, &context, &request, &response] {
1207 s = stub_->Echo(&context, request, &response);
1208 });
1209 if (!GetParam().callback_server()) {
1210 EXPECT_EQ(service_.ClientWaitUntilNRpcsStarted(1), 1);
1211 } else {
1212 EXPECT_EQ(callback_service_.ClientWaitUntilNRpcsStarted(1), 1);
1213 }
1214
1215 context.TryCancel();
1216
1217 if (!GetParam().callback_server()) {
1218 service_.SignalServerToContinue();
1219 } else {
1220 callback_service_.SignalServerToContinue();
1221 }
1222
1223 echo_thread.join();
1224 // TODO(ctiller): improve test to not be flaky
1225 //
1226 // TryCancel is best effort, and it can happen that the cancellation is not
1227 // acted upon before the server wakes up, sends a response, and the client
1228 // reads that.
1229 // For this reason, we try a few times here to see the cancellation result.
1230 if (s.ok()) continue;
1231 EXPECT_EQ("", response.message());
1232 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1233 if (GetParam().use_interceptors()) {
1234 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1235 }
1236 return;
1237 }
1238 GTEST_FAIL() << "Failed to get cancellation";
1239 }
1240
1241 // Client cancels request stream after sending two messages
TEST_P(End2endTest,ClientCancelsRequestStream)1242 TEST_P(End2endTest, ClientCancelsRequestStream) {
1243 ResetStub();
1244 EchoRequest request;
1245 EchoResponse response;
1246 ClientContext context;
1247 request.set_message("hello");
1248
1249 auto stream = stub_->RequestStream(&context, &response);
1250 EXPECT_TRUE(stream->Write(request));
1251 EXPECT_TRUE(stream->Write(request));
1252
1253 context.TryCancel();
1254
1255 Status s = stream->Finish();
1256 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1257
1258 EXPECT_EQ(response.message(), "");
1259 if (GetParam().use_interceptors()) {
1260 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1261 }
1262 }
1263
1264 // Client cancels server stream after sending some messages
TEST_P(End2endTest,ClientCancelsResponseStream)1265 TEST_P(End2endTest, ClientCancelsResponseStream) {
1266 ResetStub();
1267 EchoRequest request;
1268 EchoResponse response;
1269 ClientContext context;
1270 request.set_message("hello");
1271
1272 auto stream = stub_->ResponseStream(&context, request);
1273
1274 EXPECT_TRUE(stream->Read(&response));
1275 EXPECT_EQ(response.message(), request.message() + "0");
1276 EXPECT_TRUE(stream->Read(&response));
1277 EXPECT_EQ(response.message(), request.message() + "1");
1278
1279 context.TryCancel();
1280
1281 // The cancellation races with responses, so there might be zero or
1282 // one responses pending, read till failure
1283
1284 if (stream->Read(&response)) {
1285 EXPECT_EQ(response.message(), request.message() + "2");
1286 // Since we have cancelled, we expect the next attempt to read to fail
1287 EXPECT_FALSE(stream->Read(&response));
1288 }
1289
1290 Status s = stream->Finish();
1291 // The final status could be either of CANCELLED or OK depending on
1292 // who won the race.
1293 EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code());
1294 if (GetParam().use_interceptors()) {
1295 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1296 }
1297 }
1298
1299 // Client cancels bidi stream after sending some messages
TEST_P(End2endTest,ClientCancelsBidi)1300 TEST_P(End2endTest, ClientCancelsBidi) {
1301 ResetStub();
1302 EchoRequest request;
1303 EchoResponse response;
1304 ClientContext context;
1305 std::string msg("hello");
1306
1307 // Send server_try_cancel value in the client metadata
1308 context.AddMetadata(kClientTryCancelRequest, std::to_string(1));
1309
1310 auto stream = stub_->BidiStream(&context);
1311
1312 request.set_message(msg + "0");
1313 EXPECT_TRUE(stream->Write(request));
1314 EXPECT_TRUE(stream->Read(&response));
1315 EXPECT_EQ(response.message(), request.message());
1316
1317 request.set_message(msg + "1");
1318 EXPECT_TRUE(stream->Write(request));
1319
1320 context.TryCancel();
1321
1322 // The cancellation races with responses, so there might be zero or
1323 // one responses pending, read till failure
1324
1325 if (stream->Read(&response)) {
1326 EXPECT_EQ(response.message(), request.message());
1327 // Since we have cancelled, we expect the next attempt to read to fail
1328 EXPECT_FALSE(stream->Read(&response));
1329 }
1330
1331 Status s = stream->Finish();
1332 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1333 if (GetParam().use_interceptors()) {
1334 EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
1335 }
1336 }
1337
TEST_P(End2endTest,RpcMaxMessageSize)1338 TEST_P(End2endTest, RpcMaxMessageSize) {
1339 ResetStub();
1340 EchoRequest request;
1341 EchoResponse response;
1342 request.set_message(string(kMaxMessageSize_ * 2, 'a'));
1343 request.mutable_param()->set_server_die(true);
1344
1345 ClientContext context;
1346 Status s = stub_->Echo(&context, request, &response);
1347 EXPECT_FALSE(s.ok());
1348 }
1349
ReaderThreadFunc(ClientReaderWriter<EchoRequest,EchoResponse> * stream,gpr_event * ev)1350 void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
1351 gpr_event* ev) {
1352 EchoResponse resp;
1353 gpr_event_set(ev, reinterpret_cast<void*>(1));
1354 while (stream->Read(&resp)) {
1355 gpr_log(GPR_INFO, "Read message");
1356 }
1357 }
1358
1359 // Run a Read and a WritesDone simultaneously.
TEST_P(End2endTest,SimultaneousReadWritesDone)1360 TEST_P(End2endTest, SimultaneousReadWritesDone) {
1361 ResetStub();
1362 ClientContext context;
1363 gpr_event ev;
1364 gpr_event_init(&ev);
1365 auto stream = stub_->BidiStream(&context);
1366 std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev);
1367 gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
1368 stream->WritesDone();
1369 reader_thread.join();
1370 Status s = stream->Finish();
1371 EXPECT_TRUE(s.ok());
1372 }
1373
TEST_P(End2endTest,ChannelState)1374 TEST_P(End2endTest, ChannelState) {
1375 if (GetParam().inproc()) {
1376 return;
1377 }
1378
1379 ResetStub();
1380 // Start IDLE
1381 EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
1382
1383 // Did not ask to connect, no state change.
1384 CompletionQueue cq;
1385 std::chrono::system_clock::time_point deadline =
1386 std::chrono::system_clock::now() + std::chrono::milliseconds(10);
1387 channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, nullptr);
1388 void* tag;
1389 bool ok = true;
1390 cq.Next(&tag, &ok);
1391 EXPECT_FALSE(ok);
1392
1393 EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true));
1394 EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE,
1395 gpr_inf_future(GPR_CLOCK_REALTIME)));
1396 auto state = channel_->GetState(false);
1397 EXPECT_TRUE(state == GRPC_CHANNEL_CONNECTING || state == GRPC_CHANNEL_READY);
1398 }
1399
1400 // Takes 10s.
TEST_P(End2endTest,ChannelStateTimeout)1401 TEST_P(End2endTest, ChannelStateTimeout) {
1402 if ((GetParam().credentials_type() != kInsecureCredentialsType) ||
1403 GetParam().inproc()) {
1404 return;
1405 }
1406 int port = grpc_pick_unused_port_or_die();
1407 std::ostringstream server_address;
1408 server_address << "localhost:" << port;
1409 // Channel to non-existing server
1410 auto channel =
1411 grpc::CreateChannel(server_address.str(), InsecureChannelCredentials());
1412 // Start IDLE
1413 EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true));
1414
1415 auto state = GRPC_CHANNEL_IDLE;
1416 for (int i = 0; i < 10; i++) {
1417 channel->WaitForStateChange(
1418 state, std::chrono::system_clock::now() + std::chrono::seconds(1));
1419 state = channel->GetState(false);
1420 }
1421 }
1422
TEST_P(End2endTest,ChannelStateOnLameChannel)1423 TEST_P(End2endTest, ChannelStateOnLameChannel) {
1424 if ((GetParam().credentials_type() != kInsecureCredentialsType) ||
1425 GetParam().inproc()) {
1426 return;
1427 }
1428 // Channel using invalid target URI. This creates a lame channel.
1429 auto channel = grpc::CreateChannel("dns:///", InsecureChannelCredentials());
1430 // Channel should immediately report TRANSIENT_FAILURE.
1431 EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel->GetState(true));
1432 // And state will never change.
1433 auto state = GRPC_CHANNEL_TRANSIENT_FAILURE;
1434 for (int i = 0; i < 10; ++i) {
1435 channel->WaitForStateChange(
1436 state, std::chrono::system_clock::now() + std::chrono::seconds(1));
1437 state = channel->GetState(false);
1438 }
1439 }
1440
1441 // Talking to a non-existing service.
TEST_P(End2endTest,NonExistingService)1442 TEST_P(End2endTest, NonExistingService) {
1443 ResetChannel();
1444 std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1445 stub = grpc::testing::UnimplementedEchoService::NewStub(channel_);
1446
1447 EchoRequest request;
1448 EchoResponse response;
1449 request.set_message("Hello");
1450
1451 ClientContext context;
1452 Status s = stub->Unimplemented(&context, request, &response);
1453 EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code());
1454 EXPECT_EQ("", s.error_message());
1455 }
1456
1457 // Ask the server to send back a serialized proto in trailer.
1458 // This is an example of setting error details.
TEST_P(End2endTest,BinaryTrailerTest)1459 TEST_P(End2endTest, BinaryTrailerTest) {
1460 ResetStub();
1461 EchoRequest request;
1462 EchoResponse response;
1463 ClientContext context;
1464
1465 request.mutable_param()->set_echo_metadata(true);
1466 DebugInfo* info = request.mutable_param()->mutable_debug_info();
1467 info->add_stack_entries("stack_entry_1");
1468 info->add_stack_entries("stack_entry_2");
1469 info->add_stack_entries("stack_entry_3");
1470 info->set_detail("detailed debug info");
1471 std::string expected_string = info->SerializeAsString();
1472 request.set_message("Hello");
1473
1474 Status s = stub_->Echo(&context, request, &response);
1475 EXPECT_FALSE(s.ok());
1476 auto trailers = context.GetServerTrailingMetadata();
1477 EXPECT_EQ(1u, trailers.count(kDebugInfoTrailerKey));
1478 auto iter = trailers.find(kDebugInfoTrailerKey);
1479 EXPECT_EQ(expected_string, iter->second);
1480 // Parse the returned trailer into a DebugInfo proto.
1481 DebugInfo returned_info;
1482 EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second)));
1483 }
1484
TEST_P(End2endTest,ExpectErrorTest)1485 TEST_P(End2endTest, ExpectErrorTest) {
1486 ResetStub();
1487
1488 std::vector<ErrorStatus> expected_status;
1489 expected_status.emplace_back();
1490 expected_status.back().set_code(13); // INTERNAL
1491 // No Error message or details
1492
1493 expected_status.emplace_back();
1494 expected_status.back().set_code(13); // INTERNAL
1495 expected_status.back().set_error_message("text error message");
1496 expected_status.back().set_binary_error_details("text error details");
1497
1498 expected_status.emplace_back();
1499 expected_status.back().set_code(13); // INTERNAL
1500 expected_status.back().set_error_message("text error message");
1501 expected_status.back().set_binary_error_details(
1502 "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB");
1503
1504 for (auto iter = expected_status.begin(); iter != expected_status.end();
1505 ++iter) {
1506 EchoRequest request;
1507 EchoResponse response;
1508 ClientContext context;
1509 request.set_message("Hello");
1510 auto* error = request.mutable_param()->mutable_expected_error();
1511 error->set_code(iter->code());
1512 error->set_error_message(iter->error_message());
1513 error->set_binary_error_details(iter->binary_error_details());
1514
1515 Status s = stub_->Echo(&context, request, &response);
1516 EXPECT_FALSE(s.ok());
1517 EXPECT_EQ(iter->code(), s.error_code());
1518 EXPECT_EQ(iter->error_message(), s.error_message());
1519 EXPECT_EQ(iter->binary_error_details(), s.error_details());
1520 EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "created"));
1521 #ifndef NDEBUG
1522 // grpc_core::StatusIntProperty::kFileLine is for debug only
1523 EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "file"));
1524 EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "line"));
1525 #endif
1526 EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "status"));
1527 EXPECT_TRUE(absl::StrContains(context.debug_error_string(), "13"));
1528 }
1529 }
1530
1531 //////////////////////////////////////////////////////////////////////////
1532 // Test with and without a proxy.
1533 class ProxyEnd2endTest : public End2endTest {
1534 protected:
1535 };
1536
TEST_P(ProxyEnd2endTest,SimpleRpc)1537 TEST_P(ProxyEnd2endTest, SimpleRpc) {
1538 ResetStub();
1539 SendRpc(stub_.get(), 1, false);
1540 }
1541
TEST_P(ProxyEnd2endTest,SimpleRpcWithEmptyMessages)1542 TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) {
1543 ResetStub();
1544 EchoRequest request;
1545 EchoResponse response;
1546
1547 ClientContext context;
1548 Status s = stub_->Echo(&context, request, &response);
1549 EXPECT_TRUE(s.ok());
1550 }
1551
TEST_P(ProxyEnd2endTest,MultipleRpcs)1552 TEST_P(ProxyEnd2endTest, MultipleRpcs) {
1553 ResetStub();
1554 std::vector<std::thread> threads;
1555 threads.reserve(10);
1556 for (int i = 0; i < 10; ++i) {
1557 threads.emplace_back(SendRpc, stub_.get(), 10, false);
1558 }
1559 for (int i = 0; i < 10; ++i) {
1560 threads[i].join();
1561 }
1562 }
1563
1564 // Set a 10us deadline and make sure proper error is returned.
TEST_P(ProxyEnd2endTest,RpcDeadlineExpires)1565 TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) {
1566 ResetStub();
1567 EchoRequest request;
1568 EchoResponse response;
1569 request.set_message("Hello");
1570 request.mutable_param()->set_skip_cancelled_check(true);
1571 // Let server sleep for 4 secs first to guarantee expiry.
1572 // 4 secs might seem a bit extreme but the timer manager would have been just
1573 // initialized (when ResetStub() was called) and there are some warmup costs
1574 // i.e the timer thread many not have even started. There might also be other
1575 // delays in the timer manager thread (in acquiring locks, timer data
1576 // structure manipulations, starting backup timer threads) that add to the
1577 // delays. 4 secs might be still not enough in some cases but this
1578 // significantly reduces the test flakes
1579 request.mutable_param()->set_server_sleep_us(4 * 1000 * 1000);
1580
1581 ClientContext context;
1582 std::chrono::system_clock::time_point deadline =
1583 std::chrono::system_clock::now() + std::chrono::milliseconds(1);
1584 context.set_deadline(deadline);
1585 Status s = stub_->Echo(&context, request, &response);
1586 EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code());
1587 }
1588
1589 // Set a long but finite deadline.
TEST_P(ProxyEnd2endTest,RpcLongDeadline)1590 TEST_P(ProxyEnd2endTest, RpcLongDeadline) {
1591 ResetStub();
1592 EchoRequest request;
1593 EchoResponse response;
1594 request.set_message("Hello");
1595
1596 ClientContext context;
1597 std::chrono::system_clock::time_point deadline =
1598 std::chrono::system_clock::now() + std::chrono::hours(1);
1599 context.set_deadline(deadline);
1600 Status s = stub_->Echo(&context, request, &response);
1601 EXPECT_EQ(response.message(), request.message());
1602 EXPECT_TRUE(s.ok());
1603 }
1604
1605 // Ask server to echo back the deadline it sees.
TEST_P(ProxyEnd2endTest,EchoDeadline)1606 TEST_P(ProxyEnd2endTest, EchoDeadline) {
1607 ResetStub();
1608 EchoRequest request;
1609 EchoResponse response;
1610 request.set_message("Hello");
1611 request.mutable_param()->set_echo_deadline(true);
1612
1613 ClientContext context;
1614 std::chrono::system_clock::time_point deadline =
1615 std::chrono::system_clock::now() + std::chrono::seconds(100);
1616 context.set_deadline(deadline);
1617 Status s = stub_->Echo(&context, request, &response);
1618 EXPECT_EQ(response.message(), request.message());
1619 EXPECT_TRUE(s.ok());
1620 gpr_timespec sent_deadline;
1621 Timepoint2Timespec(deadline, &sent_deadline);
1622 // We want to allow some reasonable error given:
1623 // - request_deadline() only has 1sec resolution so the best we can do is +-1
1624 // - if sent_deadline.tv_nsec is very close to the next second's boundary we
1625 // can end up being off by 2 in one direction.
1626 EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 2);
1627 EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1);
1628 }
1629
1630 // Ask server to echo back the deadline it sees. The rpc has no deadline.
TEST_P(ProxyEnd2endTest,EchoDeadlineForNoDeadlineRpc)1631 TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) {
1632 ResetStub();
1633 EchoRequest request;
1634 EchoResponse response;
1635 request.set_message("Hello");
1636 request.mutable_param()->set_echo_deadline(true);
1637
1638 ClientContext context;
1639 Status s = stub_->Echo(&context, request, &response);
1640 EXPECT_EQ(response.message(), request.message());
1641 EXPECT_TRUE(s.ok());
1642 EXPECT_EQ(response.param().request_deadline(),
1643 gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec);
1644 }
1645
TEST_P(ProxyEnd2endTest,UnimplementedRpc)1646 TEST_P(ProxyEnd2endTest, UnimplementedRpc) {
1647 ResetStub();
1648 EchoRequest request;
1649 EchoResponse response;
1650 request.set_message("Hello");
1651
1652 ClientContext context;
1653 Status s = stub_->Unimplemented(&context, request, &response);
1654 EXPECT_FALSE(s.ok());
1655 EXPECT_EQ(s.error_code(), grpc::StatusCode::UNIMPLEMENTED);
1656 EXPECT_EQ(s.error_message(), "");
1657 EXPECT_EQ(response.message(), "");
1658 }
1659
1660 // Client cancels rpc after 10ms
TEST_P(ProxyEnd2endTest,ClientCancelsRpc)1661 TEST_P(ProxyEnd2endTest, ClientCancelsRpc) {
1662 ResetStub();
1663 EchoRequest request;
1664 EchoResponse response;
1665 request.set_message("Hello");
1666 const int kCancelDelayUs = 10 * 1000;
1667 request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs);
1668
1669 ClientContext context;
1670 std::thread cancel_thread;
1671 if (!GetParam().callback_server()) {
1672 cancel_thread = std::thread(
1673 [&context, this](int delay) { CancelRpc(&context, delay, &service_); },
1674 kCancelDelayUs);
1675 // Note: the unusual pattern above (and below) is caused by a conflict
1676 // between two sets of compiler expectations. clang allows const to be
1677 // captured without mention, so there is no need to capture kCancelDelayUs
1678 // (and indeed clang-tidy complains if you do so). OTOH, a Windows compiler
1679 // in our tests requires an explicit capture even for const. We square this
1680 // circle by passing the const value in as an argument to the lambda.
1681 } else {
1682 cancel_thread = std::thread(
1683 [&context, this](int delay) {
1684 CancelRpc(&context, delay, &callback_service_);
1685 },
1686 kCancelDelayUs);
1687 }
1688 Status s = stub_->Echo(&context, request, &response);
1689 cancel_thread.join();
1690 EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1691 EXPECT_EQ(s.error_message(), "CANCELLED");
1692 }
1693
1694 // Server cancels rpc after 1ms
TEST_P(ProxyEnd2endTest,ServerCancelsRpc)1695 TEST_P(ProxyEnd2endTest, ServerCancelsRpc) {
1696 ResetStub();
1697 EchoRequest request;
1698 EchoResponse response;
1699 request.set_message("Hello");
1700 request.mutable_param()->set_server_cancel_after_us(1000);
1701
1702 ClientContext context;
1703 Status s = stub_->Echo(&context, request, &response);
1704 EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1705 EXPECT_TRUE(s.error_message().empty());
1706 }
1707
1708 // Make the response larger than the flow control window.
TEST_P(ProxyEnd2endTest,HugeResponse)1709 TEST_P(ProxyEnd2endTest, HugeResponse) {
1710 ResetStub();
1711 EchoRequest request;
1712 EchoResponse response;
1713 request.set_message("huge response");
1714 const size_t kResponseSize = 1024 * (1024 + 10);
1715 request.mutable_param()->set_response_message_length(kResponseSize);
1716
1717 ClientContext context;
1718 std::chrono::system_clock::time_point deadline =
1719 std::chrono::system_clock::now() + std::chrono::seconds(20);
1720 context.set_deadline(deadline);
1721 Status s = stub_->Echo(&context, request, &response);
1722 EXPECT_EQ(kResponseSize, response.message().size());
1723 EXPECT_TRUE(s.ok());
1724 }
1725
TEST_P(ProxyEnd2endTest,Peer)1726 TEST_P(ProxyEnd2endTest, Peer) {
1727 // Peer is not meaningful for inproc
1728 if (GetParam().inproc()) {
1729 return;
1730 }
1731 ResetStub();
1732 EchoRequest request;
1733 EchoResponse response;
1734 request.set_message("hello");
1735 request.mutable_param()->set_echo_peer(true);
1736
1737 ClientContext context;
1738 Status s = stub_->Echo(&context, request, &response);
1739 EXPECT_EQ(response.message(), request.message());
1740 EXPECT_TRUE(s.ok());
1741 EXPECT_TRUE(CheckIsLocalhost(response.param().peer()));
1742 EXPECT_TRUE(CheckIsLocalhost(context.peer()));
1743 }
1744
1745 //////////////////////////////////////////////////////////////////////////
1746 class SecureEnd2endTest : public End2endTest {
1747 protected:
SecureEnd2endTest()1748 SecureEnd2endTest() {
1749 GPR_ASSERT(!GetParam().use_proxy());
1750 GPR_ASSERT(GetParam().credentials_type() != kInsecureCredentialsType);
1751 }
1752 };
1753
TEST_P(SecureEnd2endTest,SimpleRpcWithHost)1754 TEST_P(SecureEnd2endTest, SimpleRpcWithHost) {
1755 ResetStub();
1756
1757 EchoRequest request;
1758 EchoResponse response;
1759 request.set_message("Hello");
1760
1761 ClientContext context;
1762 context.set_authority("foo.test.youtube.com");
1763 Status s = stub_->Echo(&context, request, &response);
1764 EXPECT_EQ(response.message(), request.message());
1765 EXPECT_TRUE(response.has_param());
1766 EXPECT_EQ("special", response.param().host());
1767 EXPECT_TRUE(s.ok());
1768 }
1769
MetadataContains(const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,const std::string & key,const std::string & value)1770 bool MetadataContains(
1771 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
1772 const std::string& key, const std::string& value) {
1773 int count = 0;
1774
1775 for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter =
1776 metadata.begin();
1777 iter != metadata.end(); ++iter) {
1778 if (ToString(iter->first) == key && ToString(iter->second) == value) {
1779 count++;
1780 }
1781 }
1782 return count == 1;
1783 }
1784
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginAndProcessorSuccess)1785 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) {
1786 auto* processor = new TestAuthMetadataProcessor(true);
1787 StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1788 ResetStub();
1789 EchoRequest request;
1790 EchoResponse response;
1791 ClientContext context;
1792 context.set_credentials(processor->GetCompatibleClientCreds());
1793 request.set_message("Hello");
1794 request.mutable_param()->set_echo_metadata(true);
1795 request.mutable_param()->set_expected_client_identity(
1796 TestAuthMetadataProcessor::kGoodGuy);
1797 request.mutable_param()->set_expected_transport_security_type(
1798 GetParam().credentials_type());
1799
1800 Status s = stub_->Echo(&context, request, &response);
1801 EXPECT_EQ(request.message(), response.message());
1802 EXPECT_TRUE(s.ok());
1803
1804 // Metadata should have been consumed by the processor.
1805 EXPECT_FALSE(MetadataContains(
1806 context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
1807 std::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
1808 }
1809
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginAndProcessorFailure)1810 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) {
1811 auto* processor = new TestAuthMetadataProcessor(true);
1812 StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1813 ResetStub();
1814 EchoRequest request;
1815 EchoResponse response;
1816 ClientContext context;
1817 context.set_credentials(processor->GetIncompatibleClientCreds());
1818 request.set_message("Hello");
1819
1820 Status s = stub_->Echo(&context, request, &response);
1821 EXPECT_FALSE(s.ok());
1822 EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
1823 }
1824
TEST_P(SecureEnd2endTest,SetPerCallCredentials)1825 TEST_P(SecureEnd2endTest, SetPerCallCredentials) {
1826 ResetStub();
1827 EchoRequest request;
1828 EchoResponse response;
1829 ClientContext context;
1830 std::shared_ptr<CallCredentials> creds =
1831 GoogleIAMCredentials(kFakeToken, kFakeSelector);
1832 context.set_credentials(creds);
1833 request.set_message("Hello");
1834 request.mutable_param()->set_echo_metadata(true);
1835
1836 Status s = stub_->Echo(&context, request, &response);
1837 EXPECT_EQ(request.message(), response.message());
1838 EXPECT_TRUE(s.ok());
1839 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1840 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1841 kFakeToken));
1842 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1843 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1844 kFakeSelector));
1845 EXPECT_EQ(context.credentials()->DebugString(),
1846 kExpectedFakeCredsDebugString);
1847 }
1848
1849 class CredentialsInterceptor : public experimental::Interceptor {
1850 public:
CredentialsInterceptor(experimental::ClientRpcInfo * info)1851 explicit CredentialsInterceptor(experimental::ClientRpcInfo* info)
1852 : info_(info) {}
1853
Intercept(experimental::InterceptorBatchMethods * methods)1854 void Intercept(experimental::InterceptorBatchMethods* methods) override {
1855 if (methods->QueryInterceptionHookPoint(
1856 experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
1857 std::shared_ptr<CallCredentials> creds =
1858 GoogleIAMCredentials(kFakeToken, kFakeSelector);
1859 info_->client_context()->set_credentials(creds);
1860 }
1861 methods->Proceed();
1862 }
1863
1864 private:
1865 experimental::ClientRpcInfo* info_ = nullptr;
1866 };
1867
1868 class CredentialsInterceptorFactory
1869 : public experimental::ClientInterceptorFactoryInterface {
CreateClientInterceptor(experimental::ClientRpcInfo * info)1870 CredentialsInterceptor* CreateClientInterceptor(
1871 experimental::ClientRpcInfo* info) override {
1872 return new CredentialsInterceptor(info);
1873 }
1874 };
1875
TEST_P(SecureEnd2endTest,CallCredentialsInterception)1876 TEST_P(SecureEnd2endTest, CallCredentialsInterception) {
1877 if (!GetParam().use_interceptors()) {
1878 return;
1879 }
1880 std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
1881 interceptor_creators;
1882 interceptor_creators.push_back(
1883 std::make_unique<CredentialsInterceptorFactory>());
1884 ResetStub(std::move(interceptor_creators));
1885 EchoRequest request;
1886 EchoResponse response;
1887 ClientContext context;
1888
1889 request.set_message("Hello");
1890 request.mutable_param()->set_echo_metadata(true);
1891
1892 Status s = stub_->Echo(&context, request, &response);
1893 EXPECT_EQ(request.message(), response.message());
1894 EXPECT_TRUE(s.ok());
1895 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1896 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1897 kFakeToken));
1898 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1899 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1900 kFakeSelector));
1901 EXPECT_EQ(context.credentials()->DebugString(),
1902 kExpectedFakeCredsDebugString);
1903 }
1904
TEST_P(SecureEnd2endTest,CallCredentialsInterceptionWithSetCredentials)1905 TEST_P(SecureEnd2endTest, CallCredentialsInterceptionWithSetCredentials) {
1906 if (!GetParam().use_interceptors()) {
1907 return;
1908 }
1909 std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
1910 interceptor_creators;
1911 interceptor_creators.push_back(
1912 std::make_unique<CredentialsInterceptorFactory>());
1913 ResetStub(std::move(interceptor_creators));
1914 EchoRequest request;
1915 EchoResponse response;
1916 ClientContext context;
1917 std::shared_ptr<CallCredentials> creds1 =
1918 GoogleIAMCredentials(kWrongToken, kWrongSelector);
1919 context.set_credentials(creds1);
1920 EXPECT_EQ(context.credentials(), creds1);
1921 EXPECT_EQ(context.credentials()->DebugString(),
1922 kExpectedWrongCredsDebugString);
1923 request.set_message("Hello");
1924 request.mutable_param()->set_echo_metadata(true);
1925
1926 Status s = stub_->Echo(&context, request, &response);
1927 EXPECT_EQ(request.message(), response.message());
1928 EXPECT_TRUE(s.ok());
1929 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1930 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1931 kFakeToken));
1932 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1933 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1934 kFakeSelector));
1935 EXPECT_EQ(context.credentials()->DebugString(),
1936 kExpectedFakeCredsDebugString);
1937 }
1938
TEST_P(SecureEnd2endTest,OverridePerCallCredentials)1939 TEST_P(SecureEnd2endTest, OverridePerCallCredentials) {
1940 ResetStub();
1941 EchoRequest request;
1942 EchoResponse response;
1943 ClientContext context;
1944 std::shared_ptr<CallCredentials> creds1 =
1945 GoogleIAMCredentials(kFakeToken1, kFakeSelector1);
1946 context.set_credentials(creds1);
1947 EXPECT_EQ(context.credentials(), creds1);
1948 EXPECT_EQ(context.credentials()->DebugString(),
1949 kExpectedFakeCreds1DebugString);
1950 std::shared_ptr<CallCredentials> creds2 =
1951 GoogleIAMCredentials(kFakeToken2, kFakeSelector2);
1952 context.set_credentials(creds2);
1953 EXPECT_EQ(context.credentials(), creds2);
1954 request.set_message("Hello");
1955 request.mutable_param()->set_echo_metadata(true);
1956
1957 Status s = stub_->Echo(&context, request, &response);
1958 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1959 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1960 kFakeToken2));
1961 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1962 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1963 kFakeSelector2));
1964 EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
1965 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1966 kFakeToken1));
1967 EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
1968 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1969 kFakeSelector1));
1970 EXPECT_EQ(context.credentials()->DebugString(),
1971 kExpectedFakeCreds2DebugString);
1972 EXPECT_EQ(request.message(), response.message());
1973 EXPECT_TRUE(s.ok());
1974 }
1975
TEST_P(SecureEnd2endTest,AuthMetadataPluginKeyFailure)1976 TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) {
1977 ResetStub();
1978 EchoRequest request;
1979 EchoResponse response;
1980 ClientContext context;
1981 context.set_credentials(grpc::MetadataCredentialsFromPlugin(
1982 std::unique_ptr<MetadataCredentialsPlugin>(
1983 new TestMetadataCredentialsPlugin(
1984 TestMetadataCredentialsPlugin::kBadMetadataKey,
1985 "Does not matter, will fail the key is invalid.", false, true,
1986 0))));
1987 request.set_message("Hello");
1988
1989 Status s = stub_->Echo(&context, request, &response);
1990 EXPECT_FALSE(s.ok());
1991 EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1992 EXPECT_EQ(context.credentials()->DebugString(),
1993 kExpectedAuthMetadataPluginKeyFailureCredsDebugString);
1994 }
1995
TEST_P(SecureEnd2endTest,AuthMetadataPluginValueFailure)1996 TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) {
1997 ResetStub();
1998 EchoRequest request;
1999 EchoResponse response;
2000 ClientContext context;
2001 context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2002 std::unique_ptr<MetadataCredentialsPlugin>(
2003 new TestMetadataCredentialsPlugin(
2004 TestMetadataCredentialsPlugin::kGoodMetadataKey,
2005 "With illegal \n value.", false, true, 0))));
2006 request.set_message("Hello");
2007
2008 Status s = stub_->Echo(&context, request, &response);
2009 EXPECT_FALSE(s.ok());
2010 EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
2011 EXPECT_EQ(context.credentials()->DebugString(),
2012 kExpectedAuthMetadataPluginValueFailureCredsDebugString);
2013 }
2014
TEST_P(SecureEnd2endTest,AuthMetadataPluginWithDeadline)2015 TEST_P(SecureEnd2endTest, AuthMetadataPluginWithDeadline) {
2016 ResetStub();
2017 EchoRequest request;
2018 request.mutable_param()->set_skip_cancelled_check(true);
2019 EchoResponse response;
2020 ClientContext context;
2021 const int delay = 100;
2022 std::chrono::system_clock::time_point deadline =
2023 std::chrono::system_clock::now() + std::chrono::milliseconds(delay);
2024 context.set_deadline(deadline);
2025 context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2026 std::unique_ptr<MetadataCredentialsPlugin>(
2027 new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true,
2028 true, delay))));
2029 request.set_message("Hello");
2030
2031 Status s = stub_->Echo(&context, request, &response);
2032 if (!s.ok()) {
2033 EXPECT_TRUE(s.error_code() == StatusCode::DEADLINE_EXCEEDED ||
2034 s.error_code() == StatusCode::UNAVAILABLE);
2035 }
2036 EXPECT_EQ(context.credentials()->DebugString(),
2037 kExpectedAuthMetadataPluginWithDeadlineCredsDebugString);
2038 }
2039
TEST_P(SecureEnd2endTest,AuthMetadataPluginWithCancel)2040 TEST_P(SecureEnd2endTest, AuthMetadataPluginWithCancel) {
2041 ResetStub();
2042 EchoRequest request;
2043 request.mutable_param()->set_skip_cancelled_check(true);
2044 EchoResponse response;
2045 ClientContext context;
2046 const int delay = 100;
2047 context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2048 std::unique_ptr<MetadataCredentialsPlugin>(
2049 new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true,
2050 true, delay))));
2051 request.set_message("Hello");
2052
2053 std::thread cancel_thread([&] {
2054 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
2055 gpr_time_from_millis(delay, GPR_TIMESPAN)));
2056 context.TryCancel();
2057 });
2058 Status s = stub_->Echo(&context, request, &response);
2059 if (!s.ok()) {
2060 EXPECT_TRUE(s.error_code() == StatusCode::CANCELLED ||
2061 s.error_code() == StatusCode::UNAVAILABLE);
2062 }
2063 cancel_thread.join();
2064 EXPECT_EQ(context.credentials()->DebugString(),
2065 kExpectedAuthMetadataPluginWithDeadlineCredsDebugString);
2066 }
2067
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginFailure)2068 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) {
2069 ResetStub();
2070 EchoRequest request;
2071 EchoResponse response;
2072 ClientContext context;
2073 context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2074 std::unique_ptr<MetadataCredentialsPlugin>(
2075 new TestMetadataCredentialsPlugin(
2076 TestMetadataCredentialsPlugin::kGoodMetadataKey,
2077 "Does not matter, will fail anyway (see 3rd param)", false, false,
2078 0))));
2079 request.set_message("Hello");
2080
2081 Status s = stub_->Echo(&context, request, &response);
2082 EXPECT_FALSE(s.ok());
2083 EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
2084 EXPECT_EQ(s.error_message(),
2085 std::string("Getting metadata from plugin failed with error: ") +
2086 kTestCredsPluginErrorMsg);
2087 EXPECT_EQ(context.credentials()->DebugString(),
2088 kExpectedNonBlockingAuthMetadataPluginFailureCredsDebugString);
2089 }
2090
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginAndProcessorSuccess)2091 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
2092 auto* processor = new TestAuthMetadataProcessor(false);
2093 StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
2094 ResetStub();
2095 EchoRequest request;
2096 EchoResponse response;
2097 ClientContext context;
2098 context.set_credentials(processor->GetCompatibleClientCreds());
2099 request.set_message("Hello");
2100 request.mutable_param()->set_echo_metadata(true);
2101 request.mutable_param()->set_expected_client_identity(
2102 TestAuthMetadataProcessor::kGoodGuy);
2103 request.mutable_param()->set_expected_transport_security_type(
2104 GetParam().credentials_type());
2105
2106 Status s = stub_->Echo(&context, request, &response);
2107 EXPECT_EQ(request.message(), response.message());
2108 EXPECT_TRUE(s.ok());
2109
2110 // Metadata should have been consumed by the processor.
2111 EXPECT_FALSE(MetadataContains(
2112 context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
2113 std::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
2114 EXPECT_EQ(
2115 context.credentials()->DebugString(),
2116 kExpectedNonBlockingAuthMetadataPluginAndProcessorSuccessCredsDebugString);
2117 }
2118
TEST_P(SecureEnd2endTest,NonBlockingAuthMetadataPluginAndProcessorFailure)2119 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) {
2120 auto* processor = new TestAuthMetadataProcessor(false);
2121 StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
2122 ResetStub();
2123 EchoRequest request;
2124 EchoResponse response;
2125 ClientContext context;
2126 context.set_credentials(processor->GetIncompatibleClientCreds());
2127 request.set_message("Hello");
2128
2129 Status s = stub_->Echo(&context, request, &response);
2130 EXPECT_FALSE(s.ok());
2131 EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
2132 EXPECT_EQ(
2133 context.credentials()->DebugString(),
2134 kExpectedNonBlockingAuthMetadataPluginAndProcessorFailureCredsDebugString);
2135 }
2136
TEST_P(SecureEnd2endTest,BlockingAuthMetadataPluginFailure)2137 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) {
2138 ResetStub();
2139 EchoRequest request;
2140 EchoResponse response;
2141 ClientContext context;
2142 context.set_credentials(grpc::MetadataCredentialsFromPlugin(
2143 std::unique_ptr<MetadataCredentialsPlugin>(
2144 new TestMetadataCredentialsPlugin(
2145 TestMetadataCredentialsPlugin::kGoodMetadataKey,
2146 "Does not matter, will fail anyway (see 3rd param)", true, false,
2147 0))));
2148 request.set_message("Hello");
2149
2150 Status s = stub_->Echo(&context, request, &response);
2151 EXPECT_FALSE(s.ok());
2152 EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
2153 EXPECT_EQ(s.error_message(),
2154 std::string("Getting metadata from plugin failed with error: ") +
2155 kTestCredsPluginErrorMsg);
2156 EXPECT_EQ(context.credentials()->DebugString(),
2157 kExpectedBlockingAuthMetadataPluginFailureCredsDebugString);
2158 }
2159
TEST_P(SecureEnd2endTest,CompositeCallCreds)2160 TEST_P(SecureEnd2endTest, CompositeCallCreds) {
2161 ResetStub();
2162 EchoRequest request;
2163 EchoResponse response;
2164 ClientContext context;
2165 const char kMetadataKey1[] = "call-creds-key1";
2166 const char kMetadataKey2[] = "call-creds-key2";
2167 const char kMetadataVal1[] = "call-creds-val1";
2168 const char kMetadataVal2[] = "call-creds-val2";
2169
2170 context.set_credentials(grpc::CompositeCallCredentials(
2171 grpc::MetadataCredentialsFromPlugin(
2172 std::unique_ptr<MetadataCredentialsPlugin>(
2173 new TestMetadataCredentialsPlugin(kMetadataKey1, kMetadataVal1,
2174 true, true, 0))),
2175 grpc::MetadataCredentialsFromPlugin(
2176 std::unique_ptr<MetadataCredentialsPlugin>(
2177 new TestMetadataCredentialsPlugin(kMetadataKey2, kMetadataVal2,
2178 true, true, 0)))));
2179 request.set_message("Hello");
2180 request.mutable_param()->set_echo_metadata(true);
2181
2182 Status s = stub_->Echo(&context, request, &response);
2183 EXPECT_TRUE(s.ok());
2184 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
2185 kMetadataKey1, kMetadataVal1));
2186 EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
2187 kMetadataKey2, kMetadataVal2));
2188 EXPECT_EQ(context.credentials()->DebugString(),
2189 kExpectedCompositeCallCredsDebugString);
2190 }
2191
TEST_P(SecureEnd2endTest,ClientAuthContext)2192 TEST_P(SecureEnd2endTest, ClientAuthContext) {
2193 ResetStub();
2194 EchoRequest request;
2195 EchoResponse response;
2196 request.set_message("Hello");
2197 request.mutable_param()->set_check_auth_context(
2198 GetParam().credentials_type() == kTlsCredentialsType);
2199 request.mutable_param()->set_expected_transport_security_type(
2200 GetParam().credentials_type());
2201 ClientContext context;
2202 Status s = stub_->Echo(&context, request, &response);
2203 EXPECT_EQ(response.message(), request.message());
2204 EXPECT_TRUE(s.ok());
2205
2206 std::shared_ptr<const AuthContext> auth_ctx = context.auth_context();
2207 std::vector<grpc::string_ref> tst =
2208 auth_ctx->FindPropertyValues("transport_security_type");
2209 ASSERT_EQ(1u, tst.size());
2210 EXPECT_EQ(GetParam().credentials_type(), ToString(tst[0]));
2211 if (GetParam().credentials_type() == kTlsCredentialsType) {
2212 EXPECT_EQ("x509_subject_alternative_name",
2213 auth_ctx->GetPeerIdentityPropertyName());
2214 EXPECT_EQ(4u, auth_ctx->GetPeerIdentity().size());
2215 EXPECT_EQ("*.test.google.fr", ToString(auth_ctx->GetPeerIdentity()[0]));
2216 EXPECT_EQ("waterzooi.test.google.be",
2217 ToString(auth_ctx->GetPeerIdentity()[1]));
2218 EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2]));
2219 EXPECT_EQ("192.168.1.3", ToString(auth_ctx->GetPeerIdentity()[3]));
2220 }
2221 }
2222
2223 class ResourceQuotaEnd2endTest : public End2endTest {
2224 public:
ResourceQuotaEnd2endTest()2225 ResourceQuotaEnd2endTest()
2226 : server_resource_quota_("server_resource_quota") {}
2227
ConfigureServerBuilder(ServerBuilder * builder)2228 void ConfigureServerBuilder(ServerBuilder* builder) override {
2229 builder->SetResourceQuota(server_resource_quota_);
2230 }
2231
2232 private:
2233 ResourceQuota server_resource_quota_;
2234 };
2235
TEST_P(ResourceQuotaEnd2endTest,SimpleRequest)2236 TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) {
2237 ResetStub();
2238
2239 EchoRequest request;
2240 EchoResponse response;
2241 request.set_message("Hello");
2242
2243 ClientContext context;
2244 Status s = stub_->Echo(&context, request, &response);
2245 EXPECT_EQ(response.message(), request.message());
2246 EXPECT_TRUE(s.ok());
2247 }
2248
2249 // TODO(vjpai): refactor arguments into a struct if it makes sense
CreateTestScenarios(bool use_proxy,bool test_insecure,bool test_secure,bool test_inproc,bool test_callback_server)2250 std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
2251 bool test_insecure,
2252 bool test_secure,
2253 bool test_inproc,
2254 bool test_callback_server) {
2255 std::vector<TestScenario> scenarios;
2256 std::vector<std::string> credentials_types;
2257
2258 grpc_core::ConfigVars::Overrides overrides;
2259 overrides.client_channel_backup_poll_interval_ms =
2260 kClientChannelBackupPollIntervalMs;
2261 grpc_core::ConfigVars::SetOverrides(overrides);
2262 #if TARGET_OS_IPHONE
2263 // Workaround Apple CFStream bug
2264 grpc_core::SetEnv("grpc_cfstream", "0");
2265 #endif
2266
2267 if (test_secure) {
2268 credentials_types =
2269 GetCredentialsProvider()->GetSecureCredentialsTypeList();
2270 }
2271 auto insec_ok = [] {
2272 // Only allow insecure credentials type when it is registered with the
2273 // provider. User may create providers that do not have insecure.
2274 return GetCredentialsProvider()->GetChannelCredentials(
2275 kInsecureCredentialsType, nullptr) != nullptr;
2276 };
2277 if (test_insecure && insec_ok()) {
2278 credentials_types.push_back(kInsecureCredentialsType);
2279 }
2280
2281 // Test callback with inproc or if the event-engine allows it
2282 GPR_ASSERT(!credentials_types.empty());
2283 for (const auto& cred : credentials_types) {
2284 scenarios.emplace_back(false, false, false, cred, false);
2285 scenarios.emplace_back(true, false, false, cred, false);
2286 if (test_callback_server) {
2287 // Note that these scenarios will be dynamically disabled if the event
2288 // engine doesn't run in the background
2289 scenarios.emplace_back(false, false, false, cred, true);
2290 scenarios.emplace_back(true, false, false, cred, true);
2291 }
2292 if (use_proxy) {
2293 scenarios.emplace_back(false, true, false, cred, false);
2294 scenarios.emplace_back(true, true, false, cred, false);
2295 }
2296 }
2297 if (test_inproc && insec_ok()) {
2298 scenarios.emplace_back(false, false, true, kInsecureCredentialsType, false);
2299 scenarios.emplace_back(true, false, true, kInsecureCredentialsType, false);
2300 if (test_callback_server) {
2301 scenarios.emplace_back(false, false, true, kInsecureCredentialsType,
2302 true);
2303 scenarios.emplace_back(true, false, true, kInsecureCredentialsType, true);
2304 }
2305 }
2306 return scenarios;
2307 }
2308
2309 INSTANTIATE_TEST_SUITE_P(
2310 End2end, End2endTest,
2311 ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
2312 &TestScenario::Name);
2313
2314 INSTANTIATE_TEST_SUITE_P(
2315 End2endServerTryCancel, End2endServerTryCancelTest,
2316 ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
2317 &TestScenario::Name);
2318
2319 INSTANTIATE_TEST_SUITE_P(
2320 ProxyEnd2end, ProxyEnd2endTest,
2321 ::testing::ValuesIn(CreateTestScenarios(true, true, true, true, true)),
2322 &TestScenario::Name);
2323
2324 INSTANTIATE_TEST_SUITE_P(
2325 SecureEnd2end, SecureEnd2endTest,
2326 ::testing::ValuesIn(CreateTestScenarios(false, false, true, false, true)),
2327 &TestScenario::Name);
2328
2329 INSTANTIATE_TEST_SUITE_P(
2330 ResourceQuotaEnd2end, ResourceQuotaEnd2endTest,
2331 ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
2332 &TestScenario::Name);
2333
2334 } // namespace
2335 } // namespace testing
2336 } // namespace grpc
2337
main(int argc,char ** argv)2338 int main(int argc, char** argv) {
2339 grpc::testing::TestEnvironment env(&argc, argv);
2340 ::testing::InitGoogleTest(&argc, argv);
2341 int ret = RUN_ALL_TESTS();
2342 return ret;
2343 }
2344