1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
#include <cinttypes>
#include <map>
#include <memory>
#include <sstream>
#include <thread>

#include <gtest/gtest.h>

#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>

#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/iomgr/port.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/byte_buffer_proto_helper.h"
#include "test/cpp/util/string_ref_helper.h"
45
46 using grpc::testing::EchoRequest;
47 using grpc::testing::EchoResponse;
48
49 namespace grpc {
50 namespace testing {
51
52 namespace {
53
// Encodes a small integer as an opaque completion-queue tag pointer.
// The value is widened to intptr_t before the pointer cast so that the
// reinterpret_cast is between same-sized types; casting an int directly to a
// wider pointer is flagged by compilers. detag() performs the inverse
// conversion through intptr_t as well.
void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
// Recovers the integer that was packed into an opaque tag pointer: the
// pointer is first read back as an intptr_t and then narrowed to int.
int detag(void* p) {
  const intptr_t raw_value = reinterpret_cast<intptr_t>(p);
  return static_cast<int>(raw_value);
}
56
57 class Verifier {
58 public:
Verifier()59 Verifier() {}
60
61 // Expect sets the expected ok value for a specific tag
Expect(int i,bool expect_ok)62 Verifier& Expect(int i, bool expect_ok) {
63 expectations_[tag(i)] = expect_ok;
64 return *this;
65 }
66
67 // Next waits for 1 async tag to complete, checks its
68 // expectations, and returns the tag
Next(CompletionQueue * cq,bool ignore_ok)69 int Next(CompletionQueue* cq, bool ignore_ok) {
70 bool ok;
71 void* got_tag;
72 EXPECT_TRUE(cq->Next(&got_tag, &ok));
73 GotTag(got_tag, ok, ignore_ok);
74 return detag(got_tag);
75 }
76
77 // Verify keeps calling Next until all currently set
78 // expected tags are complete
Verify(CompletionQueue * cq)79 void Verify(CompletionQueue* cq) {
80 GPR_ASSERT(!expectations_.empty());
81 while (!expectations_.empty()) {
82 Next(cq, false);
83 }
84 }
85
86 private:
GotTag(void * got_tag,bool ok,bool ignore_ok)87 void GotTag(void* got_tag, bool ok, bool ignore_ok) {
88 auto it = expectations_.find(got_tag);
89 if (it != expectations_.end()) {
90 if (!ignore_ok) {
91 EXPECT_EQ(it->second, ok);
92 }
93 expectations_.erase(it);
94 }
95 }
96
97 std::map<void*, bool> expectations_;
98 };
99
100 class RawEnd2EndTest : public ::testing::Test {
101 protected:
RawEnd2EndTest()102 RawEnd2EndTest() {}
103
SetUp()104 void SetUp() override {
105 port_ = grpc_pick_unused_port_or_die();
106 server_address_ << "localhost:" << port_;
107 }
108
TearDown()109 void TearDown() override {
110 server_->Shutdown();
111 void* ignored_tag;
112 bool ignored_ok;
113 cq_->Shutdown();
114 while (cq_->Next(&ignored_tag, &ignored_ok)) {
115 }
116 stub_.reset();
117 grpc_recycle_unused_port(port_);
118 }
119
120 template <typename ServerType>
BuildAndStartServer()121 std::unique_ptr<ServerType> BuildAndStartServer() {
122 ServerBuilder builder;
123 builder.AddListeningPort(server_address_.str(),
124 grpc::InsecureServerCredentials());
125 std::unique_ptr<ServerType> service(new ServerType());
126 builder.RegisterService(service.get());
127 cq_ = builder.AddCompletionQueue();
128 server_ = builder.BuildAndStart();
129 return service;
130 }
131
ResetStub()132 void ResetStub() {
133 ChannelArguments args;
134 std::shared_ptr<Channel> channel = grpc::CreateChannel(
135 server_address_.str(), grpc::InsecureChannelCredentials());
136 stub_ = grpc::testing::EchoTestService::NewStub(channel);
137 }
138
139 std::unique_ptr<ServerCompletionQueue> cq_;
140 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
141 std::unique_ptr<Server> server_;
142 std::ostringstream server_address_;
143 int port_;
144
145 // For the client application to populate and send to server.
146 EchoRequest send_request_;
147 grpc::ByteBuffer send_request_buffer_;
148
149 // For the server to give to gRPC to be populated by incoming request
150 // from client.
151 EchoRequest recv_request_;
152 grpc::ByteBuffer recv_request_buffer_;
153
154 // For the server application to populate and send back to client.
155 EchoResponse send_response_;
156 grpc::ByteBuffer send_response_buffer_;
157
158 // For the client to give to gRPC to be populated by incoming response
159 // from server.
160 EchoResponse recv_response_;
161 grpc::ByteBuffer recv_response_buffer_;
162 Status recv_status_;
163
164 // Both sides need contexts
165 ClientContext cli_ctx_;
166 ServerContext srv_ctx_;
167 };
168
169 // Regular Async, both peers use proto
TEST_F(RawEnd2EndTest, PureAsyncService) {
  using AsyncEchoService = grpc::testing::EchoTestService::AsyncService;
  ResetStub();
  auto service = BuildAndStartServer<AsyncEchoService>();
  grpc::ServerAsyncResponseWriter<EchoResponse> server_responder(&srv_ctx_);
  // Client and server share the fixture's single completion queue.
  auto* const cq = cq_.get();

  // Client starts the unary call; server asks for the incoming call (tag 2);
  // client asks for the final response and status (tag 4).
  send_request_.set_message("hello");
  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> client_call(
      stub_->AsyncEcho(&cli_ctx_, send_request_, cq));
  service->RequestEcho(&srv_ctx_, &recv_request_, &server_responder, cq, cq,
                       tag(2));
  client_call->Finish(&recv_response_, &recv_status_, tag(4));

  // Wait until the request has reached the server.
  Verifier request_arrived;
  request_arrived.Expect(2, true);
  request_arrived.Verify(cq);
  EXPECT_EQ(send_request_.message(), recv_request_.message());

  // Echo the message back (tag 3) and wait for both sides to finish.
  send_response_.set_message(recv_request_.message());
  server_responder.Finish(send_response_, Status::OK, tag(3));
  Verifier call_finished;
  call_finished.Expect(3, true).Expect(4, true);
  call_finished.Verify(cq);

  EXPECT_EQ(send_response_.message(), recv_response_.message());
  EXPECT_TRUE(recv_status_.ok());
}
191
192 // Client uses proto, server uses generic codegen, unary
TEST_F(RawEnd2EndTest, RawServerUnary) {
  using RawEchoService = grpc::testing::EchoTestService::WithRawMethod_Echo<
      grpc::testing::EchoTestService::Service>;
  ResetStub();
  auto service = BuildAndStartServer<RawEchoService>();
  grpc::GenericServerAsyncResponseWriter raw_responder(&srv_ctx_);
  // Client and server share the fixture's single completion queue.
  auto* const cq = cq_.get();

  // Client sends a typed request; the server receives it as a ByteBuffer
  // (tag 2); the client asks for the final response and status (tag 4).
  send_request_.set_message("hello unary");
  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> client_call(
      stub_->AsyncEcho(&cli_ctx_, send_request_, cq));
  service->RequestEcho(&srv_ctx_, &recv_request_buffer_, &raw_responder, cq,
                       cq, tag(2));
  client_call->Finish(&recv_response_, &recv_status_, tag(4));

  Verifier request_arrived;
  request_arrived.Expect(2, true);
  request_arrived.Verify(cq);

  // Decode the raw bytes on the server side and compare with what was sent.
  EXPECT_TRUE(ParseFromByteBuffer(&recv_request_buffer_, &recv_request_));
  EXPECT_EQ(send_request_.message(), recv_request_.message());

  // Serialise the echo by hand and finish the call with the raw buffer
  // (tag 3).
  send_response_.set_message(recv_request_.message());
  EXPECT_TRUE(
      SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_));
  raw_responder.Finish(send_response_buffer_, Status::OK, tag(3));

  Verifier call_finished;
  call_finished.Expect(3, true).Expect(4, true);
  call_finished.Verify(cq);

  EXPECT_EQ(send_response_.message(), recv_response_.message());
  EXPECT_TRUE(recv_status_.ok());
}
219
220 // Client uses proto, server uses generic codegen, client streaming
TEST_F(RawEnd2EndTest, RawServerClientStreaming) {
  using SType = grpc::testing::EchoTestService::WithRawMethod_RequestStream<
      grpc::testing::EchoTestService::Service>;
  ResetStub();
  auto service = BuildAndStartServer<SType>();

  grpc::GenericServerAsyncReader srv_stream(&srv_ctx_);

  // Open the stream on the client (tag 1) and accept it on the server
  // (tag 2).
  send_request_.set_message("hello client streaming");
  std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
      stub_->AsyncRequestStream(&cli_ctx_, &recv_response_, cq_.get(), tag(1)));

  service->RequestRequestStream(&srv_ctx_, &srv_stream, cq_.get(), cq_.get(),
                                tag(2));

  Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());

  // First message: client write (tag 3) / raw server read (tag 4).
  cli_stream->Write(send_request_, tag(3));
  srv_stream.Read(&recv_request_buffer_, tag(4));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
  // A decode failure must fail the test instead of going unnoticed.
  EXPECT_TRUE(ParseFromByteBuffer(&recv_request_buffer_, &recv_request_));
  EXPECT_EQ(send_request_.message(), recv_request_.message());

  // Second message: client write (tag 5) / raw server read (tag 6).
  cli_stream->Write(send_request_, tag(5));
  srv_stream.Read(&recv_request_buffer_, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());

  EXPECT_TRUE(ParseFromByteBuffer(&recv_request_buffer_, &recv_request_));
  EXPECT_EQ(send_request_.message(), recv_request_.message());

  // Client half-closes (tag 7); the server's next read (tag 8) is expected to
  // complete with ok == false.
  cli_stream->WritesDone(tag(7));
  srv_stream.Read(&recv_request_buffer_, tag(8));
  Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());

  // The read above did not succeed, so this parse result is deliberately not
  // asserted. NOTE(review): presumably the buffer still holds the previous
  // message after a failed read - confirm.
  ParseFromByteBuffer(&recv_request_buffer_, &recv_request_);
  send_response_.set_message(recv_request_.message());
  EXPECT_TRUE(
      SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_));
  // Server finishes with the raw response (tag 9); client collects the status
  // (tag 10).
  srv_stream.Finish(send_response_buffer_, Status::OK, tag(9));
  cli_stream->Finish(&recv_status_, tag(10));
  Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());

  EXPECT_EQ(send_response_.message(), recv_response_.message());
  EXPECT_TRUE(recv_status_.ok());
}
265
266 // Client uses proto, server uses generic codegen, server streaming
TEST_F(RawEnd2EndTest, RawServerServerStreaming) {
  using SType = grpc::testing::EchoTestService::WithRawMethod_ResponseStream<
      grpc::testing::EchoTestService::Service>;
  ResetStub();
  auto service = BuildAndStartServer<SType>();
  grpc::GenericServerAsyncWriter srv_stream(&srv_ctx_);

  // Client starts the call with its single request (tag 1); the server
  // receives that request as a raw ByteBuffer (tag 2).
  send_request_.set_message("hello server streaming");
  std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
      stub_->AsyncResponseStream(&cli_ctx_, send_request_, cq_.get(), tag(1)));

  service->RequestResponseStream(&srv_ctx_, &recv_request_buffer_, &srv_stream,
                                 cq_.get(), cq_.get(), tag(2));

  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
  // A decode failure must fail the test instead of going unnoticed.
  EXPECT_TRUE(ParseFromByteBuffer(&recv_request_buffer_, &recv_request_));
  EXPECT_EQ(send_request_.message(), recv_request_.message());

  // First response: raw server write (tag 3) / typed client read (tag 4).
  send_response_.set_message(recv_request_.message());
  EXPECT_TRUE(
      SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_));
  srv_stream.Write(send_response_buffer_, tag(3));
  cli_stream->Read(&recv_response_, tag(4));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
  EXPECT_EQ(send_response_.message(), recv_response_.message());

  // Second response reuses the same serialised buffer (tags 5 and 6).
  srv_stream.Write(send_response_buffer_, tag(5));
  cli_stream->Read(&recv_response_, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
  EXPECT_EQ(send_response_.message(), recv_response_.message());

  // Server ends the stream (tag 7); the client's next read (tag 8) is
  // expected to complete with ok == false.
  srv_stream.Finish(Status::OK, tag(7));
  cli_stream->Read(&recv_response_, tag(8));
  Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());

  // Client collects the final status (tag 9).
  cli_stream->Finish(&recv_status_, tag(9));
  Verifier().Expect(9, true).Verify(cq_.get());

  EXPECT_TRUE(recv_status_.ok());
}
307
308 // Client uses proto, server uses generic codegen, bidi streaming
TEST_F(RawEnd2EndTest, RawServerBidiStreaming) {
  using SType = grpc::testing::EchoTestService::WithRawMethod_BidiStream<
      grpc::testing::EchoTestService::Service>;
  ResetStub();
  auto service = BuildAndStartServer<SType>();

  grpc::GenericServerAsyncReaderWriter srv_stream(&srv_ctx_);

  // Open the stream on the client (tag 1) and accept it on the server
  // (tag 2).
  send_request_.set_message("hello bidi streaming");
  std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
      cli_stream(stub_->AsyncBidiStream(&cli_ctx_, cq_.get(), tag(1)));

  service->RequestBidiStream(&srv_ctx_, &srv_stream, cq_.get(), cq_.get(),
                             tag(2));

  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());

  // Request: typed client write (tag 3) / raw server read (tag 4).
  cli_stream->Write(send_request_, tag(3));
  srv_stream.Read(&recv_request_buffer_, tag(4));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
  // A decode failure must fail the test instead of going unnoticed.
  EXPECT_TRUE(ParseFromByteBuffer(&recv_request_buffer_, &recv_request_));
  EXPECT_EQ(send_request_.message(), recv_request_.message());

  // Response: raw server write (tag 5) / typed client read (tag 6).
  send_response_.set_message(recv_request_.message());
  EXPECT_TRUE(
      SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_));
  srv_stream.Write(send_response_buffer_, tag(5));
  cli_stream->Read(&recv_response_, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
  EXPECT_EQ(send_response_.message(), recv_response_.message());

  // Client half-closes (tag 7); the server's next read (tag 8) is expected to
  // complete with ok == false.
  cli_stream->WritesDone(tag(7));
  srv_stream.Read(&recv_request_buffer_, tag(8));
  Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());

  // Server finishes (tag 9); client collects the final status (tag 10).
  srv_stream.Finish(Status::OK, tag(9));
  cli_stream->Finish(&recv_status_, tag(10));
  Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());

  EXPECT_TRUE(recv_status_.ok());
}
350
351 // Testing that this pattern compiles
TEST_F(RawEnd2EndTest, CompileTest) {
  // Wrapping AsyncService (rather than the plain Service) in the raw Echo
  // mixin only has to compile and start; no RPC is issued.
  using RawOverAsyncService =
      grpc::testing::EchoTestService::WithRawMethod_Echo<
          grpc::testing::EchoTestService::AsyncService>;
  ResetStub();
  auto service = BuildAndStartServer<RawOverAsyncService>();
}
359
360 } // namespace
361 } // namespace testing
362 } // namespace grpc
363
main(int argc,char ** argv)364 int main(int argc, char** argv) {
365 // Change the backup poll interval from 5s to 100ms to speed up the
366 // ReconnectChannel test
367 grpc::testing::TestEnvironment env(&argc, argv);
368 ::testing::InitGoogleTest(&argc, argv);
369 int ret = RUN_ALL_TESTS();
370 return ret;
371 }
372