1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <memory>
20 #include <vector>
21
22 #include <gtest/gtest.h>
23
24 #include "absl/memory/memory.h"
25 #include "absl/strings/match.h"
26
27 #include <grpcpp/channel.h>
28 #include <grpcpp/client_context.h>
29 #include <grpcpp/create_channel.h>
30 #include <grpcpp/generic/generic_stub.h>
31 #include <grpcpp/impl/proto_utils.h>
32 #include <grpcpp/server.h>
33 #include <grpcpp/server_builder.h>
34 #include <grpcpp/server_context.h>
35 #include <grpcpp/support/server_interceptor.h>
36
37 #include "src/proto/grpc/testing/echo.grpc.pb.h"
38 #include "test/core/util/port.h"
39 #include "test/core/util/test_config.h"
40 #include "test/cpp/end2end/interceptors_util.h"
41 #include "test/cpp/end2end/test_service_impl.h"
42 #include "test/cpp/util/byte_buffer_proto_helper.h"
43
44 namespace grpc {
45 namespace testing {
46 namespace {
47
48 class LoggingInterceptor : public experimental::Interceptor {
49 public:
LoggingInterceptor(experimental::ServerRpcInfo * info)50 explicit LoggingInterceptor(experimental::ServerRpcInfo* info) {
51 // Check the method name and compare to the type
52 const char* method = info->method();
53 experimental::ServerRpcInfo::Type type = info->type();
54
55 // Check that we use one of our standard methods with expected type.
56 // Also allow the health checking service.
57 // We accept BIDI_STREAMING for Echo in case it's an AsyncGenericService
58 // being tested (the GenericRpc test).
59 // The empty method is for the Unimplemented requests that arise
60 // when draining the CQ.
61 EXPECT_TRUE(
62 strstr(method, "/grpc.health") == method ||
63 (strcmp(method, "/grpc.testing.EchoTestService/Echo") == 0 &&
64 (type == experimental::ServerRpcInfo::Type::UNARY ||
65 type == experimental::ServerRpcInfo::Type::BIDI_STREAMING)) ||
66 (strcmp(method, "/grpc.testing.EchoTestService/RequestStream") == 0 &&
67 type == experimental::ServerRpcInfo::Type::CLIENT_STREAMING) ||
68 (strcmp(method, "/grpc.testing.EchoTestService/ResponseStream") == 0 &&
69 type == experimental::ServerRpcInfo::Type::SERVER_STREAMING) ||
70 (strcmp(method, "/grpc.testing.EchoTestService/BidiStream") == 0 &&
71 type == experimental::ServerRpcInfo::Type::BIDI_STREAMING) ||
72 strcmp(method, "/grpc.testing.EchoTestService/Unimplemented") == 0 ||
73 (strcmp(method, "") == 0 &&
74 type == experimental::ServerRpcInfo::Type::BIDI_STREAMING));
75 }
76
Intercept(experimental::InterceptorBatchMethods * methods)77 void Intercept(experimental::InterceptorBatchMethods* methods) override {
78 if (methods->QueryInterceptionHookPoint(
79 experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
80 auto* map = methods->GetSendInitialMetadata();
81 // Got nothing better to do here for now
82 EXPECT_EQ(map->size(), 0);
83 }
84 if (methods->QueryInterceptionHookPoint(
85 experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
86 EchoRequest req;
87 auto* buffer = methods->GetSerializedSendMessage();
88 auto copied_buffer = *buffer;
89 EXPECT_TRUE(
90 SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req)
91 .ok());
92 EXPECT_TRUE(req.message().find("Hello") == 0);
93 }
94 if (methods->QueryInterceptionHookPoint(
95 experimental::InterceptionHookPoints::PRE_SEND_STATUS)) {
96 auto* map = methods->GetSendTrailingMetadata();
97 bool found = false;
98 // Check that we received the metadata as an echo
99 for (const auto& pair : *map) {
100 found = absl::StartsWith(pair.first, "testkey") &&
101 absl::StartsWith(pair.second, "testvalue");
102 if (found) break;
103 }
104 EXPECT_EQ(found, true);
105 auto status = methods->GetSendStatus();
106 EXPECT_EQ(status.ok(), true);
107 }
108 if (methods->QueryInterceptionHookPoint(
109 experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA)) {
110 auto* map = methods->GetRecvInitialMetadata();
111 bool found = false;
112 // Check that we received the metadata as an echo
113 for (const auto& pair : *map) {
114 found = pair.first.find("testkey") == 0 &&
115 pair.second.find("testvalue") == 0;
116 if (found) break;
117 }
118 EXPECT_EQ(found, true);
119 }
120 if (methods->QueryInterceptionHookPoint(
121 experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
122 EchoResponse* resp =
123 static_cast<EchoResponse*>(methods->GetRecvMessage());
124 if (resp != nullptr) {
125 EXPECT_TRUE(resp->message().find("Hello") == 0);
126 }
127 }
128 if (methods->QueryInterceptionHookPoint(
129 experimental::InterceptionHookPoints::POST_RECV_CLOSE)) {
130 // Got nothing interesting to do here
131 }
132 methods->Proceed();
133 }
134 };
135
136 class LoggingInterceptorFactory
137 : public experimental::ServerInterceptorFactoryInterface {
138 public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)139 experimental::Interceptor* CreateServerInterceptor(
140 experimental::ServerRpcInfo* info) override {
141 return new LoggingInterceptor(info);
142 }
143 };
144
145 // Test if SendMessage function family works as expected for sync/callback apis
146 class SyncSendMessageTester : public experimental::Interceptor {
147 public:
SyncSendMessageTester(experimental::ServerRpcInfo *)148 explicit SyncSendMessageTester(experimental::ServerRpcInfo* /*info*/) {}
149
Intercept(experimental::InterceptorBatchMethods * methods)150 void Intercept(experimental::InterceptorBatchMethods* methods) override {
151 if (methods->QueryInterceptionHookPoint(
152 experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
153 string old_msg =
154 static_cast<const EchoRequest*>(methods->GetSendMessage())->message();
155 EXPECT_EQ(old_msg.find("Hello"), 0u);
156 new_msg_.set_message("World" + old_msg);
157 methods->ModifySendMessage(&new_msg_);
158 }
159 methods->Proceed();
160 }
161
162 private:
163 EchoRequest new_msg_;
164 };
165
166 class SyncSendMessageTesterFactory
167 : public experimental::ServerInterceptorFactoryInterface {
168 public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)169 experimental::Interceptor* CreateServerInterceptor(
170 experimental::ServerRpcInfo* info) override {
171 return new SyncSendMessageTester(info);
172 }
173 };
174
175 // Test if SendMessage function family works as expected for sync/callback apis
176 class SyncSendMessageVerifier : public experimental::Interceptor {
177 public:
SyncSendMessageVerifier(experimental::ServerRpcInfo *)178 explicit SyncSendMessageVerifier(experimental::ServerRpcInfo* /*info*/) {}
179
Intercept(experimental::InterceptorBatchMethods * methods)180 void Intercept(experimental::InterceptorBatchMethods* methods) override {
181 if (methods->QueryInterceptionHookPoint(
182 experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
183 // Make sure that the changes made in SyncSendMessageTester persisted
184 string old_msg =
185 static_cast<const EchoRequest*>(methods->GetSendMessage())->message();
186 EXPECT_EQ(old_msg.find("World"), 0u);
187
188 // Remove the "World" part of the string that we added earlier
189 new_msg_.set_message(old_msg.erase(0, 5));
190 methods->ModifySendMessage(&new_msg_);
191
192 // LoggingInterceptor verifies that changes got reverted
193 }
194 methods->Proceed();
195 }
196
197 private:
198 EchoRequest new_msg_;
199 };
200
201 class SyncSendMessageVerifierFactory
202 : public experimental::ServerInterceptorFactoryInterface {
203 public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)204 experimental::Interceptor* CreateServerInterceptor(
205 experimental::ServerRpcInfo* info) override {
206 return new SyncSendMessageVerifier(info);
207 }
208 };
209
MakeBidiStreamingCall(const std::shared_ptr<Channel> & channel)210 void MakeBidiStreamingCall(const std::shared_ptr<Channel>& channel) {
211 auto stub = grpc::testing::EchoTestService::NewStub(channel);
212 ClientContext ctx;
213 EchoRequest req;
214 EchoResponse resp;
215 ctx.AddMetadata("testkey", "testvalue");
216 auto stream = stub->BidiStream(&ctx);
217 for (auto i = 0; i < 10; i++) {
218 req.set_message("Hello" + std::to_string(i));
219 stream->Write(req);
220 stream->Read(&resp);
221 EXPECT_EQ(req.message(), resp.message());
222 }
223 ASSERT_TRUE(stream->WritesDone());
224 Status s = stream->Finish();
225 EXPECT_EQ(s.ok(), true);
226 }
227
228 class ServerInterceptorsEnd2endSyncUnaryTest : public ::testing::Test {
229 protected:
ServerInterceptorsEnd2endSyncUnaryTest()230 ServerInterceptorsEnd2endSyncUnaryTest() {
231 int port = grpc_pick_unused_port_or_die();
232
233 ServerBuilder builder;
234 server_address_ = "localhost:" + std::to_string(port);
235 builder.AddListeningPort(server_address_, InsecureServerCredentials());
236 builder.RegisterService(&service_);
237
238 std::vector<
239 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
240 creators;
241 creators.push_back(
242 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
243 new SyncSendMessageTesterFactory()));
244 creators.push_back(
245 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
246 new SyncSendMessageVerifierFactory()));
247 creators.push_back(
248 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
249 new LoggingInterceptorFactory()));
250 // Add 20 phony interceptor factories and null interceptor factories
251 for (auto i = 0; i < 20; i++) {
252 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
253 creators.push_back(std::make_unique<NullInterceptorFactory>());
254 }
255 builder.experimental().SetInterceptorCreators(std::move(creators));
256 server_ = builder.BuildAndStart();
257 }
258 std::string server_address_;
259 TestServiceImpl service_;
260 std::unique_ptr<Server> server_;
261 };
262
TEST_F(ServerInterceptorsEnd2endSyncUnaryTest,UnaryTest)263 TEST_F(ServerInterceptorsEnd2endSyncUnaryTest, UnaryTest) {
264 ChannelArguments args;
265 PhonyInterceptor::Reset();
266 auto channel =
267 grpc::CreateChannel(server_address_, InsecureChannelCredentials());
268 MakeCall(channel);
269 // Make sure all 20 phony interceptors were run
270 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
271 }
272
273 class ServerInterceptorsEnd2endSyncStreamingTest : public ::testing::Test {
274 protected:
ServerInterceptorsEnd2endSyncStreamingTest()275 ServerInterceptorsEnd2endSyncStreamingTest() {
276 int port = grpc_pick_unused_port_or_die();
277
278 ServerBuilder builder;
279 server_address_ = "localhost:" + std::to_string(port);
280 builder.AddListeningPort(server_address_, InsecureServerCredentials());
281 builder.RegisterService(&service_);
282
283 std::vector<
284 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
285 creators;
286 creators.push_back(
287 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
288 new SyncSendMessageTesterFactory()));
289 creators.push_back(
290 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
291 new SyncSendMessageVerifierFactory()));
292 creators.push_back(
293 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
294 new LoggingInterceptorFactory()));
295 for (auto i = 0; i < 20; i++) {
296 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
297 }
298 builder.experimental().SetInterceptorCreators(std::move(creators));
299 server_ = builder.BuildAndStart();
300 }
301 std::string server_address_;
302 EchoTestServiceStreamingImpl service_;
303 std::unique_ptr<Server> server_;
304 };
305
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest,ClientStreamingTest)306 TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, ClientStreamingTest) {
307 ChannelArguments args;
308 PhonyInterceptor::Reset();
309 auto channel =
310 grpc::CreateChannel(server_address_, InsecureChannelCredentials());
311 MakeClientStreamingCall(channel);
312 // Make sure all 20 phony interceptors were run
313 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
314 }
315
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest,ServerStreamingTest)316 TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, ServerStreamingTest) {
317 ChannelArguments args;
318 PhonyInterceptor::Reset();
319 auto channel =
320 grpc::CreateChannel(server_address_, InsecureChannelCredentials());
321 MakeServerStreamingCall(channel);
322 // Make sure all 20 phony interceptors were run
323 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
324 }
325
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest,BidiStreamingTest)326 TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, BidiStreamingTest) {
327 ChannelArguments args;
328 PhonyInterceptor::Reset();
329 auto channel =
330 grpc::CreateChannel(server_address_, InsecureChannelCredentials());
331 MakeBidiStreamingCall(channel);
332 // Make sure all 20 phony interceptors were run
333 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
334 }
335
336 class ServerInterceptorsAsyncEnd2endTest : public ::testing::Test {};
337
TEST_F(ServerInterceptorsAsyncEnd2endTest,UnaryTest)338 TEST_F(ServerInterceptorsAsyncEnd2endTest, UnaryTest) {
339 PhonyInterceptor::Reset();
340 int port = grpc_pick_unused_port_or_die();
341 string server_address = "localhost:" + std::to_string(port);
342 ServerBuilder builder;
343 EchoTestService::AsyncService service;
344 builder.AddListeningPort(server_address, InsecureServerCredentials());
345 builder.RegisterService(&service);
346 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
347 creators;
348 creators.push_back(
349 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
350 new LoggingInterceptorFactory()));
351 for (auto i = 0; i < 20; i++) {
352 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
353 }
354 builder.experimental().SetInterceptorCreators(std::move(creators));
355 auto cq = builder.AddCompletionQueue();
356 auto server = builder.BuildAndStart();
357
358 ChannelArguments args;
359 auto channel =
360 grpc::CreateChannel(server_address, InsecureChannelCredentials());
361 auto stub = grpc::testing::EchoTestService::NewStub(channel);
362
363 EchoRequest send_request;
364 EchoRequest recv_request;
365 EchoResponse send_response;
366 EchoResponse recv_response;
367 Status recv_status;
368
369 ClientContext cli_ctx;
370 ServerContext srv_ctx;
371 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
372
373 send_request.set_message("Hello");
374 cli_ctx.AddMetadata("testkey", "testvalue");
375 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
376 stub->AsyncEcho(&cli_ctx, send_request, cq.get()));
377
378 service.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq.get(),
379 cq.get(), tag(2));
380
381 response_reader->Finish(&recv_response, &recv_status, tag(4));
382
383 Verifier().Expect(2, true).Verify(cq.get());
384 EXPECT_EQ(send_request.message(), recv_request.message());
385
386 EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
387 srv_ctx.AddTrailingMetadata("testkey", "testvalue");
388
389 send_response.set_message(recv_request.message());
390 response_writer.Finish(send_response, Status::OK, tag(3));
391 Verifier().Expect(3, true).Expect(4, true).Verify(cq.get());
392
393 EXPECT_EQ(send_response.message(), recv_response.message());
394 EXPECT_TRUE(recv_status.ok());
395 EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
396 "testvalue"));
397
398 // Make sure all 20 phony interceptors were run
399 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
400
401 server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
402 cq->Shutdown();
403 void* ignored_tag;
404 bool ignored_ok;
405 while (cq->Next(&ignored_tag, &ignored_ok)) {
406 }
407 grpc_recycle_unused_port(port);
408 }
409
TEST_F(ServerInterceptorsAsyncEnd2endTest,BidiStreamingTest)410 TEST_F(ServerInterceptorsAsyncEnd2endTest, BidiStreamingTest) {
411 PhonyInterceptor::Reset();
412 int port = grpc_pick_unused_port_or_die();
413 string server_address = "localhost:" + std::to_string(port);
414 ServerBuilder builder;
415 EchoTestService::AsyncService service;
416 builder.AddListeningPort(server_address, InsecureServerCredentials());
417 builder.RegisterService(&service);
418 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
419 creators;
420 creators.push_back(
421 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
422 new LoggingInterceptorFactory()));
423 for (auto i = 0; i < 20; i++) {
424 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
425 }
426 builder.experimental().SetInterceptorCreators(std::move(creators));
427 auto cq = builder.AddCompletionQueue();
428 auto server = builder.BuildAndStart();
429
430 ChannelArguments args;
431 auto channel =
432 grpc::CreateChannel(server_address, InsecureChannelCredentials());
433 auto stub = grpc::testing::EchoTestService::NewStub(channel);
434
435 EchoRequest send_request;
436 EchoRequest recv_request;
437 EchoResponse send_response;
438 EchoResponse recv_response;
439 Status recv_status;
440
441 ClientContext cli_ctx;
442 ServerContext srv_ctx;
443 grpc::ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
444
445 send_request.set_message("Hello");
446 cli_ctx.AddMetadata("testkey", "testvalue");
447 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
448 cli_stream(stub->AsyncBidiStream(&cli_ctx, cq.get(), tag(1)));
449
450 service.RequestBidiStream(&srv_ctx, &srv_stream, cq.get(), cq.get(), tag(2));
451
452 Verifier().Expect(1, true).Expect(2, true).Verify(cq.get());
453
454 EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
455 srv_ctx.AddTrailingMetadata("testkey", "testvalue");
456
457 cli_stream->Write(send_request, tag(3));
458 srv_stream.Read(&recv_request, tag(4));
459 Verifier().Expect(3, true).Expect(4, true).Verify(cq.get());
460 EXPECT_EQ(send_request.message(), recv_request.message());
461
462 send_response.set_message(recv_request.message());
463 srv_stream.Write(send_response, tag(5));
464 cli_stream->Read(&recv_response, tag(6));
465 Verifier().Expect(5, true).Expect(6, true).Verify(cq.get());
466 EXPECT_EQ(send_response.message(), recv_response.message());
467
468 cli_stream->WritesDone(tag(7));
469 srv_stream.Read(&recv_request, tag(8));
470 Verifier().Expect(7, true).Expect(8, false).Verify(cq.get());
471
472 srv_stream.Finish(Status::OK, tag(9));
473 cli_stream->Finish(&recv_status, tag(10));
474 Verifier().Expect(9, true).Expect(10, true).Verify(cq.get());
475
476 EXPECT_TRUE(recv_status.ok());
477 EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
478 "testvalue"));
479
480 // Make sure all 20 phony interceptors were run
481 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
482
483 server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
484 cq->Shutdown();
485 void* ignored_tag;
486 bool ignored_ok;
487 while (cq->Next(&ignored_tag, &ignored_ok)) {
488 }
489 grpc_recycle_unused_port(port);
490 }
491
TEST_F(ServerInterceptorsAsyncEnd2endTest,GenericRPCTest)492 TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) {
493 PhonyInterceptor::Reset();
494 int port = grpc_pick_unused_port_or_die();
495 string server_address = "localhost:" + std::to_string(port);
496 ServerBuilder builder;
497 AsyncGenericService service;
498 builder.AddListeningPort(server_address, InsecureServerCredentials());
499 builder.RegisterAsyncGenericService(&service);
500 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
501 creators;
502 creators.reserve(20);
503 for (auto i = 0; i < 20; i++) {
504 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
505 }
506 builder.experimental().SetInterceptorCreators(std::move(creators));
507 auto srv_cq = builder.AddCompletionQueue();
508 CompletionQueue cli_cq;
509 auto server = builder.BuildAndStart();
510
511 ChannelArguments args;
512 auto channel =
513 grpc::CreateChannel(server_address, InsecureChannelCredentials());
514 GenericStub generic_stub(channel);
515
516 const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
517 EchoRequest send_request;
518 EchoRequest recv_request;
519 EchoResponse send_response;
520 EchoResponse recv_response;
521 Status recv_status;
522
523 ClientContext cli_ctx;
524 GenericServerContext srv_ctx;
525 GenericServerAsyncReaderWriter stream(&srv_ctx);
526
527 // The string needs to be long enough to test heap-based slice.
528 send_request.set_message("Hello");
529 cli_ctx.AddMetadata("testkey", "testvalue");
530
531 CompletionQueue* cq = srv_cq.get();
532 std::thread request_call([cq]() { Verifier().Expect(4, true).Verify(cq); });
533 std::unique_ptr<GenericClientAsyncReaderWriter> call =
534 generic_stub.PrepareCall(&cli_ctx, kMethodName, &cli_cq);
535 call->StartCall(tag(1));
536 Verifier().Expect(1, true).Verify(&cli_cq);
537 std::unique_ptr<ByteBuffer> send_buffer =
538 SerializeToByteBuffer(&send_request);
539 call->Write(*send_buffer, tag(2));
540 // Send ByteBuffer can be destroyed after calling Write.
541 send_buffer.reset();
542 Verifier().Expect(2, true).Verify(&cli_cq);
543 call->WritesDone(tag(3));
544 Verifier().Expect(3, true).Verify(&cli_cq);
545
546 service.RequestCall(&srv_ctx, &stream, srv_cq.get(), srv_cq.get(), tag(4));
547
548 request_call.join();
549 EXPECT_EQ(kMethodName, srv_ctx.method());
550 EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
551 srv_ctx.AddTrailingMetadata("testkey", "testvalue");
552
553 ByteBuffer recv_buffer;
554 stream.Read(&recv_buffer, tag(5));
555 Verifier().Expect(5, true).Verify(srv_cq.get());
556 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
557 EXPECT_EQ(send_request.message(), recv_request.message());
558
559 send_response.set_message(recv_request.message());
560 send_buffer = SerializeToByteBuffer(&send_response);
561 stream.Write(*send_buffer, tag(6));
562 send_buffer.reset();
563 Verifier().Expect(6, true).Verify(srv_cq.get());
564
565 stream.Finish(Status::OK, tag(7));
566 // Shutdown srv_cq before we try to get the tag back, to verify that the
567 // interception API handles completion queue shutdowns that take place before
568 // all the tags are returned
569 srv_cq->Shutdown();
570 Verifier().Expect(7, true).Verify(srv_cq.get());
571
572 recv_buffer.Clear();
573 call->Read(&recv_buffer, tag(8));
574 Verifier().Expect(8, true).Verify(&cli_cq);
575 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
576
577 call->Finish(&recv_status, tag(9));
578 cli_cq.Shutdown();
579 Verifier().Expect(9, true).Verify(&cli_cq);
580
581 EXPECT_EQ(send_response.message(), recv_response.message());
582 EXPECT_TRUE(recv_status.ok());
583 EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
584 "testvalue"));
585
586 // Make sure all 20 phony interceptors were run
587 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
588
589 server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
590 void* ignored_tag;
591 bool ignored_ok;
592 while (cli_cq.Next(&ignored_tag, &ignored_ok)) {
593 }
594 while (srv_cq->Next(&ignored_tag, &ignored_ok)) {
595 }
596 grpc_recycle_unused_port(port);
597 }
598
TEST_F(ServerInterceptorsAsyncEnd2endTest,UnimplementedRpcTest)599 TEST_F(ServerInterceptorsAsyncEnd2endTest, UnimplementedRpcTest) {
600 PhonyInterceptor::Reset();
601 int port = grpc_pick_unused_port_or_die();
602 string server_address = "localhost:" + std::to_string(port);
603 ServerBuilder builder;
604 builder.AddListeningPort(server_address, InsecureServerCredentials());
605 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
606 creators;
607 creators.reserve(20);
608 for (auto i = 0; i < 20; i++) {
609 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
610 }
611 builder.experimental().SetInterceptorCreators(std::move(creators));
612 auto cq = builder.AddCompletionQueue();
613 auto server = builder.BuildAndStart();
614
615 ChannelArguments args;
616 std::shared_ptr<Channel> channel =
617 grpc::CreateChannel(server_address, InsecureChannelCredentials());
618 std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
619 stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
620 EchoRequest send_request;
621 EchoResponse recv_response;
622 Status recv_status;
623
624 ClientContext cli_ctx;
625 send_request.set_message("Hello");
626 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
627 stub->AsyncUnimplemented(&cli_ctx, send_request, cq.get()));
628
629 response_reader->Finish(&recv_response, &recv_status, tag(4));
630 Verifier().Expect(4, true).Verify(cq.get());
631
632 EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
633 EXPECT_EQ("", recv_status.error_message());
634
635 // Make sure all 20 phony interceptors were run
636 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
637
638 server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
639 cq->Shutdown();
640 void* ignored_tag;
641 bool ignored_ok;
642 while (cq->Next(&ignored_tag, &ignored_ok)) {
643 }
644 grpc_recycle_unused_port(port);
645 }
646
647 class ServerInterceptorsSyncUnimplementedEnd2endTest : public ::testing::Test {
648 };
649
TEST_F(ServerInterceptorsSyncUnimplementedEnd2endTest,UnimplementedRpcTest)650 TEST_F(ServerInterceptorsSyncUnimplementedEnd2endTest, UnimplementedRpcTest) {
651 PhonyInterceptor::Reset();
652 int port = grpc_pick_unused_port_or_die();
653 string server_address = "localhost:" + std::to_string(port);
654 ServerBuilder builder;
655 TestServiceImpl service;
656 builder.RegisterService(&service);
657 builder.AddListeningPort(server_address, InsecureServerCredentials());
658 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
659 creators;
660 creators.reserve(20);
661 for (auto i = 0; i < 20; i++) {
662 creators.push_back(std::make_unique<PhonyInterceptorFactory>());
663 }
664 builder.experimental().SetInterceptorCreators(std::move(creators));
665 auto server = builder.BuildAndStart();
666
667 ChannelArguments args;
668 std::shared_ptr<Channel> channel =
669 grpc::CreateChannel(server_address, InsecureChannelCredentials());
670 std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
671 stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
672 EchoRequest send_request;
673 EchoResponse recv_response;
674
675 ClientContext cli_ctx;
676 send_request.set_message("Hello");
677 Status recv_status =
678 stub->Unimplemented(&cli_ctx, send_request, &recv_response);
679
680 EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
681 EXPECT_EQ("", recv_status.error_message());
682
683 // Make sure all 20 phony interceptors were run
684 EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
685
686 server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
687 grpc_recycle_unused_port(port);
688 }
689
690 } // namespace
691 } // namespace testing
692 } // namespace grpc
693
main(int argc,char ** argv)694 int main(int argc, char** argv) {
695 grpc::testing::TestEnvironment env(&argc, argv);
696 ::testing::InitGoogleTest(&argc, argv);
697 return RUN_ALL_TESTS();
698 }
699