xref: /aosp_15_r20/external/grpc-grpc/test/core/iomgr/tcp_server_posix_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <gtest/gtest.h>
20 
21 #include "src/core/lib/event_engine/shim.h"
22 #include "src/core/lib/gprpp/time.h"
23 #include "src/core/lib/iomgr/port.h"
24 #include "test/core/util/test_config.h"
25 
26 // This test won't work except with posix sockets enabled
27 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
28 
29 #include <errno.h>
30 #include <ifaddrs.h>
31 #include <netinet/in.h>
32 #include <stdio.h>
33 #include <string.h>
34 #include <sys/socket.h>
35 #include <sys/types.h>
36 #include <unistd.h>
37 
38 #ifdef GRPC_HAVE_UNIX_SOCKET
39 #include <sys/un.h>
40 #endif
41 
42 #include <string>
43 
44 #include <grpc/grpc.h>
45 #include <grpc/support/alloc.h>
46 #include <grpc/support/log.h>
47 #include <grpc/support/sync.h>
48 #include <grpc/support/time.h>
49 
50 #include "src/core/lib/address_utils/sockaddr_utils.h"
51 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
52 #include "src/core/lib/gprpp/crash.h"
53 #include "src/core/lib/gprpp/memory.h"
54 #include "src/core/lib/gprpp/strerror.h"
55 #include "src/core/lib/iomgr/error.h"
56 #include "src/core/lib/iomgr/iomgr.h"
57 #include "src/core/lib/iomgr/resolve_address.h"
58 #include "src/core/lib/iomgr/tcp_server.h"
59 #include "src/core/lib/resource_quota/api.h"
60 #include "test/core/util/port.h"
61 
62 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x)
63 
64 static gpr_mu* g_mu;
65 static grpc_pollset* g_pollset;
66 static int g_nconnects = 0;
67 
68 typedef struct {
69   // Owns a ref to server.
70   grpc_tcp_server* server;
71   unsigned port_index;
72   unsigned fd_index;
73   int server_fd;
74 } on_connect_result;
75 
76 typedef struct {
77   grpc_tcp_server* server;
78 
79   // arg is this server_weak_ref.
80   grpc_closure server_shutdown;
81 } server_weak_ref;
82 
83 #define MAX_URI 1024
84 typedef struct {
85   grpc_resolved_address addr;
86   char str[MAX_URI];
87 } test_addr;
88 
89 #define MAX_ADDRS 100
90 typedef struct {
91   size_t naddrs;
92   test_addr addrs[MAX_ADDRS];
93 } test_addrs;
94 
95 static on_connect_result g_result = {nullptr, 0, 0, -1};
96 
// Scratch storage used by sock_family_name() for families without a symbolic
// name. Shared between calls, so a returned pointer into it is only valid
// until the next such call.
static char family_name_buf[1024];

