xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/hybrid_end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
#include <cstdint>
#include <memory>
#include <thread>

#include <gtest/gtest.h>

#include <grpc/grpc.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/generic/async_generic_service.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>

#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/byte_buffer_proto_helper.h"
41 
42 namespace grpc {
43 namespace testing {
44 namespace {
45 
// Converts a small integer into an opaque completion-queue tag. The value is
// first widened to std::intptr_t so that the integer-to-pointer cast happens
// between types of equal width instead of from a (possibly narrower) int.
void* tag(int i) {
  return reinterpret_cast<void*>(static_cast<std::intptr_t>(i));
}
47 
VerifyReturnSuccess(CompletionQueue * cq,int i)48 bool VerifyReturnSuccess(CompletionQueue* cq, int i) {
49   void* got_tag;
50   bool ok;
51   EXPECT_TRUE(cq->Next(&got_tag, &ok));
52   EXPECT_EQ(tag(i), got_tag);
53   return ok;
54 }
55 
Verify(CompletionQueue * cq,int i,bool expect_ok)56 void Verify(CompletionQueue* cq, int i, bool expect_ok) {
57   EXPECT_EQ(expect_ok, VerifyReturnSuccess(cq, i));
58 }
59 
60 // Handlers to handle async request at a server. To be run in a separate thread.
61 template <class Service>
HandleEcho(Service * service,ServerCompletionQueue * cq,bool dup_service)62 void HandleEcho(Service* service, ServerCompletionQueue* cq, bool dup_service) {
63   ServerContext srv_ctx;
64   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
65   EchoRequest recv_request;
66   EchoResponse send_response;
67   service->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq, cq,
68                        tag(1));
69   Verify(cq, 1, true);
70   send_response.set_message(recv_request.message());
71   if (dup_service) {
72     send_response.mutable_message()->append("_dup");
73   }
74   response_writer.Finish(send_response, Status::OK, tag(2));
75   Verify(cq, 2, true);
76 }
77 
78 // Handlers to handle raw request at a server. To be run in a
79 // separate thread. Note that this is the same as the async version, except
80 // that the req/resp are ByteBuffers
81 template <class Service>
HandleRawEcho(Service * service,ServerCompletionQueue * cq,bool)82 void HandleRawEcho(Service* service, ServerCompletionQueue* cq,
83                    bool /*dup_service*/) {
84   ServerContext srv_ctx;
85   GenericServerAsyncResponseWriter response_writer(&srv_ctx);
86   ByteBuffer recv_buffer;
87   service->RequestEcho(&srv_ctx, &recv_buffer, &response_writer, cq, cq,
88                        tag(1));
89   Verify(cq, 1, true);
90   EchoRequest recv_request;
91   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
92   EchoResponse send_response;
93   send_response.set_message(recv_request.message());
94   auto send_buffer = SerializeToByteBuffer(&send_response);
95   response_writer.Finish(*send_buffer, Status::OK, tag(2));
96   Verify(cq, 2, true);
97 }
98 
99 template <class Service>
HandleClientStreaming(Service * service,ServerCompletionQueue * cq)100 void HandleClientStreaming(Service* service, ServerCompletionQueue* cq) {
101   ServerContext srv_ctx;
102   EchoRequest recv_request;
103   EchoResponse send_response;
104   ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
105   service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
106   Verify(cq, 1, true);
107   int i = 1;
108   do {
109     i++;
110     send_response.mutable_message()->append(recv_request.message());
111     srv_stream.Read(&recv_request, tag(i));
112   } while (VerifyReturnSuccess(cq, i));
113   srv_stream.Finish(send_response, Status::OK, tag(100));
114   Verify(cq, 100, true);
115 }
116 
117 template <class Service>
HandleRawClientStreaming(Service * service,ServerCompletionQueue * cq)118 void HandleRawClientStreaming(Service* service, ServerCompletionQueue* cq) {
119   ServerContext srv_ctx;
120   ByteBuffer recv_buffer;
121   EchoRequest recv_request;
122   EchoResponse send_response;
123   GenericServerAsyncReader srv_stream(&srv_ctx);
124   service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
125   Verify(cq, 1, true);
126   int i = 1;
127   while (true) {
128     i++;
129     srv_stream.Read(&recv_buffer, tag(i));
130     if (!VerifyReturnSuccess(cq, i)) {
131       break;
132     }
133     EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
134     send_response.mutable_message()->append(recv_request.message());
135   }
136   auto send_buffer = SerializeToByteBuffer(&send_response);
137   srv_stream.Finish(*send_buffer, Status::OK, tag(100));
138   Verify(cq, 100, true);
139 }
140 
141 template <class Service>
HandleServerStreaming(Service * service,ServerCompletionQueue * cq)142 void HandleServerStreaming(Service* service, ServerCompletionQueue* cq) {
143   ServerContext srv_ctx;
144   EchoRequest recv_request;
145   EchoResponse send_response;
146   ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
147   service->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq, cq,
148                                  tag(1));
149   Verify(cq, 1, true);
150   send_response.set_message(recv_request.message() + "0");
151   srv_stream.Write(send_response, tag(2));
152   Verify(cq, 2, true);
153   send_response.set_message(recv_request.message() + "1");
154   srv_stream.Write(send_response, tag(3));
155   Verify(cq, 3, true);
156   send_response.set_message(recv_request.message() + "2");
157   srv_stream.Write(send_response, tag(4));
158   Verify(cq, 4, true);
159   srv_stream.Finish(Status::OK, tag(5));
160   Verify(cq, 5, true);
161 }
162 
HandleGenericEcho(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)163 void HandleGenericEcho(GenericServerAsyncReaderWriter* stream,
164                        CompletionQueue* cq) {
165   ByteBuffer recv_buffer;
166   stream->Read(&recv_buffer, tag(2));
167   Verify(cq, 2, true);
168   EchoRequest recv_request;
169   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
170   EchoResponse send_response;
171   send_response.set_message(recv_request.message());
172   auto send_buffer = SerializeToByteBuffer(&send_response);
173   stream->Write(*send_buffer, tag(3));
174   Verify(cq, 3, true);
175   stream->Finish(Status::OK, tag(4));
176   Verify(cq, 4, true);
177 }
178 
HandleGenericRequestStream(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)179 void HandleGenericRequestStream(GenericServerAsyncReaderWriter* stream,
180                                 CompletionQueue* cq) {
181   ByteBuffer recv_buffer;
182   EchoRequest recv_request;
183   EchoResponse send_response;
184   int i = 1;
185   while (true) {
186     i++;
187     stream->Read(&recv_buffer, tag(i));
188     if (!VerifyReturnSuccess(cq, i)) {
189       break;
190     }
191     EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
192     send_response.mutable_message()->append(recv_request.message());
193   }
194   auto send_buffer = SerializeToByteBuffer(&send_response);
195   stream->Write(*send_buffer, tag(99));
196   Verify(cq, 99, true);
197   stream->Finish(Status::OK, tag(100));
198   Verify(cq, 100, true);
199 }
200 
201 // Request and handle one generic call.
HandleGenericCall(AsyncGenericService * service,ServerCompletionQueue * cq)202 void HandleGenericCall(AsyncGenericService* service,
203                        ServerCompletionQueue* cq) {
204   GenericServerContext srv_ctx;
205   GenericServerAsyncReaderWriter stream(&srv_ctx);
206   service->RequestCall(&srv_ctx, &stream, cq, cq, tag(1));
207   Verify(cq, 1, true);
208   if (srv_ctx.method() == "/grpc.testing.EchoTestService/Echo") {
209     HandleGenericEcho(&stream, cq);
210   } else if (srv_ctx.method() ==
211              "/grpc.testing.EchoTestService/RequestStream") {
212     HandleGenericRequestStream(&stream, cq);
213   } else {  // other methods not handled yet.
214     gpr_log(GPR_ERROR, "method: %s", srv_ctx.method().c_str());
215     GPR_ASSERT(0);
216   }
217 }
218 
219 class TestServiceImplDupPkg
220     : public grpc::testing::duplicate::EchoTestService::Service {
221  public:
Echo(ServerContext *,const EchoRequest * request,EchoResponse * response)222   Status Echo(ServerContext* /*context*/, const EchoRequest* request,
223               EchoResponse* response) override {
224     response->set_message(request->message() + "_dup");
225     return Status::OK;
226   }
227 };
228 
229 class HybridEnd2endTest : public ::testing::TestWithParam<bool> {
230  protected:
HybridEnd2endTest()231   HybridEnd2endTest() {}
232 
SetUpTestSuite()233   static void SetUpTestSuite() {
234 #if TARGET_OS_IPHONE
235     // Workaround Apple CFStream bug
236     grpc_core::SetEnv("grpc_cfstream", "0");
237 #endif
238   }
239 
SetUp()240   void SetUp() override {
241     inproc_ = (::testing::UnitTest::GetInstance()
242                    ->current_test_info()
243                    ->value_param() != nullptr)
244                   ? GetParam()
245                   : false;
246   }
247 
SetUpServer(grpc::Service * service1,grpc::Service * service2,AsyncGenericService * generic_service,CallbackGenericService * callback_generic_service,int max_message_size=0)248   bool SetUpServer(grpc::Service* service1, grpc::Service* service2,
249                    AsyncGenericService* generic_service,
250                    CallbackGenericService* callback_generic_service,
251                    int max_message_size = 0) {
252     int port = grpc_pick_unused_port_or_die();
253     server_address_ << "localhost:" << port;
254 
255     // Setup server
256     ServerBuilder builder;
257     builder.AddListeningPort(server_address_.str(),
258                              grpc::InsecureServerCredentials());
259     // Always add a sync unimplemented service: we rely on having at least one
260     // synchronous method to get a listening cq
261     builder.RegisterService(&unimplemented_service_);
262     builder.RegisterService(service1);
263     if (service2) {
264       builder.RegisterService(service2);
265     }
266     if (generic_service) {
267       builder.RegisterAsyncGenericService(generic_service);
268     }
269     if (callback_generic_service) {
270       builder.RegisterCallbackGenericService(callback_generic_service);
271     }
272 
273     if (max_message_size != 0) {
274       builder.SetMaxMessageSize(max_message_size);
275     }
276 
277     // Create a separate cq for each potential handler.
278     for (int i = 0; i < 5; i++) {
279       cqs_.push_back(builder.AddCompletionQueue(false));
280     }
281     server_ = builder.BuildAndStart();
282 
283     // If there is a generic callback service, this setup is only successful if
284     // we have an iomgr that can run in the background or are inprocess
285     return !callback_generic_service || grpc_iomgr_run_in_background() ||
286            inproc_;
287   }
288 
TearDown()289   void TearDown() override {
290     if (server_) {
291       server_->Shutdown();
292     }
293     void* ignored_tag;
294     bool ignored_ok;
295     for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
296       (*it)->Shutdown();
297       while ((*it)->Next(&ignored_tag, &ignored_ok)) {
298       }
299     }
300   }
301 
ResetStub()302   void ResetStub() {
303     std::shared_ptr<Channel> channel =
304         inproc_ ? server_->InProcessChannel(ChannelArguments())
305                 : grpc::CreateChannel(server_address_.str(),
306                                       InsecureChannelCredentials());
307     stub_ = grpc::testing::EchoTestService::NewStub(channel);
308   }
309 
310   // Test all rpc methods.
TestAllMethods()311   void TestAllMethods() {
312     SendEcho();
313     SendSimpleClientStreaming();
314     SendSimpleServerStreaming();
315     SendBidiStreaming();
316   }
317 
SendEcho()318   void SendEcho() {
319     EchoRequest send_request;
320     EchoResponse recv_response;
321     ClientContext cli_ctx;
322     cli_ctx.set_wait_for_ready(true);
323     send_request.set_message("Hello");
324     Status recv_status = stub_->Echo(&cli_ctx, send_request, &recv_response);
325     EXPECT_EQ(send_request.message(), recv_response.message());
326     EXPECT_TRUE(recv_status.ok());
327   }
328 
SendEchoToDupService()329   void SendEchoToDupService() {
330     std::shared_ptr<Channel> channel = grpc::CreateChannel(
331         server_address_.str(), InsecureChannelCredentials());
332     auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
333     EchoRequest send_request;
334     EchoResponse recv_response;
335     ClientContext cli_ctx;
336     cli_ctx.set_wait_for_ready(true);
337     send_request.set_message("Hello");
338     Status recv_status = stub->Echo(&cli_ctx, send_request, &recv_response);
339     EXPECT_EQ(send_request.message() + "_dup", recv_response.message());
340     EXPECT_TRUE(recv_status.ok());
341   }
342 
SendSimpleClientStreaming()343   void SendSimpleClientStreaming() {
344     EchoRequest send_request;
345     EchoResponse recv_response;
346     std::string expected_message;
347     ClientContext cli_ctx;
348     cli_ctx.set_wait_for_ready(true);
349     send_request.set_message("Hello");
350     auto stream = stub_->RequestStream(&cli_ctx, &recv_response);
351     for (int i = 0; i < 5; i++) {
352       EXPECT_TRUE(stream->Write(send_request));
353       expected_message.append(send_request.message());
354     }
355     stream->WritesDone();
356     Status recv_status = stream->Finish();
357     EXPECT_EQ(expected_message, recv_response.message());
358     EXPECT_TRUE(recv_status.ok());
359   }
360 
SendSimpleServerStreaming()361   void SendSimpleServerStreaming() {
362     EchoRequest request;
363     EchoResponse response;
364     ClientContext context;
365     context.set_wait_for_ready(true);
366     request.set_message("hello");
367 
368     auto stream = stub_->ResponseStream(&context, request);
369     EXPECT_TRUE(stream->Read(&response));
370     EXPECT_EQ(response.message(), request.message() + "0");
371     EXPECT_TRUE(stream->Read(&response));
372     EXPECT_EQ(response.message(), request.message() + "1");
373     EXPECT_TRUE(stream->Read(&response));
374     EXPECT_EQ(response.message(), request.message() + "2");
375     EXPECT_FALSE(stream->Read(&response));
376 
377     Status s = stream->Finish();
378     EXPECT_TRUE(s.ok());
379   }
380 
SendSimpleServerStreamingToDupService()381   void SendSimpleServerStreamingToDupService() {
382     std::shared_ptr<Channel> channel = grpc::CreateChannel(
383         server_address_.str(), InsecureChannelCredentials());
384     auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
385     EchoRequest request;
386     EchoResponse response;
387     ClientContext context;
388     context.set_wait_for_ready(true);
389     request.set_message("hello");
390 
391     auto stream = stub->ResponseStream(&context, request);
392     EXPECT_TRUE(stream->Read(&response));
393     EXPECT_EQ(response.message(), request.message() + "0_dup");
394     EXPECT_TRUE(stream->Read(&response));
395     EXPECT_EQ(response.message(), request.message() + "1_dup");
396     EXPECT_TRUE(stream->Read(&response));
397     EXPECT_EQ(response.message(), request.message() + "2_dup");
398     EXPECT_FALSE(stream->Read(&response));
399 
400     Status s = stream->Finish();
401     EXPECT_TRUE(s.ok());
402   }
403 
SendBidiStreaming()404   void SendBidiStreaming() {
405     EchoRequest request;
406     EchoResponse response;
407     ClientContext context;
408     context.set_wait_for_ready(true);
409     std::string msg("hello");
410 
411     auto stream = stub_->BidiStream(&context);
412 
413     request.set_message(msg + "0");
414     EXPECT_TRUE(stream->Write(request));
415     EXPECT_TRUE(stream->Read(&response));
416     EXPECT_EQ(response.message(), request.message());
417 
418     request.set_message(msg + "1");
419     EXPECT_TRUE(stream->Write(request));
420     EXPECT_TRUE(stream->Read(&response));
421     EXPECT_EQ(response.message(), request.message());
422 
423     request.set_message(msg + "2");
424     EXPECT_TRUE(stream->Write(request));
425     EXPECT_TRUE(stream->Read(&response));
426     EXPECT_EQ(response.message(), request.message());
427 
428     stream->WritesDone();
429     EXPECT_FALSE(stream->Read(&response));
430     EXPECT_FALSE(stream->Read(&response));
431 
432     Status s = stream->Finish();
433     EXPECT_TRUE(s.ok());
434   }
435 
436   grpc::testing::UnimplementedEchoService::Service unimplemented_service_;
437   std::vector<std::unique_ptr<ServerCompletionQueue>> cqs_;
438   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
439   std::unique_ptr<Server> server_;
440   std::ostringstream server_address_;
441   bool inproc_;
442 };
443 
// Echo is marked async and served by a handler thread; the remaining methods
// are left to the TestServiceImpl base.
TEST_F(HybridEnd2endTest, AsyncEcho) {
  using SType = EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>;
  SType service;
  SetUpServer(&service, /*service2=*/nullptr, /*generic_service=*/nullptr,
              /*callback_generic_service=*/nullptr);
  ResetStub();
  std::thread echo_worker(HandleEcho<SType>, &service, cqs_[0].get(),
                          /*dup_service=*/false);
  TestAllMethods();
  echo_worker.join();
}
454 
// Echo is marked raw and served by a ByteBuffer-based handler thread; the
// remaining methods are left to the TestServiceImpl base.
TEST_F(HybridEnd2endTest, RawEcho) {
  using SType = EchoTestService::WithRawMethod_Echo<TestServiceImpl>;
  SType service;
  SetUpServer(&service, /*service2=*/nullptr, /*generic_service=*/nullptr,
              /*callback_generic_service=*/nullptr);
  ResetStub();
  std::thread raw_echo_worker(HandleRawEcho<SType>, &service, cqs_[0].get(),
                              /*dup_service=*/false);
  TestAllMethods();
  raw_echo_worker.join();
}
465 
// RequestStream is marked raw and served by a ByteBuffer-based handler thread;
// the remaining methods are left to the TestServiceImpl base.
TEST_F(HybridEnd2endTest, RawRequestStream) {
  using SType = EchoTestService::WithRawMethod_RequestStream<TestServiceImpl>;
  SType service;
  SetUpServer(&service, /*service2=*/nullptr, /*generic_service=*/nullptr,
              /*callback_generic_service=*/nullptr);
  ResetStub();
  std::thread raw_request_stream_worker(HandleRawClientStreaming<SType>,
                                        &service, cqs_[0].get());
  TestAllMethods();
  raw_request_stream_worker.join();
}
476 
// Echo is async and RequestStream is raw, each served by its own handler
// thread on a separate completion queue.
TEST_F(HybridEnd2endTest, AsyncEchoRawRequestStream) {
  using SType = EchoTestService::WithRawMethod_RequestStream<
      EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>;
  SType service;
  SetUpServer(&service, /*service2=*/nullptr, /*generic_service=*/nullptr,
              /*callback_generic_service=*/nullptr);
  ResetStub();
  std::thread echo_worker(HandleEcho<SType>, &service, cqs_[0].get(),
                          /*dup_service=*/false);
  std::thread raw_request_stream_worker(HandleRawClientStreaming<SType>,
                                        &service, cqs_[1].get());
  TestAllMethods();
  raw_request_stream_worker.join();
  echo_worker.join();
}
492 
// Echo is marked generic and handled through an AsyncGenericService, while
// RequestStream is raw; each has its own handler thread and completion queue.
TEST_F(HybridEnd2endTest, GenericEchoRawRequestStream) {
  using SType = EchoTestService::WithRawMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, /*service2=*/nullptr, &generic_service,
              /*callback_generic_service=*/nullptr);
  ResetStub();
  std::thread generic_worker(HandleGenericCall, &generic_service,
                             cqs_[0].get());
  std::thread raw_request_stream_worker(HandleRawClientStreaming<SType>,
                                        &service, cqs_[1].get());
  TestAllMethods();
  generic_worker.join();
  raw_request_stream_worker.join();
}
509 
// Echo and RequestStream are both async, each served by its own handler
// thread on a separate completion queue.
TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>;
  SType service;
  SetUpServer(&service, /*service2=*/nullptr, /*generic_service=*/nullptr,
              /*callback_generic_service=*/nullptr);
  ResetStub();
  std::thread echo_worker(HandleEcho<SType>, &service, cqs_[0].get(),
                          /*dup_service=*/false);
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    cqs_[1].get());
  TestAllMethods();
  echo_worker.join();
  request_stream_worker.join();
}
525 
// RequestStream and ResponseStream are both async, each served by its own
// handler thread on a separate completion queue.
TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  SetUpServer(&service, /*service2=*/nullptr, /*generic_service=*/nullptr,
              /*callback_generic_service=*/nullptr);
  ResetStub();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     cqs_[0].get());
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    cqs_[1].get());
  TestAllMethods();
  response_stream_worker.join();
  request_stream_worker.join();
}
541 
542 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamSyncDupService)543 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStreamSyncDupService) {
544   typedef EchoTestService::WithAsyncMethod_RequestStream<
545       EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
546       SType;
547   SType service;
548   TestServiceImplDupPkg dup_service;
549   SetUpServer(&service, &dup_service, nullptr, nullptr);
550   ResetStub();
551   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
552                                              &service, cqs_[0].get());
553   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
554                                             &service, cqs_[1].get());
555   TestAllMethods();
556   SendEchoToDupService();
557   response_stream_handler_thread.join();
558   request_stream_handler_thread.join();
559 }
560 
561 // Add a second service with one sync streamed unary method.
562 class StreamedUnaryDupPkg
563     : public duplicate::EchoTestService::WithStreamedUnaryMethod_Echo<
564           TestServiceImplDupPkg> {
565  public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)566   Status StreamedEcho(
567       ServerContext* /*context*/,
568       ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
569     EchoRequest req;
570     EchoResponse resp;
571     uint32_t next_msg_sz;
572     stream->NextMessageSize(&next_msg_sz);
573     gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
574     GPR_ASSERT(stream->Read(&req));
575     resp.set_message(req.message() + "_dup");
576     GPR_ASSERT(stream->Write(resp));
577     return Status::OK;
578   }
579 };
580 
// Async RequestStream/ResponseStream on the primary service, plus a duplicate
// service whose Echo goes through the streamed-unary API. The server is built
// with a max message size of 8192.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStreamSyncStreamedUnaryDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  StreamedUnaryDupPkg dup_service;
  SetUpServer(&service, &dup_service, /*generic_service=*/nullptr,
              /*callback_generic_service=*/nullptr, /*max_message_size=*/8192);
  ResetStub();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     cqs_[0].get());
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    cqs_[1].get());
  TestAllMethods();
  SendEchoToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
