1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <memory>
20 #include <thread>
21
22 #include <gtest/gtest.h>
23
24 #include <grpc/grpc.h>
25 #include <grpcpp/channel.h>
26 #include <grpcpp/client_context.h>
27 #include <grpcpp/create_channel.h>
28 #include <grpcpp/generic/async_generic_service.h>
29 #include <grpcpp/server.h>
30 #include <grpcpp/server_builder.h>
31 #include <grpcpp/server_context.h>
32
33 #include "src/core/lib/gprpp/env.h"
34 #include "src/core/lib/iomgr/iomgr.h"
35 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
36 #include "src/proto/grpc/testing/echo.grpc.pb.h"
37 #include "test/core/util/port.h"
38 #include "test/core/util/test_config.h"
39 #include "test/cpp/end2end/test_service_impl.h"
40 #include "test/cpp/util/byte_buffer_proto_helper.h"
41
42 namespace grpc {
43 namespace testing {
44 namespace {
45
tag(int i)46 void* tag(int i) { return reinterpret_cast<void*>(i); }
47
VerifyReturnSuccess(CompletionQueue * cq,int i)48 bool VerifyReturnSuccess(CompletionQueue* cq, int i) {
49 void* got_tag;
50 bool ok;
51 EXPECT_TRUE(cq->Next(&got_tag, &ok));
52 EXPECT_EQ(tag(i), got_tag);
53 return ok;
54 }
55
Verify(CompletionQueue * cq,int i,bool expect_ok)56 void Verify(CompletionQueue* cq, int i, bool expect_ok) {
57 EXPECT_EQ(expect_ok, VerifyReturnSuccess(cq, i));
58 }
59
60 // Handlers to handle async request at a server. To be run in a separate thread.
61 template <class Service>
HandleEcho(Service * service,ServerCompletionQueue * cq,bool dup_service)62 void HandleEcho(Service* service, ServerCompletionQueue* cq, bool dup_service) {
63 ServerContext srv_ctx;
64 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
65 EchoRequest recv_request;
66 EchoResponse send_response;
67 service->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq, cq,
68 tag(1));
69 Verify(cq, 1, true);
70 send_response.set_message(recv_request.message());
71 if (dup_service) {
72 send_response.mutable_message()->append("_dup");
73 }
74 response_writer.Finish(send_response, Status::OK, tag(2));
75 Verify(cq, 2, true);
76 }
77
78 // Handlers to handle raw request at a server. To be run in a
79 // separate thread. Note that this is the same as the async version, except
80 // that the req/resp are ByteBuffers
81 template <class Service>
HandleRawEcho(Service * service,ServerCompletionQueue * cq,bool)82 void HandleRawEcho(Service* service, ServerCompletionQueue* cq,
83 bool /*dup_service*/) {
84 ServerContext srv_ctx;
85 GenericServerAsyncResponseWriter response_writer(&srv_ctx);
86 ByteBuffer recv_buffer;
87 service->RequestEcho(&srv_ctx, &recv_buffer, &response_writer, cq, cq,
88 tag(1));
89 Verify(cq, 1, true);
90 EchoRequest recv_request;
91 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
92 EchoResponse send_response;
93 send_response.set_message(recv_request.message());
94 auto send_buffer = SerializeToByteBuffer(&send_response);
95 response_writer.Finish(*send_buffer, Status::OK, tag(2));
96 Verify(cq, 2, true);
97 }
98
99 template <class Service>
HandleClientStreaming(Service * service,ServerCompletionQueue * cq)100 void HandleClientStreaming(Service* service, ServerCompletionQueue* cq) {
101 ServerContext srv_ctx;
102 EchoRequest recv_request;
103 EchoResponse send_response;
104 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
105 service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
106 Verify(cq, 1, true);
107 int i = 1;
108 do {
109 i++;
110 send_response.mutable_message()->append(recv_request.message());
111 srv_stream.Read(&recv_request, tag(i));
112 } while (VerifyReturnSuccess(cq, i));
113 srv_stream.Finish(send_response, Status::OK, tag(100));
114 Verify(cq, 100, true);
115 }
116
117 template <class Service>
HandleRawClientStreaming(Service * service,ServerCompletionQueue * cq)118 void HandleRawClientStreaming(Service* service, ServerCompletionQueue* cq) {
119 ServerContext srv_ctx;
120 ByteBuffer recv_buffer;
121 EchoRequest recv_request;
122 EchoResponse send_response;
123 GenericServerAsyncReader srv_stream(&srv_ctx);
124 service->RequestRequestStream(&srv_ctx, &srv_stream, cq, cq, tag(1));
125 Verify(cq, 1, true);
126 int i = 1;
127 while (true) {
128 i++;
129 srv_stream.Read(&recv_buffer, tag(i));
130 if (!VerifyReturnSuccess(cq, i)) {
131 break;
132 }
133 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
134 send_response.mutable_message()->append(recv_request.message());
135 }
136 auto send_buffer = SerializeToByteBuffer(&send_response);
137 srv_stream.Finish(*send_buffer, Status::OK, tag(100));
138 Verify(cq, 100, true);
139 }
140
141 template <class Service>
HandleServerStreaming(Service * service,ServerCompletionQueue * cq)142 void HandleServerStreaming(Service* service, ServerCompletionQueue* cq) {
143 ServerContext srv_ctx;
144 EchoRequest recv_request;
145 EchoResponse send_response;
146 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
147 service->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq, cq,
148 tag(1));
149 Verify(cq, 1, true);
150 send_response.set_message(recv_request.message() + "0");
151 srv_stream.Write(send_response, tag(2));
152 Verify(cq, 2, true);
153 send_response.set_message(recv_request.message() + "1");
154 srv_stream.Write(send_response, tag(3));
155 Verify(cq, 3, true);
156 send_response.set_message(recv_request.message() + "2");
157 srv_stream.Write(send_response, tag(4));
158 Verify(cq, 4, true);
159 srv_stream.Finish(Status::OK, tag(5));
160 Verify(cq, 5, true);
161 }
162
HandleGenericEcho(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)163 void HandleGenericEcho(GenericServerAsyncReaderWriter* stream,
164 CompletionQueue* cq) {
165 ByteBuffer recv_buffer;
166 stream->Read(&recv_buffer, tag(2));
167 Verify(cq, 2, true);
168 EchoRequest recv_request;
169 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
170 EchoResponse send_response;
171 send_response.set_message(recv_request.message());
172 auto send_buffer = SerializeToByteBuffer(&send_response);
173 stream->Write(*send_buffer, tag(3));
174 Verify(cq, 3, true);
175 stream->Finish(Status::OK, tag(4));
176 Verify(cq, 4, true);
177 }
178
HandleGenericRequestStream(GenericServerAsyncReaderWriter * stream,CompletionQueue * cq)179 void HandleGenericRequestStream(GenericServerAsyncReaderWriter* stream,
180 CompletionQueue* cq) {
181 ByteBuffer recv_buffer;
182 EchoRequest recv_request;
183 EchoResponse send_response;
184 int i = 1;
185 while (true) {
186 i++;
187 stream->Read(&recv_buffer, tag(i));
188 if (!VerifyReturnSuccess(cq, i)) {
189 break;
190 }
191 EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
192 send_response.mutable_message()->append(recv_request.message());
193 }
194 auto send_buffer = SerializeToByteBuffer(&send_response);
195 stream->Write(*send_buffer, tag(99));
196 Verify(cq, 99, true);
197 stream->Finish(Status::OK, tag(100));
198 Verify(cq, 100, true);
199 }
200
201 // Request and handle one generic call.
HandleGenericCall(AsyncGenericService * service,ServerCompletionQueue * cq)202 void HandleGenericCall(AsyncGenericService* service,
203 ServerCompletionQueue* cq) {
204 GenericServerContext srv_ctx;
205 GenericServerAsyncReaderWriter stream(&srv_ctx);
206 service->RequestCall(&srv_ctx, &stream, cq, cq, tag(1));
207 Verify(cq, 1, true);
208 if (srv_ctx.method() == "/grpc.testing.EchoTestService/Echo") {
209 HandleGenericEcho(&stream, cq);
210 } else if (srv_ctx.method() ==
211 "/grpc.testing.EchoTestService/RequestStream") {
212 HandleGenericRequestStream(&stream, cq);
213 } else { // other methods not handled yet.
214 gpr_log(GPR_ERROR, "method: %s", srv_ctx.method().c_str());
215 GPR_ASSERT(0);
216 }
217 }
218
219 class TestServiceImplDupPkg
220 : public grpc::testing::duplicate::EchoTestService::Service {
221 public:
Echo(ServerContext *,const EchoRequest * request,EchoResponse * response)222 Status Echo(ServerContext* /*context*/, const EchoRequest* request,
223 EchoResponse* response) override {
224 response->set_message(request->message() + "_dup");
225 return Status::OK;
226 }
227 };
228
229 class HybridEnd2endTest : public ::testing::TestWithParam<bool> {
230 protected:
HybridEnd2endTest()231 HybridEnd2endTest() {}
232
SetUpTestSuite()233 static void SetUpTestSuite() {
234 #if TARGET_OS_IPHONE
235 // Workaround Apple CFStream bug
236 grpc_core::SetEnv("grpc_cfstream", "0");
237 #endif
238 }
239
SetUp()240 void SetUp() override {
241 inproc_ = (::testing::UnitTest::GetInstance()
242 ->current_test_info()
243 ->value_param() != nullptr)
244 ? GetParam()
245 : false;
246 }
247
SetUpServer(grpc::Service * service1,grpc::Service * service2,AsyncGenericService * generic_service,CallbackGenericService * callback_generic_service,int max_message_size=0)248 bool SetUpServer(grpc::Service* service1, grpc::Service* service2,
249 AsyncGenericService* generic_service,
250 CallbackGenericService* callback_generic_service,
251 int max_message_size = 0) {
252 int port = grpc_pick_unused_port_or_die();
253 server_address_ << "localhost:" << port;
254
255 // Setup server
256 ServerBuilder builder;
257 builder.AddListeningPort(server_address_.str(),
258 grpc::InsecureServerCredentials());
259 // Always add a sync unimplemented service: we rely on having at least one
260 // synchronous method to get a listening cq
261 builder.RegisterService(&unimplemented_service_);
262 builder.RegisterService(service1);
263 if (service2) {
264 builder.RegisterService(service2);
265 }
266 if (generic_service) {
267 builder.RegisterAsyncGenericService(generic_service);
268 }
269 if (callback_generic_service) {
270 builder.RegisterCallbackGenericService(callback_generic_service);
271 }
272
273 if (max_message_size != 0) {
274 builder.SetMaxMessageSize(max_message_size);
275 }
276
277 // Create a separate cq for each potential handler.
278 for (int i = 0; i < 5; i++) {
279 cqs_.push_back(builder.AddCompletionQueue(false));
280 }
281 server_ = builder.BuildAndStart();
282
283 // If there is a generic callback service, this setup is only successful if
284 // we have an iomgr that can run in the background or are inprocess
285 return !callback_generic_service || grpc_iomgr_run_in_background() ||
286 inproc_;
287 }
288
TearDown()289 void TearDown() override {
290 if (server_) {
291 server_->Shutdown();
292 }
293 void* ignored_tag;
294 bool ignored_ok;
295 for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
296 (*it)->Shutdown();
297 while ((*it)->Next(&ignored_tag, &ignored_ok)) {
298 }
299 }
300 }
301
ResetStub()302 void ResetStub() {
303 std::shared_ptr<Channel> channel =
304 inproc_ ? server_->InProcessChannel(ChannelArguments())
305 : grpc::CreateChannel(server_address_.str(),
306 InsecureChannelCredentials());
307 stub_ = grpc::testing::EchoTestService::NewStub(channel);
308 }
309
310 // Test all rpc methods.
TestAllMethods()311 void TestAllMethods() {
312 SendEcho();
313 SendSimpleClientStreaming();
314 SendSimpleServerStreaming();
315 SendBidiStreaming();
316 }
317
SendEcho()318 void SendEcho() {
319 EchoRequest send_request;
320 EchoResponse recv_response;
321 ClientContext cli_ctx;
322 cli_ctx.set_wait_for_ready(true);
323 send_request.set_message("Hello");
324 Status recv_status = stub_->Echo(&cli_ctx, send_request, &recv_response);
325 EXPECT_EQ(send_request.message(), recv_response.message());
326 EXPECT_TRUE(recv_status.ok());
327 }
328
SendEchoToDupService()329 void SendEchoToDupService() {
330 std::shared_ptr<Channel> channel = grpc::CreateChannel(
331 server_address_.str(), InsecureChannelCredentials());
332 auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
333 EchoRequest send_request;
334 EchoResponse recv_response;
335 ClientContext cli_ctx;
336 cli_ctx.set_wait_for_ready(true);
337 send_request.set_message("Hello");
338 Status recv_status = stub->Echo(&cli_ctx, send_request, &recv_response);
339 EXPECT_EQ(send_request.message() + "_dup", recv_response.message());
340 EXPECT_TRUE(recv_status.ok());
341 }
342
SendSimpleClientStreaming()343 void SendSimpleClientStreaming() {
344 EchoRequest send_request;
345 EchoResponse recv_response;
346 std::string expected_message;
347 ClientContext cli_ctx;
348 cli_ctx.set_wait_for_ready(true);
349 send_request.set_message("Hello");
350 auto stream = stub_->RequestStream(&cli_ctx, &recv_response);
351 for (int i = 0; i < 5; i++) {
352 EXPECT_TRUE(stream->Write(send_request));
353 expected_message.append(send_request.message());
354 }
355 stream->WritesDone();
356 Status recv_status = stream->Finish();
357 EXPECT_EQ(expected_message, recv_response.message());
358 EXPECT_TRUE(recv_status.ok());
359 }
360
SendSimpleServerStreaming()361 void SendSimpleServerStreaming() {
362 EchoRequest request;
363 EchoResponse response;
364 ClientContext context;
365 context.set_wait_for_ready(true);
366 request.set_message("hello");
367
368 auto stream = stub_->ResponseStream(&context, request);
369 EXPECT_TRUE(stream->Read(&response));
370 EXPECT_EQ(response.message(), request.message() + "0");
371 EXPECT_TRUE(stream->Read(&response));
372 EXPECT_EQ(response.message(), request.message() + "1");
373 EXPECT_TRUE(stream->Read(&response));
374 EXPECT_EQ(response.message(), request.message() + "2");
375 EXPECT_FALSE(stream->Read(&response));
376
377 Status s = stream->Finish();
378 EXPECT_TRUE(s.ok());
379 }
380
SendSimpleServerStreamingToDupService()381 void SendSimpleServerStreamingToDupService() {
382 std::shared_ptr<Channel> channel = grpc::CreateChannel(
383 server_address_.str(), InsecureChannelCredentials());
384 auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel);
385 EchoRequest request;
386 EchoResponse response;
387 ClientContext context;
388 context.set_wait_for_ready(true);
389 request.set_message("hello");
390
391 auto stream = stub->ResponseStream(&context, request);
392 EXPECT_TRUE(stream->Read(&response));
393 EXPECT_EQ(response.message(), request.message() + "0_dup");
394 EXPECT_TRUE(stream->Read(&response));
395 EXPECT_EQ(response.message(), request.message() + "1_dup");
396 EXPECT_TRUE(stream->Read(&response));
397 EXPECT_EQ(response.message(), request.message() + "2_dup");
398 EXPECT_FALSE(stream->Read(&response));
399
400 Status s = stream->Finish();
401 EXPECT_TRUE(s.ok());
402 }
403
SendBidiStreaming()404 void SendBidiStreaming() {
405 EchoRequest request;
406 EchoResponse response;
407 ClientContext context;
408 context.set_wait_for_ready(true);
409 std::string msg("hello");
410
411 auto stream = stub_->BidiStream(&context);
412
413 request.set_message(msg + "0");
414 EXPECT_TRUE(stream->Write(request));
415 EXPECT_TRUE(stream->Read(&response));
416 EXPECT_EQ(response.message(), request.message());
417
418 request.set_message(msg + "1");
419 EXPECT_TRUE(stream->Write(request));
420 EXPECT_TRUE(stream->Read(&response));
421 EXPECT_EQ(response.message(), request.message());
422
423 request.set_message(msg + "2");
424 EXPECT_TRUE(stream->Write(request));
425 EXPECT_TRUE(stream->Read(&response));
426 EXPECT_EQ(response.message(), request.message());
427
428 stream->WritesDone();
429 EXPECT_FALSE(stream->Read(&response));
430 EXPECT_FALSE(stream->Read(&response));
431
432 Status s = stream->Finish();
433 EXPECT_TRUE(s.ok());
434 }
435
436 grpc::testing::UnimplementedEchoService::Service unimplemented_service_;
437 std::vector<std::unique_ptr<ServerCompletionQueue>> cqs_;
438 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
439 std::unique_ptr<Server> server_;
440 std::ostringstream server_address_;
441 bool inproc_;
442 };
443
TEST_F(HybridEnd2endTest,AsyncEcho)444 TEST_F(HybridEnd2endTest, AsyncEcho) {
445 typedef EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> SType;
446 SType service;
447 SetUpServer(&service, nullptr, nullptr, nullptr);
448 ResetStub();
449 std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
450 false);
451 TestAllMethods();
452 echo_handler_thread.join();
453 }
454
TEST_F(HybridEnd2endTest,RawEcho)455 TEST_F(HybridEnd2endTest, RawEcho) {
456 typedef EchoTestService::WithRawMethod_Echo<TestServiceImpl> SType;
457 SType service;
458 SetUpServer(&service, nullptr, nullptr, nullptr);
459 ResetStub();
460 std::thread echo_handler_thread(HandleRawEcho<SType>, &service, cqs_[0].get(),
461 false);
462 TestAllMethods();
463 echo_handler_thread.join();
464 }
465
TEST_F(HybridEnd2endTest,RawRequestStream)466 TEST_F(HybridEnd2endTest, RawRequestStream) {
467 typedef EchoTestService::WithRawMethod_RequestStream<TestServiceImpl> SType;
468 SType service;
469 SetUpServer(&service, nullptr, nullptr, nullptr);
470 ResetStub();
471 std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
472 &service, cqs_[0].get());
473 TestAllMethods();
474 request_stream_handler_thread.join();
475 }
476
TEST_F(HybridEnd2endTest,AsyncEchoRawRequestStream)477 TEST_F(HybridEnd2endTest, AsyncEchoRawRequestStream) {
478 typedef EchoTestService::WithRawMethod_RequestStream<
479 EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>
480 SType;
481 SType service;
482 SetUpServer(&service, nullptr, nullptr, nullptr);
483 ResetStub();
484 std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
485 false);
486 std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
487 &service, cqs_[1].get());
488 TestAllMethods();
489 request_stream_handler_thread.join();
490 echo_handler_thread.join();
491 }
492
TEST_F(HybridEnd2endTest,GenericEchoRawRequestStream)493 TEST_F(HybridEnd2endTest, GenericEchoRawRequestStream) {
494 typedef EchoTestService::WithRawMethod_RequestStream<
495 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
496 SType;
497 SType service;
498 AsyncGenericService generic_service;
499 SetUpServer(&service, nullptr, &generic_service, nullptr);
500 ResetStub();
501 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
502 cqs_[0].get());
503 std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
504 &service, cqs_[1].get());
505 TestAllMethods();
506 generic_handler_thread.join();
507 request_stream_handler_thread.join();
508 }
509
TEST_F(HybridEnd2endTest,AsyncEchoRequestStream)510 TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) {
511 typedef EchoTestService::WithAsyncMethod_RequestStream<
512 EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>
513 SType;
514 SType service;
515 SetUpServer(&service, nullptr, nullptr, nullptr);
516 ResetStub();
517 std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
518 false);
519 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
520 &service, cqs_[1].get());
521 TestAllMethods();
522 echo_handler_thread.join();
523 request_stream_handler_thread.join();
524 }
525
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStream)526 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) {
527 typedef EchoTestService::WithAsyncMethod_RequestStream<
528 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
529 SType;
530 SType service;
531 SetUpServer(&service, nullptr, nullptr, nullptr);
532 ResetStub();
533 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
534 &service, cqs_[0].get());
535 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
536 &service, cqs_[1].get());
537 TestAllMethods();
538 response_stream_handler_thread.join();
539 request_stream_handler_thread.join();
540 }
541
542 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamSyncDupService)543 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStreamSyncDupService) {
544 typedef EchoTestService::WithAsyncMethod_RequestStream<
545 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
546 SType;
547 SType service;
548 TestServiceImplDupPkg dup_service;
549 SetUpServer(&service, &dup_service, nullptr, nullptr);
550 ResetStub();
551 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
552 &service, cqs_[0].get());
553 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
554 &service, cqs_[1].get());
555 TestAllMethods();
556 SendEchoToDupService();
557 response_stream_handler_thread.join();
558 request_stream_handler_thread.join();
559 }
560
561 // Add a second service with one sync streamed unary method.
562 class StreamedUnaryDupPkg
563 : public duplicate::EchoTestService::WithStreamedUnaryMethod_Echo<
564 TestServiceImplDupPkg> {
565 public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)566 Status StreamedEcho(
567 ServerContext* /*context*/,
568 ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
569 EchoRequest req;
570 EchoResponse resp;
571 uint32_t next_msg_sz;
572 stream->NextMessageSize(&next_msg_sz);
573 gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
574 GPR_ASSERT(stream->Read(&req));
575 resp.set_message(req.message() + "_dup");
576 GPR_ASSERT(stream->Write(resp));
577 return Status::OK;
578 }
579 };
580
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamSyncStreamedUnaryDupService)581 TEST_F(HybridEnd2endTest,
582 AsyncRequestStreamResponseStreamSyncStreamedUnaryDupService) {
583 typedef EchoTestService::WithAsyncMethod_RequestStream<
584 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
585 SType;
586 SType service;
587 StreamedUnaryDupPkg dup_service;
588 SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
589 ResetStub();
590 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
591 &service, cqs_[0].get());
592 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
593 &service, cqs_[1].get());
594 TestAllMethods();
595 SendEchoToDupService();
596 response_stream_handler_thread.join();
597 request_stream_handler_thread.join();
598 }
599
600 // Add a second service that is fully Streamed Unary
601 class FullyStreamedUnaryDupPkg
602 : public duplicate::EchoTestService::StreamedUnaryService {
603 public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)604 Status StreamedEcho(
605 ServerContext* /*context*/,
606 ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
607 EchoRequest req;
608 EchoResponse resp;
609 uint32_t next_msg_sz;
610 stream->NextMessageSize(&next_msg_sz);
611 gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
612 GPR_ASSERT(stream->Read(&req));
613 resp.set_message(req.message() + "_dup");
614 GPR_ASSERT(stream->Write(resp));
615 return Status::OK;
616 }
617 };
618
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamSyncFullyStreamedUnaryDupService)619 TEST_F(HybridEnd2endTest,
620 AsyncRequestStreamResponseStreamSyncFullyStreamedUnaryDupService) {
621 typedef EchoTestService::WithAsyncMethod_RequestStream<
622 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
623 SType;
624 SType service;
625 FullyStreamedUnaryDupPkg dup_service;
626 SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
627 ResetStub();
628 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
629 &service, cqs_[0].get());
630 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
631 &service, cqs_[1].get());
632 TestAllMethods();
633 SendEchoToDupService();
634 response_stream_handler_thread.join();
635 request_stream_handler_thread.join();
636 }
637
638 // Add a second service with one sync split server streaming method.
639 class SplitResponseStreamDupPkg
640 : public duplicate::EchoTestService::
641 WithSplitStreamingMethod_ResponseStream<TestServiceImplDupPkg> {
642 public:
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)643 Status StreamedResponseStream(
644 ServerContext* /*context*/,
645 ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
646 EchoRequest req;
647 EchoResponse resp;
648 uint32_t next_msg_sz;
649 stream->NextMessageSize(&next_msg_sz);
650 gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
651 GPR_ASSERT(stream->Read(&req));
652 for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
653 resp.set_message(req.message() + std::to_string(i) + "_dup");
654 GPR_ASSERT(stream->Write(resp));
655 }
656 return Status::OK;
657 }
658 };
659
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamSyncSplitStreamedDupService)660 TEST_F(HybridEnd2endTest,
661 AsyncRequestStreamResponseStreamSyncSplitStreamedDupService) {
662 typedef EchoTestService::WithAsyncMethod_RequestStream<
663 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
664 SType;
665 SType service;
666 SplitResponseStreamDupPkg dup_service;
667 SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
668 ResetStub();
669 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
670 &service, cqs_[0].get());
671 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
672 &service, cqs_[1].get());
673 TestAllMethods();
674 SendSimpleServerStreamingToDupService();
675 response_stream_handler_thread.join();
676 request_stream_handler_thread.join();
677 }
678
679 // Add a second service that is fully split server streamed
680 class FullySplitStreamedDupPkg
681 : public duplicate::EchoTestService::SplitStreamedService {
682 public:
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)683 Status StreamedResponseStream(
684 ServerContext* /*context*/,
685 ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
686 EchoRequest req;
687 EchoResponse resp;
688 uint32_t next_msg_sz;
689 stream->NextMessageSize(&next_msg_sz);
690 gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
691 GPR_ASSERT(stream->Read(&req));
692 for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
693 resp.set_message(req.message() + std::to_string(i) + "_dup");
694 GPR_ASSERT(stream->Write(resp));
695 }
696 return Status::OK;
697 }
698 };
699
TEST_F(HybridEnd2endTest,asyncRequestStreamResponseStreamFullySplitStreamedDupService)700 TEST_F(HybridEnd2endTest,
701 asyncRequestStreamResponseStreamFullySplitStreamedDupService) {
702 typedef EchoTestService::WithAsyncMethod_RequestStream<
703 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
704 SType;
705 SType service;
706 FullySplitStreamedDupPkg dup_service;
707 SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
708 ResetStub();
709 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
710 &service, cqs_[0].get());
711 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
712 &service, cqs_[1].get());
713 TestAllMethods();
714 SendSimpleServerStreamingToDupService();
715 response_stream_handler_thread.join();
716 request_stream_handler_thread.join();
717 }
718
719 // Add a second service that is fully server streamed
720 class FullyStreamedDupPkg : public duplicate::EchoTestService::StreamedService {
721 public:
StreamedEcho(ServerContext *,ServerUnaryStreamer<EchoRequest,EchoResponse> * stream)722 Status StreamedEcho(
723 ServerContext* /*context*/,
724 ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) override {
725 EchoRequest req;
726 EchoResponse resp;
727 uint32_t next_msg_sz;
728 stream->NextMessageSize(&next_msg_sz);
729 gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz);
730 GPR_ASSERT(stream->Read(&req));
731 resp.set_message(req.message() + "_dup");
732 GPR_ASSERT(stream->Write(resp));
733 return Status::OK;
734 }
StreamedResponseStream(ServerContext *,ServerSplitStreamer<EchoRequest,EchoResponse> * stream)735 Status StreamedResponseStream(
736 ServerContext* /*context*/,
737 ServerSplitStreamer<EchoRequest, EchoResponse>* stream) override {
738 EchoRequest req;
739 EchoResponse resp;
740 uint32_t next_msg_sz;
741 stream->NextMessageSize(&next_msg_sz);
742 gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz);
743 GPR_ASSERT(stream->Read(&req));
744 for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) {
745 resp.set_message(req.message() + std::to_string(i) + "_dup");
746 GPR_ASSERT(stream->Write(resp));
747 }
748 return Status::OK;
749 }
750 };
751
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamFullyStreamedDupService)752 TEST_F(HybridEnd2endTest,
753 AsyncRequestStreamResponseStreamFullyStreamedDupService) {
754 typedef EchoTestService::WithAsyncMethod_RequestStream<
755 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
756 SType;
757 SType service;
758 FullyStreamedDupPkg dup_service;
759 SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
760 ResetStub();
761 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
762 &service, cqs_[0].get());
763 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
764 &service, cqs_[1].get());
765 TestAllMethods();
766 SendEchoToDupService();
767 SendSimpleServerStreamingToDupService();
768 response_stream_handler_thread.join();
769 request_stream_handler_thread.join();
770 }
771
772 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,AsyncRequestStreamResponseStreamAsyncDupService)773 TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStreamAsyncDupService) {
774 typedef EchoTestService::WithAsyncMethod_RequestStream<
775 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
776 SType;
777 SType service;
778 duplicate::EchoTestService::AsyncService dup_service;
779 SetUpServer(&service, &dup_service, nullptr, nullptr);
780 ResetStub();
781 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
782 &service, cqs_[0].get());
783 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
784 &service, cqs_[1].get());
785 std::thread echo_handler_thread(
786 HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
787 cqs_[2].get(), true);
788 TestAllMethods();
789 SendEchoToDupService();
790 response_stream_handler_thread.join();
791 request_stream_handler_thread.join();
792 echo_handler_thread.join();
793 }
794
TEST_F(HybridEnd2endTest,GenericEcho)795 TEST_F(HybridEnd2endTest, GenericEcho) {
796 EchoTestService::WithGenericMethod_Echo<TestServiceImpl> service;
797 AsyncGenericService generic_service;
798 SetUpServer(&service, nullptr, &generic_service, nullptr);
799 ResetStub();
800 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
801 cqs_[0].get());
802 TestAllMethods();
803 generic_handler_thread.join();
804 }
805
TEST_P(HybridEnd2endTest,CallbackGenericEcho)806 TEST_P(HybridEnd2endTest, CallbackGenericEcho) {
807 EchoTestService::WithGenericMethod_Echo<TestServiceImpl> service;
808 class GenericEchoService : public CallbackGenericService {
809 private:
810 ServerGenericBidiReactor* CreateReactor(
811 GenericCallbackServerContext* context) override {
812 EXPECT_EQ(context->method(), "/grpc.testing.EchoTestService/Echo");
813 gpr_log(GPR_DEBUG, "Constructor of generic service %d",
814 static_cast<int>(context->deadline().time_since_epoch().count()));
815
816 class Reactor : public ServerGenericBidiReactor {
817 public:
818 Reactor() { StartRead(&request_); }
819
820 private:
821 void OnDone() override { delete this; }
822 void OnReadDone(bool ok) override {
823 if (!ok) {
824 EXPECT_EQ(reads_complete_, 1);
825 } else {
826 EXPECT_EQ(reads_complete_++, 0);
827 response_ = request_;
828 StartWrite(&response_);
829 StartRead(&request_);
830 }
831 }
832 void OnWriteDone(bool ok) override {
833 Finish(ok ? Status::OK
834 : Status(StatusCode::UNKNOWN, "Unexpected failure"));
835 }
836 ByteBuffer request_;
837 ByteBuffer response_;
838 std::atomic_int reads_complete_{0};
839 };
840 return new Reactor;
841 }
842 } generic_service;
843
844 if (!SetUpServer(&service, nullptr, nullptr, &generic_service)) {
845 return;
846 }
847 ResetStub();
848 TestAllMethods();
849 }
850
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStream)851 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) {
852 typedef EchoTestService::WithAsyncMethod_RequestStream<
853 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
854 SType;
855 SType service;
856 AsyncGenericService generic_service;
857 SetUpServer(&service, nullptr, &generic_service, nullptr);
858 ResetStub();
859 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
860 cqs_[0].get());
861 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
862 &service, cqs_[1].get());
863 TestAllMethods();
864 generic_handler_thread.join();
865 request_stream_handler_thread.join();
866 }
867
868 // Add a second service with one sync method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStreamSyncDupService)869 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamSyncDupService) {
870 typedef EchoTestService::WithAsyncMethod_RequestStream<
871 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
872 SType;
873 SType service;
874 AsyncGenericService generic_service;
875 TestServiceImplDupPkg dup_service;
876 SetUpServer(&service, &dup_service, &generic_service, nullptr);
877 ResetStub();
878 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
879 cqs_[0].get());
880 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
881 &service, cqs_[1].get());
882 TestAllMethods();
883 SendEchoToDupService();
884 generic_handler_thread.join();
885 request_stream_handler_thread.join();
886 }
887
888 // Add a second service with one async method.
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStreamAsyncDupService)889 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamAsyncDupService) {
890 typedef EchoTestService::WithAsyncMethod_RequestStream<
891 EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
892 SType;
893 SType service;
894 AsyncGenericService generic_service;
895 duplicate::EchoTestService::AsyncService dup_service;
896 SetUpServer(&service, &dup_service, &generic_service, nullptr);
897 ResetStub();
898 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
899 cqs_[0].get());
900 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
901 &service, cqs_[1].get());
902 std::thread echo_handler_thread(
903 HandleEcho<duplicate::EchoTestService::AsyncService>, &dup_service,
904 cqs_[2].get(), true);
905 TestAllMethods();
906 SendEchoToDupService();
907 generic_handler_thread.join();
908 request_stream_handler_thread.join();
909 echo_handler_thread.join();
910 }
911
TEST_F(HybridEnd2endTest,GenericEchoAsyncRequestStreamResponseStream)912 TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
913 typedef EchoTestService::WithAsyncMethod_RequestStream<
914 EchoTestService::WithGenericMethod_Echo<
915 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
916 SType;
917 SType service;
918 AsyncGenericService generic_service;
919 SetUpServer(&service, nullptr, &generic_service, nullptr);
920 ResetStub();
921 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
922 cqs_[0].get());
923 std::thread request_stream_handler_thread(HandleClientStreaming<SType>,
924 &service, cqs_[1].get());
925 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
926 &service, cqs_[2].get());
927 TestAllMethods();
928 generic_handler_thread.join();
929 request_stream_handler_thread.join();
930 response_stream_handler_thread.join();
931 }
932
TEST_F(HybridEnd2endTest,GenericEchoRequestStreamAsyncResponseStream)933 TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) {
934 typedef EchoTestService::WithGenericMethod_RequestStream<
935 EchoTestService::WithGenericMethod_Echo<
936 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
937 SType;
938 SType service;
939 AsyncGenericService generic_service;
940 SetUpServer(&service, nullptr, &generic_service, nullptr);
941 ResetStub();
942 std::thread generic_handler_thread(HandleGenericCall, &generic_service,
943 cqs_[0].get());
944 std::thread generic_handler_thread2(HandleGenericCall, &generic_service,
945 cqs_[1].get());
946 std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
947 &service, cqs_[2].get());
948 TestAllMethods();
949 generic_handler_thread.join();
950 generic_handler_thread2.join();
951 response_stream_handler_thread.join();
952 }
953
954 // If WithGenericMethod is called and no generic service is registered, the
955 // server will fail to build.
TEST_F(HybridEnd2endTest,GenericMethodWithoutGenericService)956 TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) {
957 EchoTestService::WithGenericMethod_RequestStream<
958 EchoTestService::WithGenericMethod_Echo<
959 EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
960 service;
961 SetUpServer(&service, nullptr, nullptr, nullptr);
962 EXPECT_EQ(nullptr, server_.get());
963 }
964
965 INSTANTIATE_TEST_SUITE_P(HybridEnd2endTest, HybridEnd2endTest,
966 ::testing::Bool());
967
968 } // namespace
969 } // namespace testing
970 } // namespace grpc
971
main(int argc,char ** argv)972 int main(int argc, char** argv) {
973 grpc::testing::TestEnvironment env(&argc, argv);
974 ::testing::InitGoogleTest(&argc, argv);
975 return RUN_ALL_TESTS();
976 }
977