1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <gtest/gtest.h>
20
21 #include "src/core/lib/event_engine/shim.h"
22 #include "src/core/lib/gprpp/time.h"
23 #include "src/core/lib/iomgr/port.h"
24 #include "test/core/util/test_config.h"
25
26 // This test won't work except with posix sockets enabled
27 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
28
29 #include <errno.h>
30 #include <ifaddrs.h>
31 #include <netinet/in.h>
32 #include <stdio.h>
33 #include <string.h>
34 #include <sys/socket.h>
35 #include <sys/types.h>
36 #include <unistd.h>
37
38 #ifdef GRPC_HAVE_UNIX_SOCKET
39 #include <sys/un.h>
40 #endif
41
42 #include <string>
43
44 #include <grpc/grpc.h>
45 #include <grpc/support/alloc.h>
46 #include <grpc/support/log.h>
47 #include <grpc/support/sync.h>
48 #include <grpc/support/time.h>
49
50 #include "src/core/lib/address_utils/sockaddr_utils.h"
51 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
52 #include "src/core/lib/gprpp/crash.h"
53 #include "src/core/lib/gprpp/memory.h"
54 #include "src/core/lib/gprpp/strerror.h"
55 #include "src/core/lib/iomgr/error.h"
56 #include "src/core/lib/iomgr/iomgr.h"
57 #include "src/core/lib/iomgr/resolve_address.h"
58 #include "src/core/lib/iomgr/tcp_server.h"
59 #include "src/core/lib/resource_quota/api.h"
60 #include "test/core/util/port.h"
61
62 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x)
63
64 static gpr_mu* g_mu;
65 static grpc_pollset* g_pollset;
66 static int g_nconnects = 0;
67
68 typedef struct {
69 // Owns a ref to server.
70 grpc_tcp_server* server;
71 unsigned port_index;
72 unsigned fd_index;
73 int server_fd;
74 } on_connect_result;
75
76 typedef struct {
77 grpc_tcp_server* server;
78
79 // arg is this server_weak_ref.
80 grpc_closure server_shutdown;
81 } server_weak_ref;
82
83 #define MAX_URI 1024
84 typedef struct {
85 grpc_resolved_address addr;
86 char str[MAX_URI];
87 } test_addr;
88
89 #define MAX_ADDRS 100
90 typedef struct {
91 size_t naddrs;
92 test_addr addrs[MAX_ADDRS];
93 } test_addrs;
94
95 static on_connect_result g_result = {nullptr, 0, 0, -1};
96
// Scratch space for the decimal rendering of unrecognised address families.
static char family_name_buf[1024];