// Returns a printable name for a socket address family.
//
// AF_INET, AF_INET6 and AF_UNSPEC map to string literals. Any other value is
// formatted as a decimal number into family_name_buf and that buffer is
// returned.
static const char* sock_family_name(int family) {
  switch (family) {
    case AF_INET:
      return "AF_INET";
    case AF_INET6:
      return "AF_INET6";
    case AF_UNSPEC:
      return "AF_UNSPEC";
    default:
      // Bounded formatting: never writes past the end of the scratch buffer.
      snprintf(family_name_buf, sizeof(family_name_buf), "%d", family);
      return family_name_buf;
  }
}
110 
on_connect_result_init(on_connect_result * result)111 static void on_connect_result_init(on_connect_result* result) {
112   result->server = nullptr;
113   result->port_index = 0;
114   result->fd_index = 0;
115   result->server_fd = -1;
116 }
117 
on_connect_result_set(on_connect_result * result,const grpc_tcp_server_acceptor * acceptor)118 static void on_connect_result_set(on_connect_result* result,
119                                   const grpc_tcp_server_acceptor* acceptor) {
120   result->server = grpc_tcp_server_ref(acceptor->from_server);
121   result->port_index = acceptor->port_index;
122   result->fd_index = acceptor->fd_index;
123   result->server_fd = grpc_tcp_server_port_fd(
124       result->server, acceptor->port_index, acceptor->fd_index);
125 }
126 
server_weak_ref_shutdown(void * arg,grpc_error_handle)127 static void server_weak_ref_shutdown(void* arg, grpc_error_handle /*error*/) {
128   server_weak_ref* weak_ref = static_cast<server_weak_ref*>(arg);
129   weak_ref->server = nullptr;
130 }
131 
server_weak_ref_init(server_weak_ref * weak_ref)132 static void server_weak_ref_init(server_weak_ref* weak_ref) {
133   weak_ref->server = nullptr;
134   GRPC_CLOSURE_INIT(&weak_ref->server_shutdown, server_weak_ref_shutdown,
135                     weak_ref, grpc_schedule_on_exec_ctx);
136 }
137 
138 // Make weak_ref->server_shutdown a shutdown_starting cb on server.
139 // grpc_tcp_server promises that the server object will live until
140 // weak_ref->server_shutdown has returned. A strong ref on grpc_tcp_server
141 // should be held until server_weak_ref_set() returns to avoid a race where the
142 // server is deleted before the shutdown_starting cb is added.
server_weak_ref_set(server_weak_ref * weak_ref,grpc_tcp_server * server)143 static void server_weak_ref_set(server_weak_ref* weak_ref,
144                                 grpc_tcp_server* server) {
145   grpc_tcp_server_shutdown_starting_add(server, &weak_ref->server_shutdown);
146   weak_ref->server = server;
147 }
148 
test_addr_init_str(test_addr * addr)149 static void test_addr_init_str(test_addr* addr) {
150   std::string str = grpc_sockaddr_to_string(&addr->addr, false).value();
151   size_t str_len = std::min(str.size(), sizeof(addr->str) - 1);
152   memcpy(addr->str, str.c_str(), str_len);
153   addr->str[str_len] = '\0';
154 }
155 
on_connect(void *,grpc_endpoint * tcp,grpc_pollset *,grpc_tcp_server_acceptor * acceptor)156 static void on_connect(void* /*arg*/, grpc_endpoint* tcp,
157                        grpc_pollset* /*pollset*/,
158                        grpc_tcp_server_acceptor* acceptor) {
159   grpc_endpoint_shutdown(tcp, GRPC_ERROR_CREATE("Connected"));
160   grpc_endpoint_destroy(tcp);
161 
162   on_connect_result temp_result;
163   on_connect_result_set(&temp_result, acceptor);
164   gpr_free(acceptor);
165 
166   gpr_mu_lock(g_mu);
167   g_result = temp_result;
168   g_nconnects++;
169   ASSERT_TRUE(
170       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
171   gpr_mu_unlock(g_mu);
172 }
173 
test_no_op(void)174 static void test_no_op(void) {
175   grpc_core::ExecCtx exec_ctx;
176   grpc_tcp_server* s;
177   auto args = grpc_core::CoreConfiguration::Get()
178                   .channel_args_preconditioning()
179                   .PreconditionChannelArgs(nullptr);
180   ASSERT_EQ(
181       absl::OkStatus(),
182       grpc_tcp_server_create(
183           nullptr,
184           grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
185           on_connect, nullptr, &s));
186   grpc_tcp_server_unref(s);
187 }
188 
test_no_op_with_start(void)189 static void test_no_op_with_start(void) {
190   grpc_core::ExecCtx exec_ctx;
191   grpc_tcp_server* s;
192   auto args = grpc_core::CoreConfiguration::Get()
193                   .channel_args_preconditioning()
194                   .PreconditionChannelArgs(nullptr);
195   ASSERT_EQ(
196       absl::OkStatus(),
197       grpc_tcp_server_create(
198           nullptr,
199           grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
200           on_connect, nullptr, &s));
201   LOG_TEST("test_no_op_with_start");
202   std::vector<grpc_pollset*> empty_pollset;
203   grpc_tcp_server_start(s, &empty_pollset);
204   grpc_tcp_server_unref(s);
205 }
206 
test_no_op_with_port(void)207 static void test_no_op_with_port(void) {
208   grpc_core::ExecCtx exec_ctx;
209   grpc_resolved_address resolved_addr;
210   struct sockaddr_in* addr =
211       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
212   grpc_tcp_server* s;
213   auto args = grpc_core::CoreConfiguration::Get()
214                   .channel_args_preconditioning()
215                   .PreconditionChannelArgs(nullptr);
216   ASSERT_EQ(
217       absl::OkStatus(),
218       grpc_tcp_server_create(
219           nullptr,
220           grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
221           on_connect, nullptr, &s));
222   LOG_TEST("test_no_op_with_port");
223 
224   memset(&resolved_addr, 0, sizeof(resolved_addr));
225   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
226   addr->sin_family = AF_INET;
227   int port = -1;
228   ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &port),
229             absl::OkStatus());
230   ASSERT_GT(port, 0);
231 
232   grpc_tcp_server_unref(s);
233 }
234 
test_no_op_with_port_and_start(void)235 static void test_no_op_with_port_and_start(void) {
236   grpc_core::ExecCtx exec_ctx;
237   grpc_resolved_address resolved_addr;
238   struct sockaddr_in* addr =
239       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
240   grpc_tcp_server* s;
241   auto args = grpc_core::CoreConfiguration::Get()
242                   .channel_args_preconditioning()
243                   .PreconditionChannelArgs(nullptr);
244   ASSERT_EQ(
245       absl::OkStatus(),
246       grpc_tcp_server_create(
247           nullptr,
248           grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
249           on_connect, nullptr, &s));
250   LOG_TEST("test_no_op_with_port_and_start");
251   int port = -1;
252 
253   memset(&resolved_addr, 0, sizeof(resolved_addr));
254   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
255   addr->sin_family = AF_INET;
256   ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &port),
257             absl::OkStatus());
258   ASSERT_GT(port, 0);
259 
260   std::vector<grpc_pollset*> empty_pollset;
261   grpc_tcp_server_start(s, &empty_pollset);
262 
263   grpc_tcp_server_unref(s);
264 }
265 
tcp_connect(const test_addr * remote,on_connect_result * result)266 static grpc_error_handle tcp_connect(const test_addr* remote,
267                                      on_connect_result* result) {
268   grpc_core::Timestamp deadline = grpc_core::Timestamp::FromTimespecRoundUp(
269       grpc_timeout_seconds_to_deadline(10));
270   int clifd;
271   int nconnects_before;
272   const struct sockaddr* remote_addr =
273       reinterpret_cast<const struct sockaddr*>(remote->addr.addr);
274 
275   gpr_log(GPR_INFO, "Connecting to %s", remote->str);
276   gpr_mu_lock(g_mu);
277   nconnects_before = g_nconnects;
278   on_connect_result_init(&g_result);
279   clifd = socket(remote_addr->sa_family, SOCK_STREAM, 0);
280   if (clifd < 0) {
281     gpr_mu_unlock(g_mu);
282     return GRPC_OS_ERROR(errno, "Failed to create socket");
283   }
284   gpr_log(GPR_DEBUG, "start connect to %s", remote->str);
285   if (connect(clifd, remote_addr, static_cast<socklen_t>(remote->addr.len)) !=
286       0) {
287     gpr_mu_unlock(g_mu);
288     close(clifd);
289     return GRPC_OS_ERROR(errno, "connect");
290   }
291   gpr_log(GPR_DEBUG, "wait");
292   while (g_nconnects == nconnects_before &&
293          deadline > grpc_core::Timestamp::Now()) {
294     grpc_pollset_worker* worker = nullptr;
295     grpc_error_handle err;
296     if ((err = grpc_pollset_work(g_pollset, &worker, deadline)) !=
297         absl::OkStatus()) {
298       gpr_mu_unlock(g_mu);
299       close(clifd);
300       return err;
301     }
302     gpr_mu_unlock(g_mu);
303 
304     gpr_mu_lock(g_mu);
305   }
306   gpr_log(GPR_DEBUG, "wait done");
307   if (g_nconnects != nconnects_before + 1) {
308     gpr_mu_unlock(g_mu);
309     close(clifd);
310     return GRPC_ERROR_CREATE("Didn't connect");
311   }
312   close(clifd);
313   *result = g_result;
314 
315   gpr_mu_unlock(g_mu);
316   gpr_log(GPR_INFO, "Result (%d, %d) fd %d", result->port_index,
317           result->fd_index, result->server_fd);
318   grpc_tcp_server_unref(result->server);
319   return absl::OkStatus();
320 }
321 
322 // Tests a tcp server on "::" listeners with multiple ports. If channel_args is
323 // non-NULL, pass them to the server. If dst_addrs is non-NULL, use valid addrs
324 // as destination addrs (port is not set). If dst_addrs is NULL, use listener
325 // addrs as destination addrs. If test_dst_addrs is true, test connectivity with
326 // each destination address, set grpc_resolved_address::len=0 for failures, but
327 // don't fail the overall unitest.
test_connect(size_t num_connects,const grpc_channel_args * channel_args,test_addrs * dst_addrs,bool test_dst_addrs)328 static void test_connect(size_t num_connects,
329                          const grpc_channel_args* channel_args,
330                          test_addrs* dst_addrs, bool test_dst_addrs) {
331   grpc_core::ExecCtx exec_ctx;
332   grpc_resolved_address resolved_addr;
333   grpc_resolved_address resolved_addr1;
334   struct sockaddr_storage* const addr =
335       reinterpret_cast<struct sockaddr_storage*>(resolved_addr.addr);
336   struct sockaddr_storage* const addr1 =
337       reinterpret_cast<struct sockaddr_storage*>(resolved_addr1.addr);
338   unsigned svr_fd_count;
339   int port;
340   int svr_port;
341   unsigned svr1_fd_count;
342   int svr1_port;
343   grpc_tcp_server* s;
344   const unsigned num_ports = 2;
345   auto new_channel_args = grpc_core::CoreConfiguration::Get()
346                               .channel_args_preconditioning()
347                               .PreconditionChannelArgs(channel_args);
348   ASSERT_EQ(absl::OkStatus(),
349             grpc_tcp_server_create(
350                 nullptr,
351                 grpc_event_engine::experimental::ChannelArgsEndpointConfig(
352                     new_channel_args),
353                 on_connect, nullptr, &s));
354   unsigned port_num;
355   server_weak_ref weak_ref;
356   server_weak_ref_init(&weak_ref);
357   server_weak_ref_set(&weak_ref, s);
358   LOG_TEST("test_connect");
359   gpr_log(GPR_INFO,
360           "clients=%lu, num chan args=%lu, remote IP=%s, test_dst_addrs=%d",
361           static_cast<unsigned long>(num_connects),
362           static_cast<unsigned long>(
363               channel_args != nullptr ? channel_args->num_args : 0),
364           dst_addrs != nullptr ? "<specific>" : "::", test_dst_addrs);
365   memset(&resolved_addr, 0, sizeof(resolved_addr));
366   memset(&resolved_addr1, 0, sizeof(resolved_addr1));
367   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
368   resolved_addr1.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
369   addr->ss_family = addr1->ss_family = AF_INET;
370   ASSERT_TRUE(GRPC_LOG_IF_ERROR(
371       "grpc_tcp_server_add_port",
372       grpc_tcp_server_add_port(s, &resolved_addr, &svr_port)));
373   gpr_log(GPR_INFO, "Allocated port %d", svr_port);
374   ASSERT_GT(svr_port, 0);
375   // Cannot use wildcard (port==0), because add_port() will try to reuse the
376   // same port as a previous add_port().
377   svr1_port = grpc_pick_unused_port_or_die();
378   ASSERT_GT(svr1_port, 0);
379   gpr_log(GPR_INFO, "Picked unused port %d", svr1_port);
380   grpc_sockaddr_set_port(&resolved_addr1, svr1_port);
381   ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr1, &port),
382             absl::OkStatus());
383   ASSERT_EQ(port, svr1_port);
384 
385   // Bad port_index.
386   ASSERT_EQ(grpc_tcp_server_port_fd_count(s, 2), 0);
387   ASSERT_LT(grpc_tcp_server_port_fd(s, 2, 0), 0);
388 
389   // Bad fd_index.
390   ASSERT_LT(grpc_tcp_server_port_fd(s, 0, 100), 0);
391   ASSERT_LT(grpc_tcp_server_port_fd(s, 1, 100), 0);
392 
393   // Got at least one fd per port.
394   svr_fd_count = grpc_tcp_server_port_fd_count(s, 0);
395   ASSERT_GE(svr_fd_count, 1);
396   svr1_fd_count = grpc_tcp_server_port_fd_count(s, 1);
397   ASSERT_GE(svr1_fd_count, 1);
398 
399   std::vector<grpc_pollset*> test_pollset;
400   test_pollset.push_back(g_pollset);
401   grpc_tcp_server_start(s, &test_pollset);
402 
403   if (dst_addrs != nullptr) {
404     int ports[] = {svr_port, svr1_port};
405     for (port_num = 0; port_num < num_ports; ++port_num) {
406       size_t dst_idx;
407       size_t num_tested = 0;
408       for (dst_idx = 0; dst_idx < dst_addrs->naddrs; ++dst_idx) {
409         test_addr dst = dst_addrs->addrs[dst_idx];
410         on_connect_result result;
411         grpc_error_handle err;
412         if (dst.addr.len == 0) {
413           gpr_log(GPR_DEBUG, "Skipping test of non-functional local IP %s",
414                   dst.str);
415           continue;
416         }
417         ASSERT_TRUE(grpc_sockaddr_set_port(&dst.addr, ports[port_num]));
418         test_addr_init_str(&dst);
419         ++num_tested;
420         on_connect_result_init(&result);
421         if ((err = tcp_connect(&dst, &result)) == absl::OkStatus() &&
422             result.server_fd >= 0 && result.server == s) {
423           continue;
424         }
425         gpr_log(GPR_ERROR, "Failed to connect to %s: %s", dst.str,
426                 grpc_core::StatusToString(err).c_str());
427         ASSERT_TRUE(test_dst_addrs);
428         dst_addrs->addrs[dst_idx].addr.len = 0;
429       }
430       ASSERT_GT(num_tested, 0);
431     }
432   } else {
433     for (port_num = 0; port_num < num_ports; ++port_num) {
434       const unsigned num_fds = grpc_tcp_server_port_fd_count(s, port_num);
435       unsigned fd_num;
436       for (fd_num = 0; fd_num < num_fds; ++fd_num) {
437         int fd = grpc_tcp_server_port_fd(s, port_num, fd_num);
438         size_t connect_num;
439         test_addr dst;
440         ASSERT_GE(fd, 0);
441         dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
442         ASSERT_EQ(getsockname(fd, (struct sockaddr*)dst.addr.addr,
443                               (socklen_t*)&dst.addr.len),
444                   0);
445         ASSERT_LE(dst.addr.len, sizeof(dst.addr.addr));
446         test_addr_init_str(&dst);
447         gpr_log(GPR_INFO, "(%d, %d) fd %d family %s listening on %s", port_num,
448                 fd_num, fd, sock_family_name(addr->ss_family), dst.str);
449         for (connect_num = 0; connect_num < num_connects; ++connect_num) {
450           on_connect_result result;
451           on_connect_result_init(&result);
452           ASSERT_TRUE(
453               GRPC_LOG_IF_ERROR("tcp_connect", tcp_connect(&dst, &result)));
454           ASSERT_EQ(result.server_fd, fd);
455           ASSERT_EQ(result.port_index, port_num);
456           ASSERT_EQ(result.fd_index, fd_num);
457           ASSERT_EQ(result.server, s);
458           ASSERT_EQ(
459               grpc_tcp_server_port_fd(s, result.port_index, result.fd_index),
460               result.server_fd);
461         }
462       }
463     }
464   }
465   // Weak ref to server valid until final unref.
466   ASSERT_NE(weak_ref.server, nullptr);
467   ASSERT_GE(grpc_tcp_server_port_fd(s, 0, 0), 0);
468 
469   grpc_tcp_server_unref(s);
470   grpc_core::ExecCtx::Get()->Flush();
471 
472   // Weak ref lost.
473   ASSERT_EQ(weak_ref.server, nullptr);
474 }
475 
pre_allocate_inet_sock(grpc_tcp_server * s,int family,int port,int * fd)476 static int pre_allocate_inet_sock(grpc_tcp_server* s, int family, int port,
477                                   int* fd) {
478   struct sockaddr_in6 address;
479   memset(&address, 0, sizeof(address));
480   address.sin6_family = family;
481   address.sin6_port = htons(port);
482 
483   int pre_fd = socket(address.sin6_family, SOCK_STREAM, 0);
484   if (pre_fd < 0) {
485     gpr_log(GPR_ERROR, "Unable to create inet socket: %m");
486     return -1;
487   }
488 
489   const int enable = 1;
490   setsockopt(pre_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
491 
492   int b = bind(pre_fd, reinterpret_cast<struct sockaddr*>(&address),
493                sizeof(address));
494   if (b < 0) {
495     gpr_log(GPR_ERROR, "Unable to bind inet socket: %m");
496     return -1;
497   }
498 
499   int l = listen(pre_fd, SOMAXCONN);
500   if (l < 0) {
501     gpr_log(GPR_ERROR, "Unable to listen on inet socket: %m");
502     return -1;
503   }
504 
505   grpc_tcp_server_set_pre_allocated_fd(s, pre_fd);
506   *fd = pre_fd;
507 
508   return 0;
509 }
510 
test_pre_allocated_inet_fd()511 static void test_pre_allocated_inet_fd() {
512   grpc_core::ExecCtx exec_ctx;
513   grpc_resolved_address resolved_addr;
514   struct sockaddr_in6* addr =
515       reinterpret_cast<struct sockaddr_in6*>(resolved_addr.addr);
516   grpc_tcp_server* s;
517   if (grpc_event_engine::experimental::UseEventEngineListener()) {
518     // TODO(vigneshbabu): Skip the test when event engine is enabled.
519     // Pre-allocated fd support will be added to event engine later.
520     return;
521   }
522   auto args = grpc_core::CoreConfiguration::Get()
523                   .channel_args_preconditioning()
524                   .PreconditionChannelArgs(nullptr);
525   ASSERT_EQ(
526       absl::OkStatus(),
527       grpc_tcp_server_create(
528           nullptr,
529           grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
530           on_connect, nullptr, &s));
531   LOG_TEST("test_pre_allocated_inet_fd");
532 
533   // Pre allocate FD
534   int pre_fd;
535   int port = grpc_pick_unused_port_or_die();
536 
537   int res_pre = pre_allocate_inet_sock(s, AF_INET6, port, &pre_fd);
538   if (res_pre < 0) {
539     grpc_tcp_server_unref(s);
540     close(pre_fd);
541     return;
542   }
543   ASSERT_EQ(grpc_tcp_server_pre_allocated_fd(s), pre_fd);
544 
545   // Add port
546   int pt;
547   memset(&resolved_addr, 0, sizeof(resolved_addr));
548   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
549   addr->sin6_family = AF_INET6;
550   addr->sin6_port = htons(port);
551   ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &pt), absl::OkStatus());
552   ASSERT_GE(grpc_tcp_server_port_fd_count(s, 0), 1);
553   ASSERT_EQ(grpc_tcp_server_port_fd(s, 0, 0), pre_fd);
554 
555   // Start server
556   std::vector<grpc_pollset*> test_pollset;
557   test_pollset.push_back(g_pollset);
558   grpc_tcp_server_start(s, &test_pollset);
559 
560   // Test connection
561   test_addr dst;
562   dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
563   ASSERT_EQ(getsockname(pre_fd, (struct sockaddr*)dst.addr.addr,
564                         (socklen_t*)&dst.addr.len),
565             0);
566   ASSERT_LE(dst.addr.len, sizeof(dst.addr.addr));
567   test_addr_init_str(&dst);
568   on_connect_result result;
569   on_connect_result_init(&result);
570   ASSERT_EQ(tcp_connect(&dst, &result), absl::OkStatus());
571   ASSERT_EQ(result.server_fd, pre_fd);
572   ASSERT_EQ(result.server, s);
573   ASSERT_EQ(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index),
574             result.server_fd);
575 
576   grpc_tcp_server_unref(s);
577   close(pre_fd);
578 }
579 
580 #ifdef GRPC_HAVE_UNIX_SOCKET
pre_allocate_unix_sock(grpc_tcp_server * s,const char * path,int * fd)581 static int pre_allocate_unix_sock(grpc_tcp_server* s, const char* path,
582                                   int* fd) {
583   struct sockaddr_un address;
584   memset(&address, 0, sizeof(struct sockaddr_un));
585   address.sun_family = AF_UNIX;
586   strcpy(address.sun_path, path);
587 
588   int pre_fd = socket(address.sun_family, SOCK_STREAM, 0);
589   if (pre_fd < 0) {
590     gpr_log(GPR_ERROR, "Unable to create unix socket: %m");
591     return -1;
592   }
593 
594   int b = bind(pre_fd, reinterpret_cast<struct sockaddr*>(&address),
595                sizeof(address));
596   if (b < 0) {
597     gpr_log(GPR_ERROR, "Unable to bind unix socket: %m");
598     return -1;
599   }
600 
601   int l = listen(pre_fd, SOMAXCONN);
602   if (l < 0) {
603     gpr_log(GPR_ERROR, "Unable to listen on unix socket: %m");
604     return -1;
605   }
606 
607   grpc_tcp_server_set_pre_allocated_fd(s, pre_fd);
608   *fd = pre_fd;
609 
610   return 0;
611 }
612 
test_pre_allocated_unix_fd()613 static void test_pre_allocated_unix_fd() {
614   grpc_core::ExecCtx exec_ctx;
615   grpc_resolved_address resolved_addr;
616   struct sockaddr_un* addr =
617       reinterpret_cast<struct sockaddr_un*>(resolved_addr.addr);
618   grpc_tcp_server* s;
619   if (grpc_event_engine::experimental::UseEventEngineListener()) {
620     // TODO(vigneshbabu): Skip the test when event engine is enabled.
621     // Pre-allocated fd support will be added to event engine later.
622     return;
623   }
624   auto args = grpc_core::CoreConfiguration::Get()
625                   .channel_args_preconditioning()
626                   .PreconditionChannelArgs(nullptr);
627   ASSERT_EQ(
628       absl::OkStatus(),
629       grpc_tcp_server_create(
630           nullptr,
631           grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
632           on_connect, nullptr, &s));
633   LOG_TEST("test_pre_allocated_unix_fd");
634 
635   // Pre allocate FD
636   int pre_fd;
637   char path[100];
638   srand(time(nullptr));
639   memset(path, 0, sizeof(path));
640   sprintf(path, "/tmp/pre_fd_test_%d", rand());
641 
642   int res_pre = pre_allocate_unix_sock(s, path, &pre_fd);
643   if (res_pre < 0) {
644     grpc_tcp_server_unref(s);
645     close(pre_fd);
646     unlink(path);
647     return;
648   }
649 
650   ASSERT_EQ(grpc_tcp_server_pre_allocated_fd(s), pre_fd);
651 
652   // Add port
653   int pt;
654   memset(&resolved_addr, 0, sizeof(resolved_addr));
655   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_un));
656   addr->sun_family = AF_UNIX;
657   strcpy(addr->sun_path, path);
658   ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &pt), absl::OkStatus());
659   ASSERT_GE(grpc_tcp_server_port_fd_count(s, 0), 1);
660   ASSERT_EQ(grpc_tcp_server_port_fd(s, 0, 0), pre_fd);
661 
662   // Start server
663   std::vector<grpc_pollset*> test_pollset;
664   test_pollset.push_back(g_pollset);
665   grpc_tcp_server_start(s, &test_pollset);
666 
667   // Test connection
668   test_addr dst;
669   dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
670   ASSERT_EQ(getsockname(pre_fd, (struct sockaddr*)dst.addr.addr,
671                         (socklen_t*)&dst.addr.len),
672             0);
673   ASSERT_LE(dst.addr.len, sizeof(dst.addr.addr));
674   test_addr_init_str(&dst);
675   on_connect_result result;
676   on_connect_result_init(&result);
677 
678   grpc_error_handle res_conn = tcp_connect(&dst, &result);
679   // If the path no longer exists, errno is 2. This can happen when
680   // runninig the test multiple times in parallel. Do not fail the test
681   if (absl::IsUnknown(res_conn) && res_conn.raw_code() == 2) {
682     gpr_log(GPR_ERROR,
683             "Unable to test pre_allocated unix socket: path does not exist");
684     grpc_tcp_server_unref(s);
685     close(pre_fd);
686     return;
687   }
688 
689   ASSERT_EQ(res_conn, absl::OkStatus());
690   ASSERT_EQ(result.server_fd, pre_fd);
691   ASSERT_EQ(result.server, s);
692   ASSERT_EQ(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index),
693             result.server_fd);
694 
695   grpc_tcp_server_unref(s);
696   close(pre_fd);
697   unlink(path);
698 }
699 #endif  // GRPC_HAVE_UNIX_SOCKET
700 
destroy_pollset(void * p,grpc_error_handle)701 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
702   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
703 }
704 
TEST(TcpServerPosixTest,MainTest)705 TEST(TcpServerPosixTest, MainTest) {
706   grpc_closure destroyed;
707   grpc_arg chan_args[1];
708   chan_args[0].type = GRPC_ARG_INTEGER;
709   chan_args[0].key = const_cast<char*>(GRPC_ARG_EXPAND_WILDCARD_ADDRS);
710   chan_args[0].value.integer = 1;
711   const grpc_channel_args channel_args = {1, chan_args};
712   struct ifaddrs* ifa = nullptr;
713   struct ifaddrs* ifa_it;
714   // Zalloc dst_addrs to avoid oversized frames.
715   test_addrs* dst_addrs = grpc_core::Zalloc<test_addrs>();
716   grpc_init();
717   // wait a few seconds to make sure IPv6 link-local addresses can be bound
718   // if we are running under docker container that has just started.
719   // See https://github.com/moby/moby/issues/38491
720   // See https://github.com/grpc/grpc/issues/15610
721   gpr_sleep_until(grpc_timeout_seconds_to_deadline(4));
722   {
723     grpc_core::ExecCtx exec_ctx;
724     g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
725     grpc_pollset_init(g_pollset, &g_mu);
726 
727     test_no_op();
728     test_no_op_with_start();
729     test_no_op_with_port();
730     test_no_op_with_port_and_start();
731     test_pre_allocated_inet_fd();
732 #ifdef GRPC_HAVE_UNIX_SOCKET
733     test_pre_allocated_unix_fd();
734 #endif
735 
736     if (getifaddrs(&ifa) != 0 || ifa == nullptr) {
737       FAIL() << "getifaddrs: " << grpc_core::StrError(errno);
738     }
739     dst_addrs->naddrs = 0;
740     for (ifa_it = ifa; ifa_it != nullptr && dst_addrs->naddrs < MAX_ADDRS;
741          ifa_it = ifa_it->ifa_next) {
742       if (ifa_it->ifa_addr == nullptr) {
743         continue;
744       } else if (ifa_it->ifa_addr->sa_family == AF_INET) {
745         dst_addrs->addrs[dst_addrs->naddrs].addr.len =
746             static_cast<socklen_t>(sizeof(struct sockaddr_in));
747       } else if (ifa_it->ifa_addr->sa_family == AF_INET6) {
748         dst_addrs->addrs[dst_addrs->naddrs].addr.len =
749             static_cast<socklen_t>(sizeof(struct sockaddr_in6));
750       } else {
751         continue;
752       }
753       memcpy(dst_addrs->addrs[dst_addrs->naddrs].addr.addr, ifa_it->ifa_addr,
754              dst_addrs->addrs[dst_addrs->naddrs].addr.len);
755       ASSERT_TRUE(
756           grpc_sockaddr_set_port(&dst_addrs->addrs[dst_addrs->naddrs].addr, 0));
757       test_addr_init_str(&dst_addrs->addrs[dst_addrs->naddrs]);
758       ++dst_addrs->naddrs;
759     }
760     freeifaddrs(ifa);
761     ifa = nullptr;
762 
763     // Connect to same addresses as listeners.
764     test_connect(1, nullptr, nullptr, false);
765     test_connect(10, nullptr, nullptr, false);
766 
767     // Set dst_addrs->addrs[i].len=0 for dst_addrs that are unreachable with a
768     // "::" listener.
769     test_connect(1, nullptr, dst_addrs, true);
770 
771     // Test connect(2) with dst_addrs.
772     test_connect(1, &channel_args, dst_addrs, false);
773     // Test connect(2) with dst_addrs.
774     test_connect(10, &channel_args, dst_addrs, false);
775 
776     GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
777                       grpc_schedule_on_exec_ctx);
778     grpc_pollset_shutdown(g_pollset, &destroyed);
779   }
780   grpc_shutdown();
781   gpr_free(dst_addrs);
782   gpr_free(g_pollset);
783 }
784 
785 #endif  // GRPC_POSIX_SOCKET_TCP_SERVER
786 
main(int argc,char ** argv)787 int main(int argc, char** argv) {
788   grpc::testing::TestEnvironment env(&argc, argv);
789   ::testing::InitGoogleTest(&argc, argv);
790   return RUN_ALL_TESTS();
791 }
792