xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/nonblocking_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <memory>
20 
21 #include "absl/memory/memory.h"
22 
23 #include <grpcpp/channel.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/create_channel.h>
26 #include <grpcpp/server.h>
27 #include <grpcpp/server_builder.h>
28 #include <grpcpp/server_context.h>
29 
30 #include "src/core/lib/iomgr/port.h"
31 #include "src/proto/grpc/testing/echo.grpc.pb.h"
32 #include "test/core/util/port.h"
33 #include "test/core/util/test_config.h"
34 
35 #ifdef GRPC_POSIX_SOCKET
36 #include "src/core/lib/iomgr/ev_posix.h"
37 #endif  // GRPC_POSIX_SOCKET
38 
39 #include <gtest/gtest.h>
40 
41 #ifdef GRPC_POSIX_SOCKET
42 // Thread-local variable to so that only polls from this test assert
43 // non-blocking (not polls from resolver, timer thread, etc), and only when the
44 // thread is waiting on polls caused by CompletionQueue::AsyncNext (not for
45 // picking a port or other reasons).
46 static thread_local bool g_is_nonblocking_poll;
47 
48 namespace {
49 
maybe_assert_non_blocking_poll(struct pollfd * pfds,nfds_t nfds,int timeout)50 int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
51                                    int timeout) {
52   // Only assert that this poll should have zero timeout if we're in the
53   // middle of a zero-timeout CQ Next.
54   if (g_is_nonblocking_poll) {
55     GPR_ASSERT(timeout == 0);
56   }
57   return poll(pfds, nfds, timeout);
58 }
59 
60 }  // namespace
61 
62 namespace grpc {
63 namespace testing {
64 namespace {
65 
tag(int i)66 void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
detag(void * p)67 int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
68 
69 class NonblockingTest : public ::testing::Test {
70  protected:
NonblockingTest()71   NonblockingTest() {}
72 
SetUp()73   void SetUp() override {
74     port_ = grpc_pick_unused_port_or_die();
75     server_address_ << "localhost:" << port_;
76 
77     // Setup server
78     BuildAndStartServer();
79   }
80 
LoopForTag(void ** tag,bool * ok)81   bool LoopForTag(void** tag, bool* ok) {
82     // Temporarily set the thread-local nonblocking poll flag so that the polls
83     // caused by this loop are indeed sent by the library with zero timeout.
84     bool orig_val = g_is_nonblocking_poll;
85     g_is_nonblocking_poll = true;
86     for (;;) {
87       auto r = cq_->AsyncNext(tag, ok, gpr_time_0(GPR_CLOCK_REALTIME));
88       if (r == CompletionQueue::SHUTDOWN) {
89         g_is_nonblocking_poll = orig_val;
90         return false;
91       } else if (r == CompletionQueue::GOT_EVENT) {
92         g_is_nonblocking_poll = orig_val;
93         return true;
94       }
95     }
96   }
97 
TearDown()98   void TearDown() override {
99     server_->Shutdown();
100     void* ignored_tag;
101     bool ignored_ok;
102     cq_->Shutdown();
103     while (LoopForTag(&ignored_tag, &ignored_ok)) {
104     }
105     stub_.reset();
106     grpc_recycle_unused_port(port_);
107   }
108 
BuildAndStartServer()109   void BuildAndStartServer() {
110     ServerBuilder builder;
111     builder.AddListeningPort(server_address_.str(),
112                              grpc::InsecureServerCredentials());
113     service_ = std::make_unique<grpc::testing::EchoTestService::AsyncService>();
114     builder.RegisterService(service_.get());
115     cq_ = builder.AddCompletionQueue();
116     server_ = builder.BuildAndStart();
117   }
118 
ResetStub()119   void ResetStub() {
120     std::shared_ptr<Channel> channel = grpc::CreateChannel(
121         server_address_.str(), grpc::InsecureChannelCredentials());
122     stub_ = grpc::testing::EchoTestService::NewStub(channel);
123   }
124 
SendRpc(int num_rpcs)125   void SendRpc(int num_rpcs) {
126     for (int i = 0; i < num_rpcs; i++) {
127       EchoRequest send_request;
128       EchoRequest recv_request;
129       EchoResponse send_response;
130       EchoResponse recv_response;
131       Status recv_status;
132 
133       ClientContext cli_ctx;
134       ServerContext srv_ctx;
135       grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
136 
137       send_request.set_message("hello non-blocking world");
138       std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
139           stub_->PrepareAsyncEcho(&cli_ctx, send_request, cq_.get()));
140 
141       response_reader->StartCall();
142       response_reader->Finish(&recv_response, &recv_status, tag(4));
143 
144       service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
145                             cq_.get(), cq_.get(), tag(2));
146 
147       void* got_tag;
148       bool ok;
149       EXPECT_TRUE(LoopForTag(&got_tag, &ok));
150       EXPECT_TRUE(ok);
151       EXPECT_EQ(detag(got_tag), 2);
152       EXPECT_EQ(send_request.message(), recv_request.message());
153 
154       send_response.set_message(recv_request.message());
155       response_writer.Finish(send_response, Status::OK, tag(3));
156 
157       int tagsum = 0;
158       int tagprod = 1;
159       EXPECT_TRUE(LoopForTag(&got_tag, &ok));
160       EXPECT_TRUE(ok);
161       tagsum += detag(got_tag);
162       tagprod *= detag(got_tag);
163 
164       EXPECT_TRUE(LoopForTag(&got_tag, &ok));
165       EXPECT_TRUE(ok);
166       tagsum += detag(got_tag);
167       tagprod *= detag(got_tag);
168 
169       EXPECT_EQ(tagsum, 7);
170       EXPECT_EQ(tagprod, 12);
171       EXPECT_EQ(send_response.message(), recv_response.message());
172       EXPECT_TRUE(recv_status.ok());
173     }
174   }
175 
176   std::unique_ptr<ServerCompletionQueue> cq_;
177   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
178   std::unique_ptr<Server> server_;
179   std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
180   std::ostringstream server_address_;
181   int port_;
182 };
183 
TEST_F(NonblockingTest,SimpleRpc)184 TEST_F(NonblockingTest, SimpleRpc) {
185   ResetStub();
186   SendRpc(10);
187 }
188 
189 }  // namespace
190 }  // namespace testing
191 }  // namespace grpc
192 
193 #endif  // GRPC_POSIX_SOCKET
194 
main(int argc,char ** argv)195 int main(int argc, char** argv) {
196 #ifdef GRPC_POSIX_SOCKET
197   // Override the poll function before anything else can happen
198   grpc_poll_function = maybe_assert_non_blocking_poll;
199 
200   grpc::testing::TestEnvironment env(&argc, argv);
201   ::testing::InitGoogleTest(&argc, argv);
202 
203   // Start the nonblocking poll thread-local variable as false because the
204   // thread that issues RPCs starts by picking a port (which has non-zero
205   // timeout).
206   g_is_nonblocking_poll = false;
207 
208   int ret = RUN_ALL_TESTS();
209 
210   return ret;
211 #else   // GRPC_POSIX_SOCKET
212   (void)argc;
213   (void)argv;
214   return 0;
215 #endif  // GRPC_POSIX_SOCKET
216 }
217