xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/generic_end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <memory>
20 #include <thread>
21 
22 #include <gtest/gtest.h>
23 
24 #include "absl/memory/memory.h"
25 
26 #include <grpc/grpc.h>
27 #include <grpc/support/time.h>
28 #include <grpcpp/channel.h>
29 #include <grpcpp/client_context.h>
30 #include <grpcpp/create_channel.h>
31 #include <grpcpp/generic/async_generic_service.h>
32 #include <grpcpp/generic/generic_stub.h>
33 #include <grpcpp/impl/proto_utils.h>
34 #include <grpcpp/server.h>
35 #include <grpcpp/server_builder.h>
36 #include <grpcpp/server_context.h>
37 #include <grpcpp/support/slice.h>
38 
39 #include "src/proto/grpc/testing/echo.grpc.pb.h"
40 #include "test/core/util/port.h"
41 #include "test/core/util/test_config.h"
42 #include "test/cpp/util/byte_buffer_proto_helper.h"
43 
44 namespace grpc {
45 namespace testing {
46 namespace {
47 
tag(int i)48 void* tag(int i) { return reinterpret_cast<void*>(i); }
49 
verify_ok(CompletionQueue * cq,int i,bool expect_ok)50 void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
51   bool ok;
52   void* got_tag;
53   EXPECT_TRUE(cq->Next(&got_tag, &ok));
54   EXPECT_EQ(expect_ok, ok);
55   EXPECT_EQ(tag(i), got_tag);
56 }
57 
58 class GenericEnd2endTest : public ::testing::Test {
59  protected:
GenericEnd2endTest()60   GenericEnd2endTest() : server_host_("localhost") {}
61 
SetUp()62   void SetUp() override {
63     shut_down_ = false;
64     int port = grpc_pick_unused_port_or_die();
65     server_address_ << server_host_ << ":" << port;
66     // Setup server
67     ServerBuilder builder;
68     builder.AddListeningPort(server_address_.str(),
69                              InsecureServerCredentials());
70     builder.RegisterAsyncGenericService(&generic_service_);
71     // Include a second call to RegisterAsyncGenericService to make sure that
72     // we get an error in the log, since it is not allowed to have 2 async
73     // generic services
74     builder.RegisterAsyncGenericService(&generic_service_);
75     srv_cq_ = builder.AddCompletionQueue();
76     server_ = builder.BuildAndStart();
77   }
78 
ShutDownServerAndCQs()79   void ShutDownServerAndCQs() {
80     if (!shut_down_) {
81       server_->Shutdown();
82       void* ignored_tag;
83       bool ignored_ok;
84       cli_cq_.Shutdown();
85       srv_cq_->Shutdown();
86       while (cli_cq_.Next(&ignored_tag, &ignored_ok)) {
87       }
88       while (srv_cq_->Next(&ignored_tag, &ignored_ok)) {
89       }
90       shut_down_ = true;
91     }
92   }
TearDown()93   void TearDown() override { ShutDownServerAndCQs(); }
94 
ResetStub()95   void ResetStub() {
96     std::shared_ptr<Channel> channel = grpc::CreateChannel(
97         server_address_.str(), InsecureChannelCredentials());
98     stub_ = grpc::testing::EchoTestService::NewStub(channel);
99     generic_stub_ = std::make_unique<GenericStub>(channel);
100   }
101 
server_ok(int i)102   void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
client_ok(int i)103   void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
server_fail(int i)104   void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
client_fail(int i)105   void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
106 
SendRpc(int num_rpcs)107   void SendRpc(int num_rpcs) {
108     SendRpc(num_rpcs, false, gpr_inf_future(GPR_CLOCK_MONOTONIC));
109   }
110 
SendRpc(int num_rpcs,bool check_deadline,gpr_timespec deadline)111   void SendRpc(int num_rpcs, bool check_deadline, gpr_timespec deadline) {
112     const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
113     for (int i = 0; i < num_rpcs; i++) {
114       EchoRequest send_request;
115       EchoRequest recv_request;
116       EchoResponse send_response;
117       EchoResponse recv_response;
118       Status recv_status;
119 
120       ClientContext cli_ctx;
121       GenericServerContext srv_ctx;
122       GenericServerAsyncReaderWriter stream(&srv_ctx);
123 
124       // The string needs to be long enough to test heap-based slice.
125       send_request.set_message("Hello world. Hello world. Hello world.");
126 
127       if (check_deadline) {
128         cli_ctx.set_deadline(deadline);
129       }
130 
131       // Rather than using the original kMethodName, make a short-lived
132       // copy to also confirm that we don't refer to this object beyond
133       // the initial call preparation
134       const std::string* method_name = new std::string(kMethodName);
135 
136       std::unique_ptr<GenericClientAsyncReaderWriter> call =
137           generic_stub_->PrepareCall(&cli_ctx, *method_name, &cli_cq_);
138 
139       delete method_name;  // Make sure that this is not needed after invocation
140 
141       std::thread request_call([this]() { server_ok(4); });
142       call->StartCall(tag(1));
143       client_ok(1);
144       std::unique_ptr<ByteBuffer> send_buffer =
145           SerializeToByteBuffer(&send_request);
146       call->Write(*send_buffer, tag(2));
147       // Send ByteBuffer can be destroyed after calling Write.
148       send_buffer.reset();
149       client_ok(2);
150       call->WritesDone(tag(3));
151       client_ok(3);
152 
153       generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
154                                    srv_cq_.get(), tag(4));
155 
156       request_call.join();
157       EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
158       EXPECT_EQ(kMethodName, srv_ctx.method());
159 
160       if (check_deadline) {
161         EXPECT_TRUE(gpr_time_similar(deadline, srv_ctx.raw_deadline(),
162                                      gpr_time_from_millis(1000, GPR_TIMESPAN)));
163       }
164 
165       ByteBuffer recv_buffer;
166       stream.Read(&recv_buffer, tag(5));
167       server_ok(5);
168       EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
169       EXPECT_EQ(send_request.message(), recv_request.message());
170 
171       send_response.set_message(recv_request.message());
172       send_buffer = SerializeToByteBuffer(&send_response);
173       stream.Write(*send_buffer, tag(6));
174       send_buffer.reset();
175       server_ok(6);
176 
177       stream.Finish(Status::OK, tag(7));
178       server_ok(7);
179 
180       recv_buffer.Clear();
181       call->Read(&recv_buffer, tag(8));
182       client_ok(8);
183       EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
184 
185       call->Finish(&recv_status, tag(9));
186       client_ok(9);
187 
188       EXPECT_EQ(send_response.message(), recv_response.message());
189       EXPECT_TRUE(recv_status.ok());
190     }
191   }
192 
193   // Return errors to up to one call that comes in on the supplied completion
194   // queue, until the CQ is being shut down (and therefore we can no longer
195   // enqueue further events).
DriveCompletionQueue()196   void DriveCompletionQueue() {
197     enum class Event : uintptr_t {
198       kCallReceived,
199       kResponseSent,
200     };
201     // Request the call, but only if the main thread hasn't beaten us to
202     // shutting down the CQ.
203     grpc::GenericServerContext server_context;
204     grpc::GenericServerAsyncReaderWriter reader_writer(&server_context);
205 
206     {
207       std::lock_guard<std::mutex> lock(shutting_down_mu_);
208       if (!shutting_down_) {
209         generic_service_.RequestCall(
210             &server_context, &reader_writer, srv_cq_.get(), srv_cq_.get(),
211             reinterpret_cast<void*>(Event::kCallReceived));
212       }
213     }
214     // Process events.
215     {
216       Event event;
217       bool ok;
218       while (srv_cq_->Next(reinterpret_cast<void**>(&event), &ok)) {
219         std::lock_guard<std::mutex> lock(shutting_down_mu_);
220         if (shutting_down_) {
221           // The main thread has started shutting down. Simply continue to drain
222           // events.
223           continue;
224         }
225 
226         switch (event) {
227           case Event::kCallReceived:
228             reader_writer.Finish(
229                 grpc::Status(grpc::StatusCode::UNIMPLEMENTED, "go away"),
230                 reinterpret_cast<void*>(Event::kResponseSent));
231             break;
232 
233           case Event::kResponseSent:
234             // We are done.
235             break;
236         }
237       }
238     }
239   }
240 
241   CompletionQueue cli_cq_;
242   std::unique_ptr<ServerCompletionQueue> srv_cq_;
243   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
244   std::unique_ptr<grpc::GenericStub> generic_stub_;
245   std::unique_ptr<Server> server_;
246   AsyncGenericService generic_service_;
247   const std::string server_host_;
248   std::ostringstream server_address_;
249   bool shutting_down_;
250   bool shut_down_;
251   std::mutex shutting_down_mu_;
252 };
253 
TEST_F(GenericEnd2endTest,SimpleRpc)254 TEST_F(GenericEnd2endTest, SimpleRpc) {
255   ResetStub();
256   SendRpc(1);
257 }
258 
TEST_F(GenericEnd2endTest,SequentialRpcs)259 TEST_F(GenericEnd2endTest, SequentialRpcs) {
260   ResetStub();
261   SendRpc(10);
262 }
263 
TEST_F(GenericEnd2endTest,SequentialUnaryRpcs)264 TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) {
265   ResetStub();
266   const int num_rpcs = 10;
267   const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
268   for (int i = 0; i < num_rpcs; i++) {
269     EchoRequest send_request;
270     EchoRequest recv_request;
271     EchoResponse send_response;
272     EchoResponse recv_response;
273     Status recv_status;
274 
275     ClientContext cli_ctx;
276     GenericServerContext srv_ctx;
277     GenericServerAsyncReaderWriter stream(&srv_ctx);
278 
279     // The string needs to be long enough to test heap-based slice.
280     send_request.set_message("Hello world. Hello world. Hello world.");
281 
282     std::unique_ptr<ByteBuffer> cli_send_buffer =
283         SerializeToByteBuffer(&send_request);
284     std::thread request_call([this]() { server_ok(4); });
285     std::unique_ptr<GenericClientAsyncResponseReader> call =
286         generic_stub_->PrepareUnaryCall(&cli_ctx, kMethodName, *cli_send_buffer,
287                                         &cli_cq_);
288     call->StartCall();
289     ByteBuffer cli_recv_buffer;
290     call->Finish(&cli_recv_buffer, &recv_status, tag(1));
291     std::thread client_check([this] { client_ok(1); });
292 
293     generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
294                                  srv_cq_.get(), tag(4));
295     request_call.join();
296     EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
297     EXPECT_EQ(kMethodName, srv_ctx.method());
298 
299     ByteBuffer srv_recv_buffer;
300     stream.Read(&srv_recv_buffer, tag(5));
301     server_ok(5);
302     EXPECT_TRUE(ParseFromByteBuffer(&srv_recv_buffer, &recv_request));
303     EXPECT_EQ(send_request.message(), recv_request.message());
304 
305     send_response.set_message(recv_request.message());
306     std::unique_ptr<ByteBuffer> srv_send_buffer =
307         SerializeToByteBuffer(&send_response);
308     stream.Write(*srv_send_buffer, tag(6));
309     server_ok(6);
310 
311     stream.Finish(Status::OK, tag(7));
312     server_ok(7);
313 
314     client_check.join();
315     EXPECT_TRUE(ParseFromByteBuffer(&cli_recv_buffer, &recv_response));
316     EXPECT_EQ(send_response.message(), recv_response.message());
317     EXPECT_TRUE(recv_status.ok());
318   }
319 }
320 
321 // One ping, one pong.
TEST_F(GenericEnd2endTest,SimpleBidiStreaming)322 TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
323   ResetStub();
324 
325   const std::string kMethodName(
326       "/grpc.cpp.test.util.EchoTestService/BidiStream");
327   EchoRequest send_request;
328   EchoRequest recv_request;
329   EchoResponse send_response;
330   EchoResponse recv_response;
331   Status recv_status;
332   ClientContext cli_ctx;
333   GenericServerContext srv_ctx;
334   GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
335 
336   cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
337   send_request.set_message("Hello");
338   std::thread request_call([this]() { server_ok(2); });
339   std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
340       generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
341   cli_stream->StartCall(tag(1));
342   client_ok(1);
343 
344   generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
345                                srv_cq_.get(), tag(2));
346   request_call.join();
347 
348   EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
349   EXPECT_EQ(kMethodName, srv_ctx.method());
350 
351   std::unique_ptr<ByteBuffer> send_buffer =
352       SerializeToByteBuffer(&send_request);
353   cli_stream->Write(*send_buffer, tag(3));
354   send_buffer.reset();
355   client_ok(3);
356 
357   ByteBuffer recv_buffer;
358   srv_stream.Read(&recv_buffer, tag(4));
359   server_ok(4);
360   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
361   EXPECT_EQ(send_request.message(), recv_request.message());
362 
363   send_response.set_message(recv_request.message());
364   send_buffer = SerializeToByteBuffer(&send_response);
365   srv_stream.Write(*send_buffer, tag(5));
366   send_buffer.reset();
367   server_ok(5);
368 
369   cli_stream->Read(&recv_buffer, tag(6));
370   client_ok(6);
371   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
372   EXPECT_EQ(send_response.message(), recv_response.message());
373 
374   cli_stream->WritesDone(tag(7));
375   client_ok(7);
376 
377   srv_stream.Read(&recv_buffer, tag(8));
378   server_fail(8);
379 
380   srv_stream.Finish(Status::OK, tag(9));
381   server_ok(9);
382 
383   cli_stream->Finish(&recv_status, tag(10));
384   client_ok(10);
385 
386   EXPECT_EQ(send_response.message(), recv_response.message());
387   EXPECT_TRUE(recv_status.ok());
388 }
389 
TEST_F(GenericEnd2endTest,Deadline)390 TEST_F(GenericEnd2endTest, Deadline) {
391   ResetStub();
392   SendRpc(1, true,
393           gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
394                        gpr_time_from_seconds(10, GPR_TIMESPAN)));
395 }
396 
TEST_F(GenericEnd2endTest,ShortDeadline)397 TEST_F(GenericEnd2endTest, ShortDeadline) {
398   ResetStub();
399 
400   ClientContext cli_ctx;
401   EchoRequest request;
402   EchoResponse response;
403 
404   shutting_down_ = false;
405   std::thread driver([this] { DriveCompletionQueue(); });
406 
407   request.set_message("");
408   cli_ctx.set_deadline(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
409                                     gpr_time_from_micros(500, GPR_TIMESPAN)));
410   Status s = stub_->Echo(&cli_ctx, request, &response);
411   EXPECT_FALSE(s.ok());
412   {
413     std::lock_guard<std::mutex> lock(shutting_down_mu_);
414     shutting_down_ = true;
415   }
416   ShutDownServerAndCQs();
417   driver.join();
418 }
419 
420 }  // namespace
421 }  // namespace testing
422 }  // namespace grpc
423 
main(int argc,char ** argv)424 int main(int argc, char** argv) {
425   grpc::testing::TestEnvironment env(&argc, argv);
426   ::testing::InitGoogleTest(&argc, argv);
427   return RUN_ALL_TESTS();
428 }
429