599 
600 // Add a second service that is fully Streamed Unary
601 class FullyStreamedUnaryDupPkg
602     : public duplicate::EchoTestService::StreamedUnaryService {
603  public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)604   Status StreamedEcho(
605       ServerContext* /*context*/,
606       ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
607     EchoRequest req;
608     EchoResponse resp;
609     uint32_t next_msg_sz;
610     stream->NextMessageSize(&next_msg_sz);
611     gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
612     GPR_ASSERT(stream->Read(&req));
613     resp.set_message(req.message() + "_dup");
614     GPR_ASSERT(stream->Write(resp));
615     return Status::OK;
616   }
617 };
618 
// Async RequestStream/ResponseStream on the primary service, plus a duplicate
// service that is fully streamed-unary. The server is built with a max message
// size of 8192.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStreamSyncFullyStreamedUnaryDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  FullyStreamedUnaryDupPkg dup_service;
  SetUpServer(&service, &dup_service, /*generic_service=*/nullptr,
              /*callback_generic_service=*/nullptr, /*max_message_size=*/8192);
  ResetStub();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     cqs_[0].get());
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    cqs_[1].get());
  TestAllMethods();
  SendEchoToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
637 
638 // Add a second service with one sync split server streaming method.
639 class SplitResponseStreamDupPkg
640     : public duplicate::EchoTestService::
641           WithSplitStreamingMethod_ResponseStream<TestServiceImplDupPkg> {
642  public:
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)643   Status StreamedResponseStream(
644       ServerContext* /*context*/,
645       ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
646     EchoRequest req;
647     EchoResponse resp;
648     uint32_t next_msg_sz;
649     stream->NextMessageSize(&next_msg_sz);
650     gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
651     GPR_ASSERT(stream->Read(&req));
652     for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
653       resp.set_message(req.message() + std::to_string(i) + "_dup");
654       GPR_ASSERT(stream->Write(resp));
655     }
656     return Status::OK;
657   }
658 };
659 
// Async RequestStream/ResponseStream on the primary service, plus a duplicate
// service whose ResponseStream goes through the split-streaming API. The
// server is built with a max message size of 8192.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStreamSyncSplitStreamedDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  SplitResponseStreamDupPkg dup_service;
  SetUpServer(&service, &dup_service, /*generic_service=*/nullptr,
              /*callback_generic_service=*/nullptr, /*max_message_size=*/8192);
  ResetStub();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     cqs_[0].get());
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    cqs_[1].get());
  TestAllMethods();
  SendSimpleServerStreamingToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
