xref: /aosp_15_r20/external/grpc-grpc/test/core/iomgr/tcp_client_posix_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <gtest/gtest.h>
20 
21 #include "src/core/lib/address_utils/parse_address.h"
22 #include "src/core/lib/channel/channel_args.h"
23 #include "src/core/lib/gprpp/time.h"
24 #include "src/core/lib/iomgr/port.h"
25 #include "test/core/util/port.h"
26 #include "test/core/util/test_config.h"
27 
28 // This test won't work except with posix sockets enabled
29 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
30 
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <netinet/in.h>
34 #include <poll.h>
35 #include <string.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38 
39 #include <grpc/grpc.h>
40 #include <grpc/support/alloc.h>
41 #include <grpc/support/log.h>
42 #include <grpc/support/time.h>
43 
44 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
45 #include "src/core/lib/gprpp/crash.h"
46 #include "src/core/lib/iomgr/iomgr.h"
47 #include "src/core/lib/iomgr/pollset_set.h"
48 #include "src/core/lib/iomgr/socket_utils_posix.h"
49 #include "src/core/lib/iomgr/tcp_client.h"
50 #include "src/core/lib/iomgr/timer.h"
51 #include "src/core/lib/resource_quota/api.h"
52 
53 static grpc_pollset_set* g_pollset_set;
54 static gpr_mu* g_mu;
55 static grpc_pollset* g_pollset;
56 static int g_connections_complete = 0;
57 static grpc_endpoint* g_connecting = nullptr;
58 
test_deadline(void)59 static grpc_core::Timestamp test_deadline(void) {
60   return grpc_core::Timestamp::FromTimespecRoundUp(
61       grpc_timeout_seconds_to_deadline(10));
62 }
63 
finish_connection()64 static void finish_connection() {
65   gpr_mu_lock(g_mu);
66   g_connections_complete++;
67   grpc_core::ExecCtx exec_ctx;
68   ASSERT_TRUE(
69       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
70 
71   gpr_mu_unlock(g_mu);
72 }
73 
must_succeed(void *,grpc_error_handle error)74 static void must_succeed(void* /*arg*/, grpc_error_handle error) {
75   ASSERT_NE(g_connecting, nullptr);
76   ASSERT_TRUE(error.ok());
77   grpc_endpoint_shutdown(g_connecting,
78                          GRPC_ERROR_CREATE("must_succeed called"));
79   grpc_endpoint_destroy(g_connecting);
80   g_connecting = nullptr;
81   finish_connection();
82 }
83 
must_fail(void *,grpc_error_handle error)84 static void must_fail(void* /*arg*/, grpc_error_handle error) {
85   ASSERT_EQ(g_connecting, nullptr);
86   ASSERT_FALSE(error.ok());
87   finish_connection();
88 }
89 
test_succeeds(void)90 void test_succeeds(void) {
91   gpr_log(GPR_ERROR, "---- starting test_succeeds() ----");
92   grpc_resolved_address resolved_addr;
93   struct sockaddr_in* addr =
94       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
95   int svr_fd;
96   int r;
97   int connections_complete_before;
98   grpc_closure done;
99   grpc_core::ExecCtx exec_ctx;
100 
101   memset(&resolved_addr, 0, sizeof(resolved_addr));
102   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
103   addr->sin_family = AF_INET;
104 
105   // create a phony server
106   svr_fd = socket(AF_INET, SOCK_STREAM, 0);
107   ASSERT_GE(svr_fd, 0);
108   ASSERT_EQ(bind(svr_fd, (struct sockaddr*)addr, (socklen_t)resolved_addr.len),
109             0);
110   ASSERT_EQ(listen(svr_fd, 1), 0);
111 
112   gpr_mu_lock(g_mu);
113   connections_complete_before = g_connections_complete;
114   gpr_mu_unlock(g_mu);
115 
116   // connect to it
117   ASSERT_EQ(getsockname(svr_fd, (struct sockaddr*)addr,
118                         (socklen_t*)&resolved_addr.len),
119             0);
120   GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx);
121   grpc_core::ChannelArgs args = grpc_core::CoreConfiguration::Get()
122                                     .channel_args_preconditioning()
123                                     .PreconditionChannelArgs(nullptr);
124   int64_t connection_handle = grpc_tcp_client_connect(
125       &done, &g_connecting, g_pollset_set,
126       grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
127       &resolved_addr, grpc_core::Timestamp::InfFuture());
128   // await the connection
129   do {
130     resolved_addr.len = static_cast<socklen_t>(sizeof(addr));
131     r = accept(svr_fd, reinterpret_cast<struct sockaddr*>(addr),
132                reinterpret_cast<socklen_t*>(&resolved_addr.len));
133   } while (r == -1 && errno == EINTR);
134   ASSERT_GE(r, 0);
135   close(r);
136 
137   gpr_mu_lock(g_mu);
138 
139   while (g_connections_complete == connections_complete_before) {
140     grpc_pollset_worker* worker = nullptr;
141     ASSERT_TRUE(GRPC_LOG_IF_ERROR(
142         "pollset_work",
143         grpc_pollset_work(g_pollset, &worker,
144                           grpc_core::Timestamp::FromTimespecRoundUp(
145                               grpc_timeout_seconds_to_deadline(5)))));
146     gpr_mu_unlock(g_mu);
147     grpc_core::ExecCtx::Get()->Flush();
148     gpr_mu_lock(g_mu);
149   }
150 
151   gpr_mu_unlock(g_mu);
152 
153   // A cancellation attempt should fail because connect already succeeded.
154   ASSERT_EQ(grpc_tcp_client_cancel_connect(connection_handle), false);
155 
156   gpr_log(GPR_ERROR, "---- finished test_succeeds() ----");
157 }
158 
test_fails(void)159 void test_fails(void) {
160   gpr_log(GPR_ERROR, "---- starting test_fails() ----");
161   grpc_resolved_address resolved_addr;
162   struct sockaddr_in* addr =
163       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
164   int connections_complete_before;
165   grpc_closure done;
166   grpc_core::ExecCtx exec_ctx;
167 
168   memset(&resolved_addr, 0, sizeof(resolved_addr));
169   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
170   addr->sin_family = AF_INET;
171 
172   gpr_mu_lock(g_mu);
173   connections_complete_before = g_connections_complete;
174   gpr_mu_unlock(g_mu);
175 
176   // connect to a broken address
177   GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
178   int64_t connection_handle = grpc_tcp_client_connect(
179       &done, &g_connecting, g_pollset_set,
180       grpc_event_engine::experimental::ChannelArgsEndpointConfig(),
181       &resolved_addr, grpc_core::Timestamp::InfFuture());
182   gpr_mu_lock(g_mu);
183 
184   // wait for the connection callback to finish
185   while (g_connections_complete == connections_complete_before) {
186     grpc_pollset_worker* worker = nullptr;
187     grpc_core::Timestamp polling_deadline = test_deadline();
188     switch (grpc_timer_check(&polling_deadline)) {
189       case GRPC_TIMERS_FIRED:
190         break;
191       case GRPC_TIMERS_NOT_CHECKED:
192         polling_deadline = grpc_core::Timestamp::ProcessEpoch();
193         ABSL_FALLTHROUGH_INTENDED;
194       case GRPC_TIMERS_CHECKED_AND_EMPTY:
195         ASSERT_TRUE(GRPC_LOG_IF_ERROR(
196             "pollset_work",
197             grpc_pollset_work(g_pollset, &worker, polling_deadline)));
198         break;
199     }
200     gpr_mu_unlock(g_mu);
201     grpc_core::ExecCtx::Get()->Flush();
202     gpr_mu_lock(g_mu);
203   }
204 
205   gpr_mu_unlock(g_mu);
206 
207   // A cancellation attempt should fail because connect already failed.
208   ASSERT_EQ(grpc_tcp_client_cancel_connect(connection_handle), false);
209 
210   gpr_log(GPR_ERROR, "---- finished test_fails() ----");
211 }
212 
test_connect_cancellation_succeeds(void)213 void test_connect_cancellation_succeeds(void) {
214   gpr_log(GPR_ERROR, "---- starting test_connect_cancellation_succeeds() ----");
215   auto target_ipv6_addr_uri = *grpc_core::URI::Parse(absl::StrCat(
216       "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())));
217   auto target_ipv4_addr_uri = *grpc_core::URI::Parse(absl::StrCat(
218       "ipv4:127.0.0.1:", std::to_string(grpc_pick_unused_port_or_die())));
219   grpc_resolved_address resolved_addr;
220   int svr_fd;
221   grpc_closure done;
222   grpc_core::ExecCtx exec_ctx;
223   bool tried_ipv4 = false;
224   ASSERT_TRUE(grpc_parse_uri(target_ipv6_addr_uri, &resolved_addr));
225   auto try_bind = [&](int sock) {
226     return (sock >= 0 &&
227             bind(sock, reinterpret_cast<sockaddr*>(resolved_addr.addr),
228                  resolved_addr.len) == 0);
229   };
230   // create a phony server
231   svr_fd = socket(AF_INET6, SOCK_STREAM, 0);
232   // Try ipv6
233   if (!try_bind(svr_fd)) {
234     if (svr_fd >= 0) {
235       close(svr_fd);
236     }
237     // Failed to bind ipv6. Try ipv4
238     ASSERT_TRUE(grpc_parse_uri(target_ipv4_addr_uri, &resolved_addr));
239     svr_fd = socket(AF_INET, SOCK_STREAM, 0);
240     tried_ipv4 = true;
241     if (!try_bind(svr_fd)) {
242       if (svr_fd >= 0) {
243         close(svr_fd);
244       }
245       gpr_log(GPR_ERROR,
246               "Skipping test. Failed to create a phony server bound to ipv6 or "
247               "ipv4 address");
248       return;
249     }
250   }
251 
252   ASSERT_EQ(listen(svr_fd, 1), 0);
253 
254   std::vector<int> client_sockets;
255   bool create_more_client_connections = true;
256   // Create and connect client sockets until the connection attempt times out.
257   // Even if the backlog specified to listen is 1, the kernel continues to
258   // accept a certain number of SYN packets before dropping them. This loop
259   // attempts to identify the number of new connection attempts that will
260   // be allowed by the kernel before any subsequent connection attempts
261   // become pending indefinitely.
262   while (create_more_client_connections) {
263     const int kOne = 1;
264     int client_socket = socket(tried_ipv4 ? AF_INET : AF_INET6, SOCK_STREAM, 0);
265     ASSERT_GE(client_socket, 0);
266     setsockopt(client_socket, SOL_SOCKET, SO_REUSEADDR, &kOne, sizeof(kOne));
267     // Make fd non-blocking.
268     int flags = fcntl(client_socket, F_GETFL, 0);
269     ASSERT_EQ(fcntl(client_socket, F_SETFL, flags | O_NONBLOCK), 0);
270 
271     if (connect(client_socket, reinterpret_cast<sockaddr*>(resolved_addr.addr),
272                 resolved_addr.len) == -1) {
273       if (errno == EINPROGRESS) {
274         struct pollfd pfd;
275         pfd.fd = client_socket;
276         pfd.events = POLLOUT;
277         pfd.revents = 0;
278         int ret = poll(&pfd, 1, 1000);
279         if (ret == -1) {
280           FAIL() << "poll() failed during connect; errno=" << errno;
281         } else if (ret == 0) {
282           // current connection attempt timed out. It indicates that the
283           // kernel will cause any subsequent connection attempts to
284           // become pending indefinitely.
285           create_more_client_connections = false;
286         }
287       } else {
288         FAIL() << "Failed to connect to the server. errno=%d" << errno;
289       }
290     }
291     client_sockets.push_back(client_socket);
292   }
293 
294   // connect to it. accept() is not called on the bind socket. So the connection
295   // should appear to be stuck giving ample time to try to cancel it.
296   ASSERT_EQ(getsockname(svr_fd, reinterpret_cast<sockaddr*>(resolved_addr.addr),
297                         (socklen_t*)&resolved_addr.len),
298             0);
299   GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx);
300   grpc_core::ChannelArgs args = grpc_core::CoreConfiguration::Get()
301                                     .channel_args_preconditioning()
302                                     .PreconditionChannelArgs(nullptr);
303   int64_t connection_handle = grpc_tcp_client_connect(
304       &done, &g_connecting, g_pollset_set,
305       grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
306       &resolved_addr, grpc_core::Timestamp::InfFuture());
307   ASSERT_GT(connection_handle, 0);
308   ASSERT_EQ(grpc_tcp_client_cancel_connect(connection_handle), true);
309   for (auto sock : client_sockets) {
310     close(sock);
311   }
312   close(svr_fd);
313   gpr_log(GPR_ERROR, "---- finished test_connect_cancellation_succeeds() ----");
314 }
315 
test_fails_bad_addr_no_leak(void)316 void test_fails_bad_addr_no_leak(void) {
317   gpr_log(GPR_ERROR, "---- starting test_fails_bad_addr_no_leak() ----");
318   grpc_resolved_address resolved_addr;
319   struct sockaddr_in* addr =
320       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
321   int connections_complete_before;
322   grpc_closure done;
323   grpc_core::ExecCtx exec_ctx;
324   memset(&resolved_addr, 0, sizeof(resolved_addr));
325   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
326   // force `grpc_tcp_client_prepare_fd` to fail. contrived, but effective.
327   addr->sin_family = AF_IPX;
328   gpr_mu_lock(g_mu);
329   connections_complete_before = g_connections_complete;
330   gpr_mu_unlock(g_mu);
331   // connect to an invalid address.
332   GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
333   grpc_tcp_client_connect(
334       &done, &g_connecting, g_pollset_set,
335       grpc_event_engine::experimental::ChannelArgsEndpointConfig(),
336       &resolved_addr, grpc_core::Timestamp::InfFuture());
337   gpr_mu_lock(g_mu);
338   while (g_connections_complete == connections_complete_before) {
339     grpc_pollset_worker* worker = nullptr;
340     grpc_core::Timestamp polling_deadline = test_deadline();
341     switch (grpc_timer_check(&polling_deadline)) {
342       case GRPC_TIMERS_FIRED:
343         break;
344       case GRPC_TIMERS_NOT_CHECKED:
345         polling_deadline = grpc_core::Timestamp::ProcessEpoch();
346         ABSL_FALLTHROUGH_INTENDED;
347       case GRPC_TIMERS_CHECKED_AND_EMPTY:
348         ASSERT_TRUE(GRPC_LOG_IF_ERROR(
349             "pollset_work",
350             grpc_pollset_work(g_pollset, &worker, polling_deadline)));
351         break;
352     }
353     gpr_mu_unlock(g_mu);
354     grpc_core::ExecCtx::Get()->Flush();
355     gpr_mu_lock(g_mu);
356   }
357   gpr_mu_unlock(g_mu);
358   gpr_log(GPR_ERROR, "---- finished test_fails_bad_addr_no_leak() ----");
359 }
360 
destroy_pollset(void * p,grpc_error_handle)361 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
362   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
363 }
364 
TEST(TcpClientPosixTest,MainTest)365 TEST(TcpClientPosixTest, MainTest) {
366   grpc_closure destroyed;
367   grpc_init();
368 
369   {
370     grpc_core::ExecCtx exec_ctx;
371     g_pollset_set = grpc_pollset_set_create();
372     g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
373     grpc_pollset_init(g_pollset, &g_mu);
374     grpc_pollset_set_add_pollset(g_pollset_set, g_pollset);
375 
376     test_succeeds();
377     test_connect_cancellation_succeeds();
378     test_fails();
379     test_fails_bad_addr_no_leak();
380     grpc_pollset_set_destroy(g_pollset_set);
381     GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
382                       grpc_schedule_on_exec_ctx);
383     grpc_pollset_shutdown(g_pollset, &destroyed);
384   }
385 
386   grpc_shutdown();
387   gpr_free(g_pollset);
388 }
389 
390 #endif  // GRPC_POSIX_SOCKET_CLIENT
391 
main(int argc,char ** argv)392 int main(int argc, char** argv) {
393   grpc::testing::TestEnvironment env(&argc, argv);
394   ::testing::InitGoogleTest(&argc, argv);
395   return RUN_ALL_TESTS();
396 }
397