xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/server_interceptors_end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <memory>
20 #include <vector>
21 
22 #include <gtest/gtest.h>
23 
24 #include "absl/memory/memory.h"
25 #include "absl/strings/match.h"
26 
27 #include <grpcpp/channel.h>
28 #include <grpcpp/client_context.h>
29 #include <grpcpp/create_channel.h>
30 #include <grpcpp/generic/generic_stub.h>
31 #include <grpcpp/impl/proto_utils.h>
32 #include <grpcpp/server.h>
33 #include <grpcpp/server_builder.h>
34 #include <grpcpp/server_context.h>
35 #include <grpcpp/support/server_interceptor.h>
36 
37 #include "src/proto/grpc/testing/echo.grpc.pb.h"
38 #include "test/core/util/port.h"
39 #include "test/core/util/test_config.h"
40 #include "test/cpp/end2end/interceptors_util.h"
41 #include "test/cpp/end2end/test_service_impl.h"
42 #include "test/cpp/util/byte_buffer_proto_helper.h"
43 
44 namespace grpc {
45 namespace testing {
46 namespace {
47 
48 class LoggingInterceptor : public experimental::Interceptor {
49  public:
LoggingInterceptor(experimental::ServerRpcInfo * info)50   explicit LoggingInterceptor(experimental::ServerRpcInfo* info) {
51     // Check the method name and compare to the type
52     const char* method = info->method();
53     experimental::ServerRpcInfo::Type type = info->type();
54 
55     // Check that we use one of our standard methods with expected type.
56     // Also allow the health checking service.
57     // We accept BIDI_STREAMING for Echo in case it's an AsyncGenericService
58     // being tested (the GenericRpc test).
59     // The empty method is for the Unimplemented requests that arise
60     // when draining the CQ.
61     EXPECT_TRUE(
62         strstr(method, "/grpc.health") == method ||
63         (strcmp(method, "/grpc.testing.EchoTestService/Echo") == 0 &&
64          (type == experimental::ServerRpcInfo::Type::UNARY ||
65           type == experimental::ServerRpcInfo::Type::BIDI_STREAMING)) ||
66         (strcmp(method, "/grpc.testing.EchoTestService/RequestStream") == 0 &&
67          type == experimental::ServerRpcInfo::Type::CLIENT_STREAMING) ||
68         (strcmp(method, "/grpc.testing.EchoTestService/ResponseStream") == 0 &&
69          type == experimental::ServerRpcInfo::Type::SERVER_STREAMING) ||
70         (strcmp(method, "/grpc.testing.EchoTestService/BidiStream") == 0 &&
71          type == experimental::ServerRpcInfo::Type::BIDI_STREAMING) ||
72         strcmp(method, "/grpc.testing.EchoTestService/Unimplemented") == 0 ||
73         (strcmp(method, "") == 0 &&
74          type == experimental::ServerRpcInfo::Type::BIDI_STREAMING));
75   }
76 
Intercept(experimental::InterceptorBatchMethods * methods)77   void Intercept(experimental::InterceptorBatchMethods* methods) override {
78     if (methods->QueryInterceptionHookPoint(
79             experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
80       auto* map = methods->GetSendInitialMetadata();
81       // Got nothing better to do here for now
82       EXPECT_EQ(map->size(), 0);
83     }
84     if (methods->QueryInterceptionHookPoint(
85             experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
86       EchoRequest req;
87       auto* buffer = methods->GetSerializedSendMessage();
88       auto copied_buffer = *buffer;
89       EXPECT_TRUE(
90           SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req)
91               .ok());
92       EXPECT_TRUE(req.message().find("Hello") == 0);
93     }
94     if (methods->QueryInterceptionHookPoint(
95             experimental::InterceptionHookPoints::PRE_SEND_STATUS)) {
96       auto* map = methods->GetSendTrailingMetadata();
97       bool found = false;
98       // Check that we received the metadata as an echo
99       for (const auto& pair : *map) {
100         found = absl::StartsWith(pair.first, "testkey") &&
101                 absl::StartsWith(pair.second, "testvalue");
102         if (found) break;
103       }
104       EXPECT_EQ(found, true);
105       auto status = methods->GetSendStatus();
106       EXPECT_EQ(status.ok(), true);
107     }
108     if (methods->QueryInterceptionHookPoint(
109             experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA)) {
110       auto* map = methods->GetRecvInitialMetadata();
111       bool found = false;
112       // Check that we received the metadata as an echo
113       for (const auto& pair : *map) {
114         found = pair.first.find("testkey") == 0 &&
115                 pair.second.find("testvalue") == 0;
116         if (found) break;
117       }
118       EXPECT_EQ(found, true);
119     }
120     if (methods->QueryInterceptionHookPoint(
121             experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
122       EchoResponse* resp =
123           static_cast<EchoResponse*>(methods->GetRecvMessage());
124       if (resp != nullptr) {
125         EXPECT_TRUE(resp->message().find("Hello") == 0);
126       }
127     }
128     if (methods->QueryInterceptionHookPoint(
129             experimental::InterceptionHookPoints::POST_RECV_CLOSE)) {
130       // Got nothing interesting to do here
131     }
132     methods->Proceed();
133   }
134 };
135 
136 class LoggingInterceptorFactory
137     : public experimental::ServerInterceptorFactoryInterface {
138  public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)139   experimental::Interceptor* CreateServerInterceptor(
140       experimental::ServerRpcInfo* info) override {
141     return new LoggingInterceptor(info);
142   }
143 };
144 
145 // Test if SendMessage function family works as expected for sync/callback apis
146 class SyncSendMessageTester : public experimental::Interceptor {
147  public:
SyncSendMessageTester(experimental::ServerRpcInfo *)148   explicit SyncSendMessageTester(experimental::ServerRpcInfo* /*info*/) {}
149 
Intercept(experimental::InterceptorBatchMethods * methods)150   void Intercept(experimental::InterceptorBatchMethods* methods) override {
151     if (methods->QueryInterceptionHookPoint(
152             experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
153       string old_msg =
154           static_cast<const EchoRequest*>(methods->GetSendMessage())->message();
155       EXPECT_EQ(old_msg.find("Hello"), 0u);
156       new_msg_.set_message("World" + old_msg);
157       methods->ModifySendMessage(&new_msg_);
158     }
159     methods->Proceed();
160   }
161 
162  private:
163   EchoRequest new_msg_;
164 };
165 
166 class SyncSendMessageTesterFactory
167     : public experimental::ServerInterceptorFactoryInterface {
168  public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)169   experimental::Interceptor* CreateServerInterceptor(
170       experimental::ServerRpcInfo* info) override {
171     return new SyncSendMessageTester(info);
172   }
173 };
174 
175 // Test if SendMessage function family works as expected for sync/callback apis
176 class SyncSendMessageVerifier : public experimental::Interceptor {
177  public:
SyncSendMessageVerifier(experimental::ServerRpcInfo *)178   explicit SyncSendMessageVerifier(experimental::ServerRpcInfo* /*info*/) {}
179 
Intercept(experimental::InterceptorBatchMethods * methods)180   void Intercept(experimental::InterceptorBatchMethods* methods) override {
181     if (methods->QueryInterceptionHookPoint(
182             experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
183       // Make sure that the changes made in SyncSendMessageTester persisted
184       string old_msg =
185           static_cast<const EchoRequest*>(methods->GetSendMessage())->message();
186       EXPECT_EQ(old_msg.find("World"), 0u);
187 
188       // Remove the "World" part of the string that we added earlier
189       new_msg_.set_message(old_msg.erase(0, 5));
190       methods->ModifySendMessage(&new_msg_);
191 
192       // LoggingInterceptor verifies that changes got reverted
193     }
194     methods->Proceed();
195   }
196 
197  private:
198   EchoRequest new_msg_;
199 };
200 
201 class SyncSendMessageVerifierFactory
202     : public experimental::ServerInterceptorFactoryInterface {
203  public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)204   experimental::Interceptor* CreateServerInterceptor(
205       experimental::ServerRpcInfo* info) override {
206     return new SyncSendMessageVerifier(info);
207   }
208 };
209 
MakeBidiStreamingCall(const std::shared_ptr<Channel> & channel)210 void MakeBidiStreamingCall(const std::shared_ptr<Channel>& channel) {
211   auto stub = grpc::testing::EchoTestService::NewStub(channel);
212   ClientContext ctx;
213   EchoRequest req;
214   EchoResponse resp;
215   ctx.AddMetadata("testkey", "testvalue");
216   auto stream = stub->BidiStream(&ctx);
217   for (auto i = 0; i < 10; i++) {
218     req.set_message("Hello" + std::to_string(i));
219     stream->Write(req);
220     stream->Read(&resp);
221     EXPECT_EQ(req.message(), resp.message());
222   }
223   ASSERT_TRUE(stream->WritesDone());
224   Status s = stream->Finish();
225   EXPECT_EQ(s.ok(), true);
226 }
227 
228 class ServerInterceptorsEnd2endSyncUnaryTest : public ::testing::Test {
229  protected:
ServerInterceptorsEnd2endSyncUnaryTest()230   ServerInterceptorsEnd2endSyncUnaryTest() {
231     int port = grpc_pick_unused_port_or_die();
232 
233     ServerBuilder builder;
234     server_address_ = "localhost:" + std::to_string(port);
235     builder.AddListeningPort(server_address_, InsecureServerCredentials());
236     builder.RegisterService(&service_);
237 
238     std::vector<
239         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
240         creators;
241     creators.push_back(
242         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
243             new SyncSendMessageTesterFactory()));
244     creators.push_back(
245         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
246             new SyncSendMessageVerifierFactory()));
247     creators.push_back(
248         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
249             new LoggingInterceptorFactory()));
250     // Add 20 phony interceptor factories and null interceptor factories
251     for (auto i = 0; i < 20; i++) {
252       creators.push_back(std::make_unique<PhonyInterceptorFactory>());
253       creators.push_back(std::make_unique<NullInterceptorFactory>());
254     }
255     builder.experimental().SetInterceptorCreators(std::move(creators));
256     server_ = builder.BuildAndStart();
257   }
258   std::string server_address_;
259   TestServiceImpl service_;
260   std::unique_ptr<Server> server_;
261 };
262 
TEST_F(ServerInterceptorsEnd2endSyncUnaryTest,UnaryTest)263 TEST_F(ServerInterceptorsEnd2endSyncUnaryTest, UnaryTest) {
264   ChannelArguments args;
265   PhonyInterceptor::Reset();
266   auto channel =
267       grpc::CreateChannel(server_address_, InsecureChannelCredentials());
268   MakeCall(channel);
269   // Make sure all 20 phony interceptors were run
270   EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
271 }
272 
273 class ServerInterceptorsEnd2endSyncStreamingTest : public ::testing::Test {
274  protected:
ServerInterceptorsEnd2endSyncStreamingTest()275   ServerInterceptorsEnd2endSyncStreamingTest() {
276     int port = grpc_pick_unused_port_or_die();
277 
278     ServerBuilder builder;
279     server_address_ = "localhost:" + std::to_string(port);
280     builder.AddListeningPort(server_address_, InsecureServerCredentials());
281     builder.RegisterService(&service_);
282 
283     std::vector<
284         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
285         creators;
286     creators.push_back(
287         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
288             new SyncSendMessageTesterFactory()));
289     creators.push_back(
290         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
291             new SyncSendMessageVerifierFactory()));
292     creators.push_back(
293         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
294             new LoggingInterceptorFactory()));
295     for (auto i = 0; i < 20; i++) {
296       creators.push_back(std::make_unique<PhonyInterceptorFactory>());
297     }
298     builder.experimental().SetInterceptorCreators(std::move(creators));
299     server_ = builder.BuildAndStart();
300   }
301   std::string server_address_;
302   EchoTestServiceStreamingImpl service_;
303   std::unique_ptr<Server> server_;
304 };
305 
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest,ClientStreamingTest)306 TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, ClientStreamingTest) {
307   ChannelArguments args;
308   PhonyInterceptor::Reset();
309   auto channel =
310       grpc::CreateChannel(server_address_, InsecureChannelCredentials());
311   MakeClientStreamingCall(channel);
312   // Make sure all 20 phony interceptors were run
313   EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
314 }
315 
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest,ServerStreamingTest)316 TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, ServerStreamingTest) {
317   ChannelArguments args;
318   PhonyInterceptor::Reset();
319   auto channel =
320       grpc::CreateChannel(server_address_, InsecureChannelCredentials());
321   MakeServerStreamingCall(channel);
322   // Make sure all 20 phony interceptors were run
323   EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
324 }
325 
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest,BidiStreamingTest)326 TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, BidiStreamingTest) {
327   ChannelArguments args;
328   PhonyInterceptor::Reset();
329   auto channel =
330       grpc::CreateChannel(server_address_, InsecureChannelCredentials());
331   MakeBidiStreamingCall(channel);
332   // Make sure all 20 phony interceptors were run
333   EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
334 }
335 
336 class ServerInterceptorsAsyncEnd2endTest : public ::testing::Test {};
337 
TEST_F(ServerInterceptorsAsyncEnd2endTest,UnaryTest)338 TEST_F(ServerInterceptorsAsyncEnd2endTest, UnaryTest) {
339   PhonyInterceptor::Reset();
340   int port = grpc_pick_unused_port_or_die();
341   string server_address = "localhost:" + std::to_string(port);
342   ServerBuilder builder;
343   EchoTestService::AsyncService service;
344   builder.AddListeningPort(server_address, InsecureServerCredentials());
345   builder.RegisterService(&service);
346   std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
347       creators;
348   creators.push_back(
349       std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
350           new LoggingInterceptorFactory()));
351   for (auto i = 0; i < 20; i++) {
352     creators.push_back(std::make_unique<PhonyInterceptorFactory>());
353   }
354   builder.experimental().SetInterceptorCreators(std::move(creators));
355   auto cq = builder.AddCompletionQueue();
356   auto server = builder.BuildAndStart();
357 
358   ChannelArguments args;
359   auto channel =
360       grpc::CreateChannel(server_address, InsecureChannelCredentials());
361   auto stub = grpc::testing::EchoTestService::NewStub(channel);
362 
363   EchoRequest send_request;
364   EchoRequest recv_request;
365   EchoResponse send_response;
366   EchoResponse recv_response;
367   Status recv_status;
368 
369   ClientContext cli_ctx;
370   ServerContext srv_ctx;
371   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
372 
373   send_request.set_message("Hello");
374   cli_ctx.AddMetadata("testkey", "testvalue");
375   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
376       stub->AsyncEcho(&cli_ctx, send_request, cq.get()));
377 
378   service.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq.get(),
379                       cq.get(), tag(2));
380 
381   response_reader->Finish(&recv_response, &recv_status, tag(4));
382 
383   Verifier().Expect(2, true).Verify(cq.get());
384   EXPECT_EQ(send_request.message(), recv_request.message());
385 
386   EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
387   srv_ctx.AddTrailingMetadata("testkey", "testvalue");
388 
389   send_response.set_message(recv_request.message());
390   response_writer.Finish(send_response, Status::OK, tag(3));
391   Verifier().Expect(3, true).Expect(4, true).Verify(cq.get());
392 
393   EXPECT_EQ(send_response.message(), recv_response.message());
394   EXPECT_TRUE(recv_status.ok());
395   EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
396                             "testvalue"));
397 
398   // Make sure all 20 phony interceptors were run
399   EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
400 
401   server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
402   cq->Shutdown();
403   void* ignored_tag;
404   bool ignored_ok;
405   while (cq->Next(&ignored_tag, &ignored_ok)) {
406   }
407   grpc_recycle_unused_port(port);
408 }
409 
TEST_F(ServerInterceptorsAsyncEnd2endTest,BidiStreamingTest)410 TEST_F(ServerInterceptorsAsyncEnd2endTest, BidiStreamingTest) {
411   PhonyInterceptor::Reset();
412   int port = grpc_pick_unused_port_or_die();
413   string server_address = "localhost:" + std::to_string(port);
414   ServerBuilder builder;
415   EchoTestService::AsyncService service;
416   builder.AddListeningPort(server_address, InsecureServerCredentials());
417   builder.RegisterService(&service);
418   std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
419       creators;
420   creators.push_back(
421       std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
422           new LoggingInterceptorFactory()));
423   for (auto i = 0; i < 20; i++) {
424     creators.push_back(std::make_unique<PhonyInterceptorFactory>());
425   }
426   builder.experimental().SetInterceptorCreators(std::move(creators));
427   auto cq = builder.AddCompletionQueue();
428   auto server = builder.BuildAndStart();
429 
430   ChannelArguments args;
431   auto channel =
432       grpc::CreateChannel(server_address, InsecureChannelCredentials());
433   auto stub = grpc::testing::EchoTestService::NewStub(channel);
434 
435   EchoRequest send_request;
436   EchoRequest recv_request;
437   EchoResponse send_response;
438   EchoResponse recv_response;
439   Status recv_status;
440 
441   ClientContext cli_ctx;
442   ServerContext srv_ctx;
443   grpc::ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
444 
445   send_request.set_message("Hello");
446   cli_ctx.AddMetadata("testkey", "testvalue");
447   std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
448       cli_stream(stub->AsyncBidiStream(&cli_ctx, cq.get(), tag(1)));
449 
450   service.RequestBidiStream(&srv_ctx, &srv_stream, cq.get(), cq.get(), tag(2));
451 
452   Verifier().Expect(1, true).Expect(2, true).Verify(cq.get());
453 
454   EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
455   srv_ctx.AddTrailingMetadata("testkey", "testvalue");
456 
457   cli_stream->Write(send_request, tag(3));
458   srv_stream.Read(&recv_request, tag(4));
459   Verifier().Expect(3, true).Expect(4, true).Verify(cq.get());
460   EXPECT_EQ(send_request.message(), recv_request.message());
461 
462   send_response.set_message(recv_request.message());
463   srv_stream.Write(send_response, tag(5));
464   cli_stream->Read(&recv_response, tag(6));
465   Verifier().Expect(5, true).Expect(6, true).Verify(cq.get());
466   EXPECT_EQ(send_response.message(), recv_response.message());
467 
468   cli_stream->WritesDone(tag(7));
469   srv_stream.Read(&recv_request, tag(8));
470   Verifier().Expect(7, true).Expect(8, false).Verify(cq.get());
471 
472   srv_stream.Finish(Status::OK, tag(9));
473   cli_stream->Finish(&recv_status, tag(10));
474   Verifier().Expect(9, true).Expect(10, true).Verify(cq.get());
475 
476   EXPECT_TRUE(recv_status.ok());
477   EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
478                             "testvalue"));
479 
480   // Make sure all 20 phony interceptors were run
481   EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
482 
483   server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
484   cq->Shutdown();
485   void* ignored_tag;
486   bool ignored_ok;
487   while (cq->Next(&ignored_tag, &ignored_ok)) {
488   }
489   grpc_recycle_unused_port(port);
490 }
491 
TEST_F(ServerInterceptorsAsyncEnd2endTest,GenericRPCTest)492 TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) {
493   PhonyInterceptor::Reset();
494   int port = grpc_pick_unused_port_or_die();
495   string server_address = "localhost:" + std::to_string(port);
496   ServerBuilder builder;
497   AsyncGenericService service;
498   builder.AddListeningPort(server_address, InsecureServerCredentials());
499   builder.RegisterAsyncGenericService(&service);
500   std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
501       creators;
502   creators.reserve(20);
503   for (auto i = 0; i < 20; i++) {
504     creators.push_back(std::make_unique<PhonyInterceptorFactory>());
505   }
506   builder.experimental().SetInterceptorCreators(std::move(creators));
507   auto srv_cq = builder.AddCompletionQueue();
508   CompletionQueue cli_cq;
509   auto server = builder.BuildAndStart();
510 
511   ChannelArguments args;
512   auto channel =
513       grpc::CreateChannel(server_address, InsecureChannelCredentials());
514   GenericStub generic_stub(channel);
515 
516   const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
517   EchoRequest send_request;
518   EchoRequest recv_request;
519   EchoResponse send_response;
520   EchoResponse recv_response;
521   Status recv_status;
522 
523   ClientContext cli_ctx;
524   GenericServerContext srv_ctx;
525   GenericServerAsyncReaderWriter stream(&srv_ctx);
526 
527   // The string needs to be long enough to test heap-based slice.
528   send_request.set_message("Hello");
529   cli_ctx.AddMetadata("testkey", "testvalue");
530 
531   CompletionQueue* cq = srv_cq.get();
532   std::thread request_call([cq]() { Verifier().Expect(4, true).Verify(cq); });
533   std::unique_ptr<GenericClientAsyncReaderWriter> call =
534       generic_stub.PrepareCall(&cli_ctx, kMethodName, &cli_cq);
535   call->StartCall(tag(1));
536   Verifier().Expect(1, true).Verify(&cli_cq);
537   std::unique_ptr<ByteBuffer> send_buffer =
538       SerializeToByteBuffer(&send_request);
539   call->Write(*send_buffer, tag(2));
540   // Send ByteBuffer can be destroyed after calling Write.
541   send_buffer.reset();
542   Verifier().Expect(2, true).Verify(&cli_cq);
543   call->WritesDone(tag(3));
544   Verifier().Expect(3, true).Verify(&cli_cq);
545 
546   service.RequestCall(&srv_ctx, &stream, srv_cq.get(), srv_cq.get(), tag(4));
547 
548   request_call.join();
549   EXPECT_EQ(kMethodName, srv_ctx.method());
550   EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
551   srv_ctx.AddTrailingMetadata("testkey", "testvalue");
552 
553   ByteBuffer recv_buffer;
554   stream.Read(&recv_buffer, tag(5));
555   Verifier().Expect(5, true).Verify(srv_cq.get());
556   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
557   EXPECT_EQ(send_request.message(), recv_request.message());
558 
559   send_response.set_message(recv_request.message());
560   send_buffer = SerializeToByteBuffer(&send_response);
561   stream.Write(*send_buffer, tag(6));
562   send_buffer.reset();
563   Verifier().Expect(6, true).Verify(srv_cq.get());
564 
565   stream.Finish(Status::OK, tag(7));
566   // Shutdown srv_cq before we try to get the tag back, to verify that the
567   // interception API handles completion queue shutdowns that take place before
568   // all the tags are returned
569   srv_cq->Shutdown();
570   Verifier().Expect(7, true).Verify(srv_cq.get());
571 
572   recv_buffer.Clear();
573   call->Read(&recv_buffer, tag(8));
574   Verifier().Expect(8, true).Verify(&cli_cq);
575   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
576 
577   call->Finish(&recv_status, tag(9));
578   cli_cq.Shutdown();
579   Verifier().Expect(9, true).Verify(&cli_cq);
580 
581   EXPECT_EQ(send_response.message(), recv_response.message());
582   EXPECT_TRUE(recv_status.ok());
583   EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
584                             "testvalue"));
585 
586   // Make sure all 20 phony interceptors were run
587   EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
588 
589   server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
590   void* ignored_tag;
591   bool ignored_ok;
592   while (cli_cq.Next(&ignored_tag, &ignored_ok)) {
593   }
594   while (srv_cq->Next(&ignored_tag, &ignored_ok)) {
595   }
596   grpc_recycle_unused_port(port);
597 }
598 
TEST_F(ServerInterceptorsAsyncEnd2endTest,UnimplementedRpcTest)599 TEST_F(ServerInterceptorsAsyncEnd2endTest, UnimplementedRpcTest) {
600   PhonyInterceptor::Reset();
601   int port = grpc_pick_unused_port_or_die();
602   string server_address = "localhost:" + std::to_string(port);
603   ServerBuilder builder;
604   builder.AddListeningPort(server_address, InsecureServerCredentials());
605   std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
606       creators;
607   creators.reserve(20);
608   for (auto i = 0; i < 20; i++) {
609     creators.push_back(std::make_unique<PhonyInterceptorFactory>());
610   }
611   builder.experimental().SetInterceptorCreators(std::move(creators));
612   auto cq = builder.AddCompletionQueue();
613   auto server = builder.BuildAndStart();
614 
615   ChannelArguments args;
616   std::shared_ptr<Channel> channel =
617       grpc::CreateChannel(server_address, InsecureChannelCredentials());
618   std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
619   stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
620   EchoRequest send_request;
621   EchoResponse recv_response;
622   Status recv_status;
623 
624   ClientContext cli_ctx;
625   send_request.set_message("Hello");
626   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
627       stub->AsyncUnimplemented(&cli_ctx, send_request, cq.get()));
628 
629   response_reader->Finish(&recv_response, &recv_status, tag(4));
630   Verifier().Expect(4, true).Verify(cq.get());
631 
632   EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
633   EXPECT_EQ("", recv_status.error_message());
634 
635   // Make sure all 20 phony interceptors were run
636   EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
637 
638   server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
639   cq->Shutdown();
640   void* ignored_tag;
641   bool ignored_ok;
642   while (cq->Next(&ignored_tag, &ignored_ok)) {
643   }
644   grpc_recycle_unused_port(port);
645 }
646 
647 class ServerInterceptorsSyncUnimplementedEnd2endTest : public ::testing::Test {
648 };
649 
TEST_F(ServerInterceptorsSyncUnimplementedEnd2endTest,UnimplementedRpcTest)650 TEST_F(ServerInterceptorsSyncUnimplementedEnd2endTest, UnimplementedRpcTest) {
651   PhonyInterceptor::Reset();
652   int port = grpc_pick_unused_port_or_die();
653   string server_address = "localhost:" + std::to_string(port);
654   ServerBuilder builder;
655   TestServiceImpl service;
656   builder.RegisterService(&service);
657   builder.AddListeningPort(server_address, InsecureServerCredentials());
658   std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
659       creators;
660   creators.reserve(20);
661   for (auto i = 0; i < 20; i++) {
662     creators.push_back(std::make_unique<PhonyInterceptorFactory>());
663   }
664   builder.experimental().SetInterceptorCreators(std::move(creators));
665   auto server = builder.BuildAndStart();
666 
667   ChannelArguments args;
668   std::shared_ptr<Channel> channel =
669       grpc::CreateChannel(server_address, InsecureChannelCredentials());
670   std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
671   stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
672   EchoRequest send_request;
673   EchoResponse recv_response;
674 
675   ClientContext cli_ctx;
676   send_request.set_message("Hello");
677   Status recv_status =
678       stub->Unimplemented(&cli_ctx, send_request, &recv_response);
679 
680   EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
681   EXPECT_EQ("", recv_status.error_message());
682 
683   // Make sure all 20 phony interceptors were run
684   EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
685 
686   server->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
687   grpc_recycle_unused_port(port);
688 }
689 
690 }  // namespace
691 }  // namespace testing
692 }  // namespace grpc
693 
main(int argc,char ** argv)694 int main(int argc, char** argv) {
695   grpc::testing::TestEnvironment env(&argc, argv);
696   ::testing::InitGoogleTest(&argc, argv);
697   return RUN_ALL_TESTS();
698 }
699