678 
679 // Add a second service that is fully split server streamed
680 class FullySplitStreamedDupPkg
681     : public duplicate::EchoTestService::SplitStreamedService {
682  public:
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)683   Status StreamedResponseStream(
684       ServerContext* /*context*/,
685       ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
686     EchoRequest req;
687     EchoResponse resp;
688     uint32_t next_msg_sz;
689     stream->NextMessageSize(&next_msg_sz);
690     gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
691     GPR_ASSERT(stream->Read(&req));
692     for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
693       resp.set_message(req.message() + std::to_string(i) + "_dup");
694       GPR_ASSERT(stream->Write(resp));
695     }
696     return Status::OK;
697   }
698 };
699 
// Async RequestStream/ResponseStream on the primary service, plus a duplicate
// service that is fully split-server-streamed. The server is built with a max
// message size of 8192.
//
// NOTE(review): unlike its siblings this test name starts with a lowercase
// 'a'. It is left unchanged here so existing --gtest_filter patterns keep
// matching; consider renaming for consistency.
TEST_F(HybridEnd2endTest,
       asyncRequestStreamResponseStreamFullySplitStreamedDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  FullySplitStreamedDupPkg dup_service;
  SetUpServer(&service, &dup_service, /*generic_service=*/nullptr,
              /*callback_generic_service=*/nullptr, /*max_message_size=*/8192);
  ResetStub();
  std::thread response_stream_worker(HandleServerStreaming<SType>, &service,
                                     cqs_[0].get());
  std::thread request_stream_worker(HandleClientStreaming<SType>, &service,
                                    cqs_[1].get());
  TestAllMethods();
  SendSimpleServerStreamingToDupService();
  response_stream_worker.join();
  request_stream_worker.join();
}
718 
719 // Add a second service that is fully server streamed
720 class FullyStreamedDupPkg : public duplicate::EchoTestService::StreamedService {
721  public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)722   Status StreamedEcho(
723       ServerContext* /*context*/,
724       ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
725     EchoRequest req;
726     EchoResponse resp;
727     uint32_t next_msg_sz;
728     stream->NextMessageSize(&next_msg_sz);
729     gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
730     GPR_ASSERT(stream->Read(&req));
731     resp.set_message(req.message() + "_dup");
732     GPR_ASSERT(stream->Write(resp));
733     return Status::OK;
734   }
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)735   Status StreamedResponseStream(
736       ServerContext* /*context*/,
737       ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
738     EchoRequest req;
739     EchoResponse resp;
740     uint32_t next_msg_sz;
741     stream->NextMessageSize(&next_msg_sz);
742     gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
743     GPR_ASSERT(stream->Read(&req));
744     for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
745       resp.set_message(req.message() + std::to_string(i) + "_dup");
746       GPR_ASSERT(stream->Write(resp));
747     }
748     return Status::OK;
749   }
750 };
751 
// RequestStream and ResponseStream of the primary service are async methods;
// a FullyStreamedDupPkg instance is registered as the second service.
TEST_F(HybridEnd2endTest,
       AsyncRequestStreamResponseStreamFullyStreamedDupService) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>;
  SType service;
  FullyStreamedDupPkg dup_service;
  SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
  ResetStub();

  // Each async method gets a dedicated handler thread with its own entry of
  // cqs_.
  std::thread server_streaming_worker(HandleServerStreaming<SType>, &service,
                                      cqs_[0].get());
  std::thread client_streaming_worker(HandleClientStreaming<SType>, &service,
                                      cqs_[1].get());

  TestAllMethods();
  SendEchoToDupService();
  SendSimpleServerStreamingToDupService();

  // The test only finishes once both handler threads have returned.
  server_streaming_worker.join();
  client_streaming_worker.join();
}
771 
772 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamAsyncDupService)773 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStreamAsyncDupService) {
774   typedef EchoTestService::WithAsyncMethod_RequestStream<
775       EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
776       SType;
777   SType service;
778   duplicate::EchoTestService::AsyncService dup_service;
779   SetUpServer(&service, &dup_service, nullptr, nullptr);
780   ResetStub();
781   std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
782                                              &service, cqs_[0].get());
783   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
784                                             &service, cqs_[1].get());
785   std::thread echo_handler_thread(
786       HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
787       cqs_[2].get(), true);
788   TestAllMethods();
789   SendEchoToDupService();
790   response_stream_handler_thread.join();
791   request_stream_handler_thread.join();
792   echo_handler_thread.join();
793 }
794 
// Echo is marked generic on the primary service and an AsyncGenericService is
// registered; a single thread running HandleGenericCall serves it.
TEST_F(HybridEnd2endTest, GenericEcho) {
  using SType = EchoTestService::WithGenericMethod_Echo<TestServiceImpl>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  std::thread generic_worker(HandleGenericCall, &generic_service,
                             cqs_[0].get());
  TestAllMethods();
  generic_worker.join();
}
805 
// Parameterized variant: Echo is marked generic and is served by a
// callback-API generic service whose reactor sends the request bytes straight
// back as the response.
TEST_P(HybridEnd2endTest, CallbackGenericEcho) {
  using SType = EchoTestService::WithGenericMethod_Echo<TestServiceImpl>;
  SType service;

  class GenericEchoService : public CallbackGenericService {
   private:
    // Checks that the incoming call targets Echo, logs the deadline tick count
    // and hands back a freshly allocated reactor.
    ServerGenericBidiReactor* CreateReactor(
        GenericCallbackServerContext* context) override {
      EXPECT_EQ(context->method(), "/grpc.testing.EchoTestService/Echo");
      const int deadline_count =
          static_cast<int>(context->deadline().time_since_epoch().count());
      gpr_log(GPR_DEBUG, "Constructor of generic service %d", deadline_count);

      class EchoReactor : public ServerGenericBidiReactor {
       public:
        // The first read is issued as soon as the reactor exists.
        EchoReactor() { StartRead(&request_); }

       private:
        // The reactor is heap-allocated by CreateReactor and frees itself.
        void OnDone() override { delete this; }

        // Exactly one successful read is expected. It is echoed back and a
        // further read is started; the failed read must come after it.
        void OnReadDone(bool ok) override {
          if (!ok) {
            EXPECT_EQ(reads_complete_, 1);
            return;
          }
          EXPECT_EQ(reads_complete_++, 0);
          response_ = request_;
          StartWrite(&response_);
          StartRead(&request_);
        }

        // Completing the single write finishes the call; a failed write is
        // reported as UNKNOWN.
        void OnWriteDone(bool ok) override {
          if (ok) {
            Finish(Status::OK);
          } else {
            Finish(Status(StatusCode::UNKNOWN, "Unexpected failure"));
          }
        }

        ByteBuffer request_;
        ByteBuffer response_;
        std::atomic_int reads_complete_{0};
      };
      return new EchoReactor;
    }
  };
  GenericEchoService generic_service;

  // When SetUpServer returns false the rest of the test is skipped.
  if (SetUpServer(&service, nullptr, nullptr, &generic_service)) {
    ResetStub();
    TestAllMethods();
  }
}
850 
// Echo is generic and RequestStream is async on the primary service; one
// handler thread is started for each of them.
TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  std::thread generic_worker(HandleGenericCall, &generic_service,
                             cqs_[0].get());
  std::thread client_streaming_worker(HandleClientStreaming<SType>, &service,
                                      cqs_[1].get());

  TestAllMethods();

  generic_worker.join();
  client_streaming_worker.join();
}
867 
868 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStreamSyncDupService)869 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamSyncDupService) {
870   typedef EchoTestService::WithAsyncMethod_RequestStream<
871       EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
872       SType;
873   SType service;
874   AsyncGenericService generic_service;
875   TestServiceImplDupPkg dup_service;
876   SetUpServer(&service, &dup_service, &generic_service, nullptr);
877   ResetStub();
878   std::thread generic_handler_thread(HandleGenericCall, &generic_service,
879                                      cqs_[0].get());
880   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
881                                             &service, cqs_[1].get());
882   TestAllMethods();
883   SendEchoToDupService();
884   generic_handler_thread.join();
885   request_stream_handler_thread.join();
886 }
887 
888 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStreamAsyncDupService)889 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamAsyncDupService) {
890   typedef EchoTestService::WithAsyncMethod_RequestStream<
891       EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
892       SType;
893   SType service;
894   AsyncGenericService generic_service;
895   duplicate::EchoTestService::AsyncService dup_service;
896   SetUpServer(&service, &dup_service, &generic_service, nullptr);
897   ResetStub();
898   std::thread generic_handler_thread(HandleGenericCall, &generic_service,
899                                      cqs_[0].get());
900   std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
901                                             &service, cqs_[1].get());
902   std::thread echo_handler_thread(
903       HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
904       cqs_[2].get(), true);
905   TestAllMethods();
906   SendEchoToDupService();
907   generic_handler_thread.join();
908   request_stream_handler_thread.join();
909   echo_handler_thread.join();
910 }
911 
// Echo is generic while RequestStream and ResponseStream are both async on
// the primary service; each of the three gets its own handler thread.
TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
  using SType = EchoTestService::WithAsyncMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<
          EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  std::thread generic_worker(HandleGenericCall, &generic_service,
                             cqs_[0].get());
  std::thread client_streaming_worker(HandleClientStreaming<SType>, &service,
                                      cqs_[1].get());
  std::thread server_streaming_worker(HandleServerStreaming<SType>, &service,
                                      cqs_[2].get());

  TestAllMethods();

  generic_worker.join();
  client_streaming_worker.join();
  server_streaming_worker.join();
}
932 
// Echo and RequestStream are both generic and ResponseStream is async. Two
// threads run HandleGenericCall against the same generic service, each with
// its own entry of cqs_, and a third thread serves ResponseStream.
TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) {
  using SType = EchoTestService::WithGenericMethod_RequestStream<
      EchoTestService::WithGenericMethod_Echo<
          EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>;
  SType service;
  AsyncGenericService generic_service;
  SetUpServer(&service, nullptr, &generic_service, nullptr);
  ResetStub();

  std::thread first_generic_worker(HandleGenericCall, &generic_service,
                                   cqs_[0].get());
  std::thread second_generic_worker(HandleGenericCall, &generic_service,
                                    cqs_[1].get());
  std::thread server_streaming_worker(HandleServerStreaming<SType>, &service,
                                      cqs_[2].get());

  TestAllMethods();

  first_generic_worker.join();
  second_generic_worker.join();
  server_streaming_worker.join();
}
953 
954 // If WithGenericMethod is called and no generic service is registered, the
955 // server will fail to build.
TEST_F(HybridEnd2endTest,GenericMethodWithoutGenericService)956 TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) {
957   EchoTestService::WithGenericMethod_RequestStream<
958       EchoTestService::WithGenericMethod_Echo<
959           EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
960       service;
961   SetUpServer(&service, nullptr, nullptr, nullptr);
962   EXPECT_EQ(nullptr, server_.get());
963 }
964 
965 INSTANTIATE_TEST_SUITE_P(HybridEnd2endTest, HybridEnd2endTest,
966                          ::testing::Bool());
967 
968 }  // namespace
969 }  // namespace testing
970 }  // namespace grpc
971 
main(int argc,char ** argv)972 int main(int argc, char** argv) {
973   grpc::testing::TestEnvironment env(&argc, argv);
974   ::testing::InitGoogleTest(&argc, argv);
975   return RUN_ALL_TESTS();
976 }
977