xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/raw_end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
#include <cinttypes>
#include <map>
#include <memory>
#include <sstream>
#include <thread>
22 
23 #include <gtest/gtest.h>
24 
25 #include <grpc/grpc.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/time.h>
29 #include <grpcpp/channel.h>
30 #include <grpcpp/client_context.h>
31 #include <grpcpp/create_channel.h>
32 #include <grpcpp/server.h>
33 #include <grpcpp/server_builder.h>
34 #include <grpcpp/server_context.h>
35 
36 #include "src/core/lib/gprpp/crash.h"
37 #include "src/core/lib/gprpp/env.h"
38 #include "src/core/lib/iomgr/port.h"
39 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
40 #include "src/proto/grpc/testing/echo.grpc.pb.h"
41 #include "test/core/util/port.h"
42 #include "test/core/util/test_config.h"
43 #include "test/cpp/util/byte_buffer_proto_helper.h"
44 #include "test/cpp/util/string_ref_helper.h"
45 
46 using grpc::testing::EchoRequest;
47 using grpc::testing::EchoResponse;
48 
49 namespace grpc {
50 namespace testing {
51 
52 namespace {
53 
// Encodes a small integer as an opaque completion-queue tag.
// The value is widened to intptr_t first so that the integer-to-pointer
// conversion happens between types of equal width; detag() reverses it.
void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
// Recovers the integer that was packed into an opaque tag pointer.
int detag(void* p) {
  const intptr_t raw_value = reinterpret_cast<intptr_t>(p);
  return static_cast<int>(raw_value);
}
56 
57 class Verifier {
58  public:
Verifier()59   Verifier() {}
60 
61   // Expect sets the expected ok value for a specific tag
Expect(int i,bool expect_ok)62   Verifier& Expect(int i, bool expect_ok) {
63     expectations_[tag(i)] = expect_ok;
64     return *this;
65   }
66 
67   // Next waits for 1 async tag to complete, checks its
68   // expectations, and returns the tag
Next(CompletionQueue * cq,bool ignore_ok)69   int Next(CompletionQueue* cq, bool ignore_ok) {
70     bool ok;
71     void* got_tag;
72     EXPECT_TRUE(cq->Next(&got_tag, &ok));
73     GotTag(got_tag, ok, ignore_ok);
74     return detag(got_tag);
75   }
76 
77   // Verify keeps calling Next until all currently set
78   // expected tags are complete
Verify(CompletionQueue * cq)79   void Verify(CompletionQueue* cq) {
80     GPR_ASSERT(!expectations_.empty());
81     while (!expectations_.empty()) {
82       Next(cq, false);
83     }
84   }
85 
86  private:
GotTag(void * got_tag,bool ok,bool ignore_ok)87   void GotTag(void* got_tag, bool ok, bool ignore_ok) {
88     auto it = expectations_.find(got_tag);
89     if (it != expectations_.end()) {
90       if (!ignore_ok) {
91         EXPECT_EQ(it->second, ok);
92       }
93       expectations_.erase(it);
94     }
95   }
96 
97   std::map<void*, bool> expectations_;
98 };
99 
100 class RawEnd2EndTest : public ::testing::Test {
101  protected:
RawEnd2EndTest()102   RawEnd2EndTest() {}
103 
SetUp()104   void SetUp() override {
105     port_ = grpc_pick_unused_port_or_die();
106     server_address_ << "localhost:" << port_;
107   }
108 
TearDown()109   void TearDown() override {
110     server_->Shutdown();
111     void* ignored_tag;
112     bool ignored_ok;
113     cq_->Shutdown();
114     while (cq_->Next(&ignored_tag, &ignored_ok)) {
115     }
116     stub_.reset();
117     grpc_recycle_unused_port(port_);
118   }
119 
120   template <typename ServerType>
BuildAndStartServer()121   std::unique_ptr<ServerType> BuildAndStartServer() {
122     ServerBuilder builder;
123     builder.AddListeningPort(server_address_.str(),
124                              grpc::InsecureServerCredentials());
125     std::unique_ptr<ServerType> service(new ServerType());
126     builder.RegisterService(service.get());
127     cq_ = builder.AddCompletionQueue();
128     server_ = builder.BuildAndStart();
129     return service;
130   }
131 
ResetStub()132   void ResetStub() {
133     ChannelArguments args;
134     std::shared_ptr<Channel> channel = grpc::CreateChannel(
135         server_address_.str(), grpc::InsecureChannelCredentials());
136     stub_ = grpc::testing::EchoTestService::NewStub(channel);
137   }
138 
139   std::unique_ptr<ServerCompletionQueue> cq_;
140   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
141   std::unique_ptr<Server> server_;
142   std::ostringstream server_address_;
143   int port_;
144 
145   // For the client application to populate and send to server.
146   EchoRequest send_request_;
147   grpc::ByteBuffer send_request_buffer_;
148 
149   // For the server to give to gRPC to be populated by incoming request
150   // from client.
151   EchoRequest recv_request_;
152   grpc::ByteBuffer recv_request_buffer_;
153 
154   // For the server application to populate and send back to client.
155   EchoResponse send_response_;
156   grpc::ByteBuffer send_response_buffer_;
157 
158   // For the client to give to gRPC to be populated by incoming response
159   // from server.
160   EchoResponse recv_response_;
161   grpc::ByteBuffer recv_response_buffer_;
162   Status recv_status_;
163 
164   // Both sides need contexts
165   ClientContext cli_ctx_;
166   ServerContext srv_ctx_;
167 };
168 
169 // Regular Async, both peers use proto
TEST_F(RawEnd2EndTest,PureAsyncService)170 TEST_F(RawEnd2EndTest, PureAsyncService) {
171   typedef grpc::testing::EchoTestService::AsyncService SType;
172   ResetStub();
173   auto service = BuildAndStartServer<SType>();
174   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx_);
175 
176   send_request_.set_message("hello");
177   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
178       stub_->AsyncEcho(&cli_ctx_, send_request_, cq_.get()));
179   service->RequestEcho(&srv_ctx_, &recv_request_, &response_writer, cq_.get(),
180                        cq_.get(), tag(2));
181   response_reader->Finish(&recv_response_, &recv_status_, tag(4));
182   Verifier().Expect(2, true).Verify(cq_.get());
183   EXPECT_EQ(send_request_.message(), recv_request_.message());
184   send_response_.set_message(recv_request_.message());
185   response_writer.Finish(send_response_, Status::OK, tag(3));
186   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
187 
188   EXPECT_EQ(send_response_.message(), recv_response_.message());
189   EXPECT_TRUE(recv_status_.ok());
190 }
191 
192 // Client uses proto, server uses generic codegen, unary
TEST_F(RawEnd2EndTest,RawServerUnary)193 TEST_F(RawEnd2EndTest, RawServerUnary) {
194   typedef grpc::testing::EchoTestService::WithRawMethod_Echo<
195       grpc::testing::EchoTestService::Service>
196       SType;
197   ResetStub();
198   auto service = BuildAndStartServer<SType>();
199   grpc::GenericServerAsyncResponseWriter response_writer(&srv_ctx_);
200 
201   send_request_.set_message("hello unary");
202   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
203       stub_->AsyncEcho(&cli_ctx_, send_request_, cq_.get()));
204   service->RequestEcho(&srv_ctx_, &recv_request_buffer_, &response_writer,
205                        cq_.get(), cq_.get(), tag(2));
206   response_reader->Finish(&recv_response_, &recv_status_, tag(4));
207   Verifier().Expect(2, true).Verify(cq_.get());
208   EXPECT_TRUE(ParseFromByteBuffer(&recv_request_buffer_, &recv_request_));
209   EXPECT_EQ(send_request_.message(), recv_request_.message());
210   send_response_.set_message(recv_request_.message());
211   EXPECT_TRUE(
212       SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_));
213   response_writer.Finish(send_response_buffer_, Status::OK, tag(3));
214   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
215 
216   EXPECT_EQ(send_response_.message(), recv_response_.message());
217   EXPECT_TRUE(recv_status_.ok());
218 }
219 
220 // Client uses proto, server uses generic codegen, client streaming
TEST_F(RawEnd2EndTest,RawServerClientStreaming)221 TEST_F(RawEnd2EndTest, RawServerClientStreaming) {
222   typedef grpc::testing::EchoTestService::WithRawMethod_RequestStream<
223       grpc::testing::EchoTestService::Service>
224       SType;
225   ResetStub();
226   auto service = BuildAndStartServer<SType>();
227 
228   grpc::GenericServerAsyncReader srv_stream(&srv_ctx_);
229 
230   send_request_.set_message("hello client streaming");
231   std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
232       stub_->AsyncRequestStream(&cli_ctx_, &recv_response_, cq_.get(), tag(1)));
233 
234   service->RequestRequestStream(&srv_ctx_, &srv_stream, cq_.get(), cq_.get(),
235                                 tag(2));
236 
237   Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());
238 
239   cli_stream->Write(send_request_, tag(3));
240   srv_stream.Read(&recv_request_buffer_, tag(4));
241   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
242   ParseFromByteBuffer(&recv_request_buffer_, &recv_request_);
243   EXPECT_EQ(send_request_.message(), recv_request_.message());
244 
245   cli_stream->Write(send_request_, tag(5));
246   srv_stream.Read(&recv_request_buffer_, tag(6));
247   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
248 
249   ParseFromByteBuffer(&recv_request_buffer_, &recv_request_);
250   EXPECT_EQ(send_request_.message(), recv_request_.message());
251   cli_stream->WritesDone(tag(7));
252   srv_stream.Read(&recv_request_buffer_, tag(8));
253   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
254 
255   ParseFromByteBuffer(&recv_request_buffer_, &recv_request_);
256   send_response_.set_message(recv_request_.message());
257   SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_);
258   srv_stream.Finish(send_response_buffer_, Status::OK, tag(9));
259   cli_stream->Finish(&recv_status_, tag(10));
260   Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
261 
262   EXPECT_EQ(send_response_.message(), recv_response_.message());
263   EXPECT_TRUE(recv_status_.ok());
264 }
265 
266 // Client uses proto, server uses generic codegen, server streaming
TEST_F(RawEnd2EndTest,RawServerServerStreaming)267 TEST_F(RawEnd2EndTest, RawServerServerStreaming) {
268   typedef grpc::testing::EchoTestService::WithRawMethod_ResponseStream<
269       grpc::testing::EchoTestService::Service>
270       SType;
271   ResetStub();
272   auto service = BuildAndStartServer<SType>();
273   grpc::GenericServerAsyncWriter srv_stream(&srv_ctx_);
274 
275   send_request_.set_message("hello server streaming");
276   std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
277       stub_->AsyncResponseStream(&cli_ctx_, send_request_, cq_.get(), tag(1)));
278 
279   service->RequestResponseStream(&srv_ctx_, &recv_request_buffer_, &srv_stream,
280                                  cq_.get(), cq_.get(), tag(2));
281 
282   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
283   ParseFromByteBuffer(&recv_request_buffer_, &recv_request_);
284   EXPECT_EQ(send_request_.message(), recv_request_.message());
285 
286   send_response_.set_message(recv_request_.message());
287   SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_);
288   srv_stream.Write(send_response_buffer_, tag(3));
289   cli_stream->Read(&recv_response_, tag(4));
290   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
291   EXPECT_EQ(send_response_.message(), recv_response_.message());
292 
293   srv_stream.Write(send_response_buffer_, tag(5));
294   cli_stream->Read(&recv_response_, tag(6));
295   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
296   EXPECT_EQ(send_response_.message(), recv_response_.message());
297 
298   srv_stream.Finish(Status::OK, tag(7));
299   cli_stream->Read(&recv_response_, tag(8));
300   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
301 
302   cli_stream->Finish(&recv_status_, tag(9));
303   Verifier().Expect(9, true).Verify(cq_.get());
304 
305   EXPECT_TRUE(recv_status_.ok());
306 }
307 
308 // Client uses proto, server uses generic codegen, bidi streaming
TEST_F(RawEnd2EndTest,RawServerBidiStreaming)309 TEST_F(RawEnd2EndTest, RawServerBidiStreaming) {
310   typedef grpc::testing::EchoTestService::WithRawMethod_BidiStream<
311       grpc::testing::EchoTestService::Service>
312       SType;
313   ResetStub();
314   auto service = BuildAndStartServer<SType>();
315 
316   grpc::GenericServerAsyncReaderWriter srv_stream(&srv_ctx_);
317 
318   send_request_.set_message("hello bidi streaming");
319   std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
320       cli_stream(stub_->AsyncBidiStream(&cli_ctx_, cq_.get(), tag(1)));
321 
322   service->RequestBidiStream(&srv_ctx_, &srv_stream, cq_.get(), cq_.get(),
323                              tag(2));
324 
325   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
326 
327   cli_stream->Write(send_request_, tag(3));
328   srv_stream.Read(&recv_request_buffer_, tag(4));
329   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
330   ParseFromByteBuffer(&recv_request_buffer_, &recv_request_);
331   EXPECT_EQ(send_request_.message(), recv_request_.message());
332 
333   send_response_.set_message(recv_request_.message());
334   SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_);
335   srv_stream.Write(send_response_buffer_, tag(5));
336   cli_stream->Read(&recv_response_, tag(6));
337   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
338   EXPECT_EQ(send_response_.message(), recv_response_.message());
339 
340   cli_stream->WritesDone(tag(7));
341   srv_stream.Read(&recv_request_buffer_, tag(8));
342   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
343 
344   srv_stream.Finish(Status::OK, tag(9));
345   cli_stream->Finish(&recv_status_, tag(10));
346   Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
347 
348   EXPECT_TRUE(recv_status_.ok());
349 }
350 
351 // Testing that this pattern compiles
TEST_F(RawEnd2EndTest,CompileTest)352 TEST_F(RawEnd2EndTest, CompileTest) {
353   typedef grpc::testing::EchoTestService::WithRawMethod_Echo<
354       grpc::testing::EchoTestService::AsyncService>
355       SType;
356   ResetStub();
357   auto service = BuildAndStartServer<SType>();
358 }
359 
360 }  // namespace
361 }  // namespace testing
362 }  // namespace grpc
363 
main(int argc,char ** argv)364 int main(int argc, char** argv) {
365   // Change the backup poll interval from 5s to 100ms to speed up the
366   // ReconnectChannel test
367   grpc::testing::TestEnvironment env(&argc, argv);
368   ::testing::InitGoogleTest(&argc, argv);
369   int ret = RUN_ALL_TESTS();
370   return ret;
371 }
372