// Returns a printable name for a socket address family.
//
// AF_INET, AF_INET6 and AF_UNSPEC map to string literals. Any other value is
// formatted as a decimal number into the shared static buffer, so that
// result is overwritten by the next such call and the function is not
// re-entrant for unknown families.
static const char* sock_family_name(int family) {
  switch (family) {
    case AF_INET:
      return "AF_INET";
    case AF_INET6:
      return "AF_INET6";
    case AF_UNSPEC:
      return "AF_UNSPEC";
    default:
      // Bounded write: never runs past the end of family_name_buf.
      snprintf(family_name_buf, sizeof(family_name_buf), "%d", family);
      return family_name_buf;
  }
}
110
on_connect_result_init(on_connect_result * result)111 static void on_connect_result_init(on_connect_result* result) {
112 result->server = nullptr;
113 result->port_index = 0;
114 result->fd_index = 0;
115 result->server_fd = -1;
116 }
117
on_connect_result_set(on_connect_result * result,const grpc_tcp_server_acceptor * acceptor)118 static void on_connect_result_set(on_connect_result* result,
119 const grpc_tcp_server_acceptor* acceptor) {
120 result->server = grpc_tcp_server_ref(acceptor->from_server);
121 result->port_index = acceptor->port_index;
122 result->fd_index = acceptor->fd_index;
123 result->server_fd = grpc_tcp_server_port_fd(
124 result->server, acceptor->port_index, acceptor->fd_index);
125 }
126
server_weak_ref_shutdown(void * arg,grpc_error_handle)127 static void server_weak_ref_shutdown(void* arg, grpc_error_handle /*error*/) {
128 server_weak_ref* weak_ref = static_cast<server_weak_ref*>(arg);
129 weak_ref->server = nullptr;
130 }
131
server_weak_ref_init(server_weak_ref * weak_ref)132 static void server_weak_ref_init(server_weak_ref* weak_ref) {
133 weak_ref->server = nullptr;
134 GRPC_CLOSURE_INIT(&weak_ref->server_shutdown, server_weak_ref_shutdown,
135 weak_ref, grpc_schedule_on_exec_ctx);
136 }
137
138 // Make weak_ref->server_shutdown a shutdown_starting cb on server.
139 // grpc_tcp_server promises that the server object will live until
140 // weak_ref->server_shutdown has returned. A strong ref on grpc_tcp_server
141 // should be held until server_weak_ref_set() returns to avoid a race where the
142 // server is deleted before the shutdown_starting cb is added.
server_weak_ref_set(server_weak_ref * weak_ref,grpc_tcp_server * server)143 static void server_weak_ref_set(server_weak_ref* weak_ref,
144 grpc_tcp_server* server) {
145 grpc_tcp_server_shutdown_starting_add(server, &weak_ref->server_shutdown);
146 weak_ref->server = server;
147 }
148
test_addr_init_str(test_addr * addr)149 static void test_addr_init_str(test_addr* addr) {
150 std::string str = grpc_sockaddr_to_string(&addr->addr, false).value();
151 size_t str_len = std::min(str.size(), sizeof(addr->str) - 1);
152 memcpy(addr->str, str.c_str(), str_len);
153 addr->str[str_len] = '\0';
154 }
155
on_connect(void *,grpc_endpoint * tcp,grpc_pollset *,grpc_tcp_server_acceptor * acceptor)156 static void on_connect(void* /*arg*/, grpc_endpoint* tcp,
157 grpc_pollset* /*pollset*/,
158 grpc_tcp_server_acceptor* acceptor) {
159 grpc_endpoint_shutdown(tcp, GRPC_ERROR_CREATE("Connected"));
160 grpc_endpoint_destroy(tcp);
161
162 on_connect_result temp_result;
163 on_connect_result_set(&temp_result, acceptor);
164 gpr_free(acceptor);
165
166 gpr_mu_lock(g_mu);
167 g_result = temp_result;
168 g_nconnects++;
169 ASSERT_TRUE(
170 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
171 gpr_mu_unlock(g_mu);
172 }
173
test_no_op(void)174 static void test_no_op(void) {
175 grpc_core::ExecCtx exec_ctx;
176 grpc_tcp_server* s;
177 auto args = grpc_core::CoreConfiguration::Get()
178 .channel_args_preconditioning()
179 .PreconditionChannelArgs(nullptr);
180 ASSERT_EQ(
181 absl::OkStatus(),
182 grpc_tcp_server_create(
183 nullptr,
184 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
185 on_connect, nullptr, &s));
186 grpc_tcp_server_unref(s);
187 }
188
test_no_op_with_start(void)189 static void test_no_op_with_start(void) {
190 grpc_core::ExecCtx exec_ctx;
191 grpc_tcp_server* s;
192 auto args = grpc_core::CoreConfiguration::Get()
193 .channel_args_preconditioning()
194 .PreconditionChannelArgs(nullptr);
195 ASSERT_EQ(
196 absl::OkStatus(),
197 grpc_tcp_server_create(
198 nullptr,
199 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
200 on_connect, nullptr, &s));
201 LOG_TEST("test_no_op_with_start");
202 std::vector<grpc_pollset*> empty_pollset;
203 grpc_tcp_server_start(s, &empty_pollset);
204 grpc_tcp_server_unref(s);
205 }
206
test_no_op_with_port(void)207 static void test_no_op_with_port(void) {
208 grpc_core::ExecCtx exec_ctx;
209 grpc_resolved_address resolved_addr;
210 struct sockaddr_in* addr =
211 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
212 grpc_tcp_server* s;
213 auto args = grpc_core::CoreConfiguration::Get()
214 .channel_args_preconditioning()
215 .PreconditionChannelArgs(nullptr);
216 ASSERT_EQ(
217 absl::OkStatus(),
218 grpc_tcp_server_create(
219 nullptr,
220 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
221 on_connect, nullptr, &s));
222 LOG_TEST("test_no_op_with_port");
223
224 memset(&resolved_addr, 0, sizeof(resolved_addr));
225 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
226 addr->sin_family = AF_INET;
227 int port = -1;
228 ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &port),
229 absl::OkStatus());
230 ASSERT_GT(port, 0);
231
232 grpc_tcp_server_unref(s);
233 }
234
test_no_op_with_port_and_start(void)235 static void test_no_op_with_port_and_start(void) {
236 grpc_core::ExecCtx exec_ctx;
237 grpc_resolved_address resolved_addr;
238 struct sockaddr_in* addr =
239 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
240 grpc_tcp_server* s;
241 auto args = grpc_core::CoreConfiguration::Get()
242 .channel_args_preconditioning()
243 .PreconditionChannelArgs(nullptr);
244 ASSERT_EQ(
245 absl::OkStatus(),
246 grpc_tcp_server_create(
247 nullptr,
248 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
249 on_connect, nullptr, &s));
250 LOG_TEST("test_no_op_with_port_and_start");
251 int port = -1;
252
253 memset(&resolved_addr, 0, sizeof(resolved_addr));
254 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
255 addr->sin_family = AF_INET;
256 ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &port),
257 absl::OkStatus());
258 ASSERT_GT(port, 0);
259
260 std::vector<grpc_pollset*> empty_pollset;
261 grpc_tcp_server_start(s, &empty_pollset);
262
263 grpc_tcp_server_unref(s);
264 }
265
tcp_connect(const test_addr * remote,on_connect_result * result)266 static grpc_error_handle tcp_connect(const test_addr* remote,
267 on_connect_result* result) {
268 grpc_core::Timestamp deadline = grpc_core::Timestamp::FromTimespecRoundUp(
269 grpc_timeout_seconds_to_deadline(10));
270 int clifd;
271 int nconnects_before;
272 const struct sockaddr* remote_addr =
273 reinterpret_cast<const struct sockaddr*>(remote->addr.addr);
274
275 gpr_log(GPR_INFO, "Connecting to %s", remote->str);
276 gpr_mu_lock(g_mu);
277 nconnects_before = g_nconnects;
278 on_connect_result_init(&g_result);
279 clifd = socket(remote_addr->sa_family, SOCK_STREAM, 0);
280 if (clifd < 0) {
281 gpr_mu_unlock(g_mu);
282 return GRPC_OS_ERROR(errno, "Failed to create socket");
283 }
284 gpr_log(GPR_DEBUG, "start connect to %s", remote->str);
285 if (connect(clifd, remote_addr, static_cast<socklen_t>(remote->addr.len)) !=
286 0) {
287 gpr_mu_unlock(g_mu);
288 close(clifd);
289 return GRPC_OS_ERROR(errno, "connect");
290 }
291 gpr_log(GPR_DEBUG, "wait");
292 while (g_nconnects == nconnects_before &&
293 deadline > grpc_core::Timestamp::Now()) {
294 grpc_pollset_worker* worker = nullptr;
295 grpc_error_handle err;
296 if ((err = grpc_pollset_work(g_pollset, &worker, deadline)) !=
297 absl::OkStatus()) {
298 gpr_mu_unlock(g_mu);
299 close(clifd);
300 return err;
301 }
302 gpr_mu_unlock(g_mu);
303
304 gpr_mu_lock(g_mu);
305 }
306 gpr_log(GPR_DEBUG, "wait done");
307 if (g_nconnects != nconnects_before + 1) {
308 gpr_mu_unlock(g_mu);
309 close(clifd);
310 return GRPC_ERROR_CREATE("Didn't connect");
311 }
312 close(clifd);
313 *result = g_result;
314
315 gpr_mu_unlock(g_mu);
316 gpr_log(GPR_INFO, "Result (%d, %d) fd %d", result->port_index,
317 result->fd_index, result->server_fd);
318 grpc_tcp_server_unref(result->server);
319 return absl::OkStatus();
320 }
321
322 // Tests a tcp server on "::" listeners with multiple ports. If channel_args is
323 // non-NULL, pass them to the server. If dst_addrs is non-NULL, use valid addrs
324 // as destination addrs (port is not set). If dst_addrs is NULL, use listener
325 // addrs as destination addrs. If test_dst_addrs is true, test connectivity with
326 // each destination address, set grpc_resolved_address::len=0 for failures, but
327 // don't fail the overall unitest.
test_connect(size_t num_connects,const grpc_channel_args * channel_args,test_addrs * dst_addrs,bool test_dst_addrs)328 static void test_connect(size_t num_connects,
329 const grpc_channel_args* channel_args,
330 test_addrs* dst_addrs, bool test_dst_addrs) {
331 grpc_core::ExecCtx exec_ctx;
332 grpc_resolved_address resolved_addr;
333 grpc_resolved_address resolved_addr1;
334 struct sockaddr_storage* const addr =
335 reinterpret_cast<struct sockaddr_storage*>(resolved_addr.addr);
336 struct sockaddr_storage* const addr1 =
337 reinterpret_cast<struct sockaddr_storage*>(resolved_addr1.addr);
338 unsigned svr_fd_count;
339 int port;
340 int svr_port;
341 unsigned svr1_fd_count;
342 int svr1_port;
343 grpc_tcp_server* s;
344 const unsigned num_ports = 2;
345 auto new_channel_args = grpc_core::CoreConfiguration::Get()
346 .channel_args_preconditioning()
347 .PreconditionChannelArgs(channel_args);
348 ASSERT_EQ(absl::OkStatus(),
349 grpc_tcp_server_create(
350 nullptr,
351 grpc_event_engine::experimental::ChannelArgsEndpointConfig(
352 new_channel_args),
353 on_connect, nullptr, &s));
354 unsigned port_num;
355 server_weak_ref weak_ref;
356 server_weak_ref_init(&weak_ref);
357 server_weak_ref_set(&weak_ref, s);
358 LOG_TEST("test_connect");
359 gpr_log(GPR_INFO,
360 "clients=%lu, num chan args=%lu, remote IP=%s, test_dst_addrs=%d",
361 static_cast<unsigned long>(num_connects),
362 static_cast<unsigned long>(
363 channel_args != nullptr ? channel_args->num_args : 0),
364 dst_addrs != nullptr ? "<specific>" : "::", test_dst_addrs);
365 memset(&resolved_addr, 0, sizeof(resolved_addr));
366 memset(&resolved_addr1, 0, sizeof(resolved_addr1));
367 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
368 resolved_addr1.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
369 addr->ss_family = addr1->ss_family = AF_INET;
370 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
371 "grpc_tcp_server_add_port",
372 grpc_tcp_server_add_port(s, &resolved_addr, &svr_port)));
373 gpr_log(GPR_INFO, "Allocated port %d", svr_port);
374 ASSERT_GT(svr_port, 0);
375 // Cannot use wildcard (port==0), because add_port() will try to reuse the
376 // same port as a previous add_port().
377 svr1_port = grpc_pick_unused_port_or_die();
378 ASSERT_GT(svr1_port, 0);
379 gpr_log(GPR_INFO, "Picked unused port %d", svr1_port);
380 grpc_sockaddr_set_port(&resolved_addr1, svr1_port);
381 ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr1, &port),
382 absl::OkStatus());
383 ASSERT_EQ(port, svr1_port);
384
385 // Bad port_index.
386 ASSERT_EQ(grpc_tcp_server_port_fd_count(s, 2), 0);
387 ASSERT_LT(grpc_tcp_server_port_fd(s, 2, 0), 0);
388
389 // Bad fd_index.
390 ASSERT_LT(grpc_tcp_server_port_fd(s, 0, 100), 0);
391 ASSERT_LT(grpc_tcp_server_port_fd(s, 1, 100), 0);
392
393 // Got at least one fd per port.
394 svr_fd_count = grpc_tcp_server_port_fd_count(s, 0);
395 ASSERT_GE(svr_fd_count, 1);
396 svr1_fd_count = grpc_tcp_server_port_fd_count(s, 1);
397 ASSERT_GE(svr1_fd_count, 1);
398
399 std::vector<grpc_pollset*> test_pollset;
400 test_pollset.push_back(g_pollset);
401 grpc_tcp_server_start(s, &test_pollset);
402
403 if (dst_addrs != nullptr) {
404 int ports[] = {svr_port, svr1_port};
405 for (port_num = 0; port_num < num_ports; ++port_num) {
406 size_t dst_idx;
407 size_t num_tested = 0;
408 for (dst_idx = 0; dst_idx < dst_addrs->naddrs; ++dst_idx) {
409 test_addr dst = dst_addrs->addrs[dst_idx];
410 on_connect_result result;
411 grpc_error_handle err;
412 if (dst.addr.len == 0) {
413 gpr_log(GPR_DEBUG, "Skipping test of non-functional local IP %s",
414 dst.str);
415 continue;
416 }
417 ASSERT_TRUE(grpc_sockaddr_set_port(&dst.addr, ports[port_num]));
418 test_addr_init_str(&dst);
419 ++num_tested;
420 on_connect_result_init(&result);
421 if ((err = tcp_connect(&dst, &result)) == absl::OkStatus() &&
422 result.server_fd >= 0 && result.server == s) {
423 continue;
424 }
425 gpr_log(GPR_ERROR, "Failed to connect to %s: %s", dst.str,
426 grpc_core::StatusToString(err).c_str());
427 ASSERT_TRUE(test_dst_addrs);
428 dst_addrs->addrs[dst_idx].addr.len = 0;
429 }
430 ASSERT_GT(num_tested, 0);
431 }
432 } else {
433 for (port_num = 0; port_num < num_ports; ++port_num) {
434 const unsigned num_fds = grpc_tcp_server_port_fd_count(s, port_num);
435 unsigned fd_num;
436 for (fd_num = 0; fd_num < num_fds; ++fd_num) {
437 int fd = grpc_tcp_server_port_fd(s, port_num, fd_num);
438 size_t connect_num;
439 test_addr dst;
440 ASSERT_GE(fd, 0);
441 dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
442 ASSERT_EQ(getsockname(fd, (struct sockaddr*)dst.addr.addr,
443 (socklen_t*)&dst.addr.len),
444 0);
445 ASSERT_LE(dst.addr.len, sizeof(dst.addr.addr));
446 test_addr_init_str(&dst);
447 gpr_log(GPR_INFO, "(%d, %d) fd %d family %s listening on %s", port_num,
448 fd_num, fd, sock_family_name(addr->ss_family), dst.str);
449 for (connect_num = 0; connect_num < num_connects; ++connect_num) {
450 on_connect_result result;
451 on_connect_result_init(&result);
452 ASSERT_TRUE(
453 GRPC_LOG_IF_ERROR("tcp_connect", tcp_connect(&dst, &result)));
454 ASSERT_EQ(result.server_fd, fd);
455 ASSERT_EQ(result.port_index, port_num);
456 ASSERT_EQ(result.fd_index, fd_num);
457 ASSERT_EQ(result.server, s);
458 ASSERT_EQ(
459 grpc_tcp_server_port_fd(s, result.port_index, result.fd_index),
460 result.server_fd);
461 }
462 }
463 }
464 }
465 // Weak ref to server valid until final unref.
466 ASSERT_NE(weak_ref.server, nullptr);
467 ASSERT_GE(grpc_tcp_server_port_fd(s, 0, 0), 0);
468
469 grpc_tcp_server_unref(s);
470 grpc_core::ExecCtx::Get()->Flush();
471
472 // Weak ref lost.
473 ASSERT_EQ(weak_ref.server, nullptr);
474 }
475
pre_allocate_inet_sock(grpc_tcp_server * s,int family,int port,int * fd)476 static int pre_allocate_inet_sock(grpc_tcp_server* s, int family, int port,
477 int* fd) {
478 struct sockaddr_in6 address;
479 memset(&address, 0, sizeof(address));
480 address.sin6_family = family;
481 address.sin6_port = htons(port);
482
483 int pre_fd = socket(address.sin6_family, SOCK_STREAM, 0);
484 if (pre_fd < 0) {
485 gpr_log(GPR_ERROR, "Unable to create inet socket: %m");
486 return -1;
487 }
488
489 const int enable = 1;
490 setsockopt(pre_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
491
492 int b = bind(pre_fd, reinterpret_cast<struct sockaddr*>(&address),
493 sizeof(address));
494 if (b < 0) {
495 gpr_log(GPR_ERROR, "Unable to bind inet socket: %m");
496 return -1;
497 }
498
499 int l = listen(pre_fd, SOMAXCONN);
500 if (l < 0) {
501 gpr_log(GPR_ERROR, "Unable to listen on inet socket: %m");
502 return -1;
503 }
504
505 grpc_tcp_server_set_pre_allocated_fd(s, pre_fd);
506 *fd = pre_fd;
507
508 return 0;
509 }
510
test_pre_allocated_inet_fd()511 static void test_pre_allocated_inet_fd() {
512 grpc_core::ExecCtx exec_ctx;
513 grpc_resolved_address resolved_addr;
514 struct sockaddr_in6* addr =
515 reinterpret_cast<struct sockaddr_in6*>(resolved_addr.addr);
516 grpc_tcp_server* s;
517 if (grpc_event_engine::experimental::UseEventEngineListener()) {
518 // TODO(vigneshbabu): Skip the test when event engine is enabled.
519 // Pre-allocated fd support will be added to event engine later.
520 return;
521 }
522 auto args = grpc_core::CoreConfiguration::Get()
523 .channel_args_preconditioning()
524 .PreconditionChannelArgs(nullptr);
525 ASSERT_EQ(
526 absl::OkStatus(),
527 grpc_tcp_server_create(
528 nullptr,
529 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
530 on_connect, nullptr, &s));
531 LOG_TEST("test_pre_allocated_inet_fd");
532
533 // Pre allocate FD
534 int pre_fd;
535 int port = grpc_pick_unused_port_or_die();
536
537 int res_pre = pre_allocate_inet_sock(s, AF_INET6, port, &pre_fd);
538 if (res_pre < 0) {
539 grpc_tcp_server_unref(s);
540 close(pre_fd);
541 return;
542 }
543 ASSERT_EQ(grpc_tcp_server_pre_allocated_fd(s), pre_fd);
544
545 // Add port
546 int pt;
547 memset(&resolved_addr, 0, sizeof(resolved_addr));
548 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
549 addr->sin6_family = AF_INET6;
550 addr->sin6_port = htons(port);
551 ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &pt), absl::OkStatus());
552 ASSERT_GE(grpc_tcp_server_port_fd_count(s, 0), 1);
553 ASSERT_EQ(grpc_tcp_server_port_fd(s, 0, 0), pre_fd);
554
555 // Start server
556 std::vector<grpc_pollset*> test_pollset;
557 test_pollset.push_back(g_pollset);
558 grpc_tcp_server_start(s, &test_pollset);
559
560 // Test connection
561 test_addr dst;
562 dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
563 ASSERT_EQ(getsockname(pre_fd, (struct sockaddr*)dst.addr.addr,
564 (socklen_t*)&dst.addr.len),
565 0);
566 ASSERT_LE(dst.addr.len, sizeof(dst.addr.addr));
567 test_addr_init_str(&dst);
568 on_connect_result result;
569 on_connect_result_init(&result);
570 ASSERT_EQ(tcp_connect(&dst, &result), absl::OkStatus());
571 ASSERT_EQ(result.server_fd, pre_fd);
572 ASSERT_EQ(result.server, s);
573 ASSERT_EQ(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index),
574 result.server_fd);
575
576 grpc_tcp_server_unref(s);
577 close(pre_fd);
578 }
579
580 #ifdef GRPC_HAVE_UNIX_SOCKET
pre_allocate_unix_sock(grpc_tcp_server * s,const char * path,int * fd)581 static int pre_allocate_unix_sock(grpc_tcp_server* s, const char* path,
582 int* fd) {
583 struct sockaddr_un address;
584 memset(&address, 0, sizeof(struct sockaddr_un));
585 address.sun_family = AF_UNIX;
586 strcpy(address.sun_path, path);
587
588 int pre_fd = socket(address.sun_family, SOCK_STREAM, 0);
589 if (pre_fd < 0) {
590 gpr_log(GPR_ERROR, "Unable to create unix socket: %m");
591 return -1;
592 }
593
594 int b = bind(pre_fd, reinterpret_cast<struct sockaddr*>(&address),
595 sizeof(address));
596 if (b < 0) {
597 gpr_log(GPR_ERROR, "Unable to bind unix socket: %m");
598 return -1;
599 }
600
601 int l = listen(pre_fd, SOMAXCONN);
602 if (l < 0) {
603 gpr_log(GPR_ERROR, "Unable to listen on unix socket: %m");
604 return -1;
605 }
606
607 grpc_tcp_server_set_pre_allocated_fd(s, pre_fd);
608 *fd = pre_fd;
609
610 return 0;
611 }
612
test_pre_allocated_unix_fd()613 static void test_pre_allocated_unix_fd() {
614 grpc_core::ExecCtx exec_ctx;
615 grpc_resolved_address resolved_addr;
616 struct sockaddr_un* addr =
617 reinterpret_cast<struct sockaddr_un*>(resolved_addr.addr);
618 grpc_tcp_server* s;
619 if (grpc_event_engine::experimental::UseEventEngineListener()) {
620 // TODO(vigneshbabu): Skip the test when event engine is enabled.
621 // Pre-allocated fd support will be added to event engine later.
622 return;
623 }
624 auto args = grpc_core::CoreConfiguration::Get()
625 .channel_args_preconditioning()
626 .PreconditionChannelArgs(nullptr);
627 ASSERT_EQ(
628 absl::OkStatus(),
629 grpc_tcp_server_create(
630 nullptr,
631 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
632 on_connect, nullptr, &s));
633 LOG_TEST("test_pre_allocated_unix_fd");
634
635 // Pre allocate FD
636 int pre_fd;
637 char path[100];
638 srand(time(nullptr));
639 memset(path, 0, sizeof(path));
640 sprintf(path, "/tmp/pre_fd_test_%d", rand());
641
642 int res_pre = pre_allocate_unix_sock(s, path, &pre_fd);
643 if (res_pre < 0) {
644 grpc_tcp_server_unref(s);
645 close(pre_fd);
646 unlink(path);
647 return;
648 }
649
650 ASSERT_EQ(grpc_tcp_server_pre_allocated_fd(s), pre_fd);
651
652 // Add port
653 int pt;
654 memset(&resolved_addr, 0, sizeof(resolved_addr));
655 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_un));
656 addr->sun_family = AF_UNIX;
657 strcpy(addr->sun_path, path);
658 ASSERT_EQ(grpc_tcp_server_add_port(s, &resolved_addr, &pt), absl::OkStatus());
659 ASSERT_GE(grpc_tcp_server_port_fd_count(s, 0), 1);
660 ASSERT_EQ(grpc_tcp_server_port_fd(s, 0, 0), pre_fd);
661
662 // Start server
663 std::vector<grpc_pollset*> test_pollset;
664 test_pollset.push_back(g_pollset);
665 grpc_tcp_server_start(s, &test_pollset);
666
667 // Test connection
668 test_addr dst;
669 dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
670 ASSERT_EQ(getsockname(pre_fd, (struct sockaddr*)dst.addr.addr,
671 (socklen_t*)&dst.addr.len),
672 0);
673 ASSERT_LE(dst.addr.len, sizeof(dst.addr.addr));
674 test_addr_init_str(&dst);
675 on_connect_result result;
676 on_connect_result_init(&result);
677
678 grpc_error_handle res_conn = tcp_connect(&dst, &result);
679 // If the path no longer exists, errno is 2. This can happen when
680 // runninig the test multiple times in parallel. Do not fail the test
681 if (absl::IsUnknown(res_conn) && res_conn.raw_code() == 2) {
682 gpr_log(GPR_ERROR,
683 "Unable to test pre_allocated unix socket: path does not exist");
684 grpc_tcp_server_unref(s);
685 close(pre_fd);
686 return;
687 }
688
689 ASSERT_EQ(res_conn, absl::OkStatus());
690 ASSERT_EQ(result.server_fd, pre_fd);
691 ASSERT_EQ(result.server, s);
692 ASSERT_EQ(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index),
693 result.server_fd);
694
695 grpc_tcp_server_unref(s);
696 close(pre_fd);
697 unlink(path);
698 }
699 #endif // GRPC_HAVE_UNIX_SOCKET
700
destroy_pollset(void * p,grpc_error_handle)701 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
702 grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
703 }
704
// Runs every scenario in this file in sequence against one shared pollset:
// the no-op cases, the pre-allocated fd cases, and test_connect() both
// against the listeners' own addresses and against the local interface
// addresses reported by getifaddrs().
TEST(TcpServerPosixTest, MainTest) {
  // Must outlive the inner scope below: the closure is scheduled there and
  // may run as that scope's ExecCtx is torn down.
  grpc_closure destroyed;

  // Channel args enabling GRPC_ARG_EXPAND_WILDCARD_ADDRS.
  grpc_arg expand_wildcard_arg;
  expand_wildcard_arg.type = GRPC_ARG_INTEGER;
  expand_wildcard_arg.key = const_cast<char*>(GRPC_ARG_EXPAND_WILDCARD_ADDRS);
  expand_wildcard_arg.value.integer = 1;
  const grpc_channel_args channel_args = {1, &expand_wildcard_arg};

  // Zalloc dst_addrs to avoid oversized frames.
  test_addrs* dst_addrs = grpc_core::Zalloc<test_addrs>();
  grpc_init();
  // wait a few seconds to make sure IPv6 link-local addresses can be bound
  // if we are running under docker container that has just started.
  // See https://github.com/moby/moby/issues/38491
  // See https://github.com/grpc/grpc/issues/15610
  gpr_sleep_until(grpc_timeout_seconds_to_deadline(4));
  {
    grpc_core::ExecCtx exec_ctx;
    g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
    grpc_pollset_init(g_pollset, &g_mu);

    test_no_op();
    test_no_op_with_start();
    test_no_op_with_port();
    test_no_op_with_port_and_start();
    test_pre_allocated_inet_fd();
#ifdef GRPC_HAVE_UNIX_SOCKET
    test_pre_allocated_unix_fd();
#endif

    // Collect the IPv4/IPv6 addresses of the local interfaces, with the port
    // cleared, as candidate destinations.
    struct ifaddrs* ifa = nullptr;
    if (getifaddrs(&ifa) != 0 || ifa == nullptr) {
      FAIL() << "getifaddrs: " << grpc_core::StrError(errno);
    }
    dst_addrs->naddrs = 0;
    for (struct ifaddrs* ifa_it = ifa;
         ifa_it != nullptr && dst_addrs->naddrs < MAX_ADDRS;
         ifa_it = ifa_it->ifa_next) {
      if (ifa_it->ifa_addr == nullptr) {
        continue;
      }
      socklen_t addr_len;
      switch (ifa_it->ifa_addr->sa_family) {
        case AF_INET:
          addr_len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
          break;
        case AF_INET6:
          addr_len = static_cast<socklen_t>(sizeof(struct sockaddr_in6));
          break;
        default:
          // Other address families are not used as destinations.
          continue;
      }
      test_addr& slot = dst_addrs->addrs[dst_addrs->naddrs];
      slot.addr.len = addr_len;
      memcpy(slot.addr.addr, ifa_it->ifa_addr, slot.addr.len);
      ASSERT_TRUE(grpc_sockaddr_set_port(&slot.addr, 0));
      test_addr_init_str(&slot);
      ++dst_addrs->naddrs;
    }
    freeifaddrs(ifa);
    ifa = nullptr;

    // Connect to same addresses as listeners.
    test_connect(1, nullptr, nullptr, false);
    test_connect(10, nullptr, nullptr, false);

    // Set dst_addrs->addrs[i].len=0 for dst_addrs that are unreachable with a
    // "::" listener.
    test_connect(1, nullptr, dst_addrs, true);

    // Test connect(2) with dst_addrs, once and then ten times per address.
    test_connect(1, &channel_args, dst_addrs, false);
    test_connect(10, &channel_args, dst_addrs, false);

    GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
                      grpc_schedule_on_exec_ctx);
    grpc_pollset_shutdown(g_pollset, &destroyed);
  }
  grpc_shutdown();
  gpr_free(dst_addrs);
  gpr_free(g_pollset);
}
784
#endif  // GRPC_POSIX_SOCKET_TCP_SERVER
786
main(int argc,char ** argv)787 int main(int argc, char** argv) {
788 grpc::testing::TestEnvironment env(&argc, argv);
789 ::testing::InitGoogleTest(&argc, argv);
790 return RUN_ALL_TESTS();
791 }
792