1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
24 
25 #include <arpa/inet.h>
26 #include <errno.h>
27 #include <fcntl.h>
28 #include <limits.h>
29 #include <netinet/in.h>
30 
31 #include "src/core/lib/iomgr/socket_utils.h"
32 #include "src/core/lib/iomgr/socket_utils_posix.h"
33 #ifdef GRPC_LINUX_TCP_H
34 #include <linux/tcp.h>
35 #else
36 #include <netinet/tcp.h>
37 #endif
38 #include <stdio.h>
39 #include <string.h>
40 #include <sys/socket.h>
41 #include <sys/types.h>
42 #include <unistd.h>
43 
44 #include <string>
45 
46 #include <grpc/event_engine/endpoint_config.h>
47 #include <grpc/support/alloc.h>
48 #include <grpc/support/log.h>
49 #include <grpc/support/sync.h>
50 
51 #include "src/core/lib/address_utils/sockaddr_utils.h"
52 #include "src/core/lib/gpr/string.h"
53 #include "src/core/lib/gprpp/crash.h"
54 #include "src/core/lib/gprpp/strerror.h"
55 #include "src/core/lib/iomgr/sockaddr.h"
56 
57 // set a socket to use zerocopy
grpc_set_socket_zerocopy(int fd)58 grpc_error_handle grpc_set_socket_zerocopy(int fd) {
59 #ifdef GRPC_LINUX_ERRQUEUE
60   const int enable = 1;
61   auto err = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable));
62   if (err != 0) {
63     return GRPC_OS_ERROR(errno, "setsockopt(SO_ZEROCOPY)");
64   }
65   return absl::OkStatus();
66 #else
67   (void)fd;
68   return GRPC_OS_ERROR(ENOSYS, "setsockopt(SO_ZEROCOPY)");
69 #endif
70 }
71 
72 // set a socket to non blocking mode
grpc_set_socket_nonblocking(int fd,int non_blocking)73 grpc_error_handle grpc_set_socket_nonblocking(int fd, int non_blocking) {
74   int oldflags = fcntl(fd, F_GETFL, 0);
75   if (oldflags < 0) {
76     return GRPC_OS_ERROR(errno, "fcntl");
77   }
78 
79   if (non_blocking) {
80     oldflags |= O_NONBLOCK;
81   } else {
82     oldflags &= ~O_NONBLOCK;
83   }
84 
85   if (fcntl(fd, F_SETFL, oldflags) != 0) {
86     return GRPC_OS_ERROR(errno, "fcntl");
87   }
88 
89   return absl::OkStatus();
90 }
91 
grpc_set_socket_no_sigpipe_if_possible(int fd)92 grpc_error_handle grpc_set_socket_no_sigpipe_if_possible(int fd) {
93 #ifdef GRPC_HAVE_SO_NOSIGPIPE
94   int val = 1;
95   int newval;
96   socklen_t intlen = sizeof(newval);
97   if (0 != setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) {
98     return GRPC_OS_ERROR(errno, "setsockopt(SO_NOSIGPIPE)");
99   }
100   if (0 != getsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) {
101     return GRPC_OS_ERROR(errno, "getsockopt(SO_NOSIGPIPE)");
102   }
103   if ((newval != 0) != (val != 0)) {
104     return GRPC_ERROR_CREATE("Failed to set SO_NOSIGPIPE");
105   }
106 #else
107   // Avoid unused parameter warning for conditional parameter
108   (void)fd;
109 #endif
110   return absl::OkStatus();
111 }
112 
grpc_set_socket_ip_pktinfo_if_possible(int fd)113 grpc_error_handle grpc_set_socket_ip_pktinfo_if_possible(int fd) {
114   // Use conditionally-important parameter to avoid warning
115   (void)fd;
116 #ifdef GRPC_HAVE_IP_PKTINFO
117   int get_local_ip = 1;
118   if (0 != setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
119                       sizeof(get_local_ip))) {
120     return GRPC_OS_ERROR(errno, "setsockopt(IP_PKTINFO)");
121   }
122 #endif
123   return absl::OkStatus();
124 }
125 
grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd)126 grpc_error_handle grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) {
127   // Use conditionally-important parameter to avoid warning
128   (void)fd;
129 #ifdef GRPC_HAVE_IPV6_RECVPKTINFO
130   int get_local_ip = 1;
131   if (0 != setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
132                       sizeof(get_local_ip))) {
133     return GRPC_OS_ERROR(errno, "setsockopt(IPV6_RECVPKTINFO)");
134   }
135 #endif
136   return absl::OkStatus();
137 }
138 
grpc_set_socket_sndbuf(int fd,int buffer_size_bytes)139 grpc_error_handle grpc_set_socket_sndbuf(int fd, int buffer_size_bytes) {
140   return 0 == setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes,
141                          sizeof(buffer_size_bytes))
142              ? absl::OkStatus()
143              : GRPC_OS_ERROR(errno, "setsockopt(SO_SNDBUF)");
144 }
145 
grpc_set_socket_rcvbuf(int fd,int buffer_size_bytes)146 grpc_error_handle grpc_set_socket_rcvbuf(int fd, int buffer_size_bytes) {
147   return 0 == setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buffer_size_bytes,
148                          sizeof(buffer_size_bytes))
149              ? absl::OkStatus()
150              : GRPC_OS_ERROR(errno, "setsockopt(SO_RCVBUF)");
151 }
152 
153 // set a socket to close on exec
grpc_set_socket_cloexec(int fd,int close_on_exec)154 grpc_error_handle grpc_set_socket_cloexec(int fd, int close_on_exec) {
155   int oldflags = fcntl(fd, F_GETFD, 0);
156   if (oldflags < 0) {
157     return GRPC_OS_ERROR(errno, "fcntl");
158   }
159 
160   if (close_on_exec) {
161     oldflags |= FD_CLOEXEC;
162   } else {
163     oldflags &= ~FD_CLOEXEC;
164   }
165 
166   if (fcntl(fd, F_SETFD, oldflags) != 0) {
167     return GRPC_OS_ERROR(errno, "fcntl");
168   }
169 
170   return absl::OkStatus();
171 }
172 
173 // set a socket to reuse old addresses
grpc_set_socket_reuse_addr(int fd,int reuse)174 grpc_error_handle grpc_set_socket_reuse_addr(int fd, int reuse) {
175   int val = (reuse != 0);
176   int newval;
177   socklen_t intlen = sizeof(newval);
178   if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) {
179     return GRPC_OS_ERROR(errno, "setsockopt(SO_REUSEADDR)");
180   }
181   if (0 != getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen)) {
182     return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEADDR)");
183   }
184   if ((newval != 0) != val) {
185     return GRPC_ERROR_CREATE("Failed to set SO_REUSEADDR");
186   }
187 
188   return absl::OkStatus();
189 }
190 
191 // set a socket to reuse old addresses
grpc_set_socket_reuse_port(int fd,int reuse)192 grpc_error_handle grpc_set_socket_reuse_port(int fd, int reuse) {
193 #ifndef SO_REUSEPORT
194   return GRPC_ERROR_CREATE("SO_REUSEPORT unavailable on compiling system");
195 #else
196   int val = (reuse != 0);
197   int newval;
198   socklen_t intlen = sizeof(newval);
199   if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val))) {
200     return GRPC_OS_ERROR(errno, "setsockopt(SO_REUSEPORT)");
201   }
202   if (0 != getsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &newval, &intlen)) {
203     return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEPORT)");
204   }
205   if ((newval != 0) != val) {
206     return GRPC_ERROR_CREATE("Failed to set SO_REUSEPORT");
207   }
208 
209   return absl::OkStatus();
210 #endif
211 }
212 
213 static gpr_once g_probe_so_reuesport_once = GPR_ONCE_INIT;
214 static int g_support_so_reuseport = false;
215 
probe_so_reuseport_once(void)216 void probe_so_reuseport_once(void) {
217   int s = socket(AF_INET, SOCK_STREAM, 0);
218   if (s < 0) {
219     // This might be an ipv6-only environment in which case 'socket(AF_INET,..)'
220     // call would fail. Try creating IPv6 socket in that case
221     s = socket(AF_INET6, SOCK_STREAM, 0);
222   }
223   if (s >= 0) {
224     g_support_so_reuseport = GRPC_LOG_IF_ERROR(
225         "check for SO_REUSEPORT", grpc_set_socket_reuse_port(s, 1));
226     close(s);
227   }
228 }
229 
grpc_is_socket_reuse_port_supported()230 bool grpc_is_socket_reuse_port_supported() {
231   gpr_once_init(&g_probe_so_reuesport_once, probe_so_reuseport_once);
232   return g_support_so_reuseport;
233 }
234 
235 // disable nagle
grpc_set_socket_low_latency(int fd,int low_latency)236 grpc_error_handle grpc_set_socket_low_latency(int fd, int low_latency) {
237   int val = (low_latency != 0);
238   int newval;
239   socklen_t intlen = sizeof(newval);
240   if (0 != setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
241     return GRPC_OS_ERROR(errno, "setsockopt(TCP_NODELAY)");
242   }
243   if (0 != getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen)) {
244     return GRPC_OS_ERROR(errno, "getsockopt(TCP_NODELAY)");
245   }
246   if ((newval != 0) != val) {
247     return GRPC_ERROR_CREATE("Failed to set TCP_NODELAY");
248   }
249   return absl::OkStatus();
250 }
251 
252 // The default values for TCP_USER_TIMEOUT are currently configured to be in
253 // line with the default values of KEEPALIVE_TIMEOUT as proposed in
254 // https://github.com/grpc/proposal/blob/master/A18-tcp-user-timeout.md
255 #define DEFAULT_CLIENT_TCP_USER_TIMEOUT_MS 20000  // 20 seconds
256 #define DEFAULT_SERVER_TCP_USER_TIMEOUT_MS 20000  // 20 seconds
257 
258 static int g_default_client_tcp_user_timeout_ms =
259     DEFAULT_CLIENT_TCP_USER_TIMEOUT_MS;
260 static int g_default_server_tcp_user_timeout_ms =
261     DEFAULT_SERVER_TCP_USER_TIMEOUT_MS;
262 static bool g_default_client_tcp_user_timeout_enabled = false;
263 static bool g_default_server_tcp_user_timeout_enabled = true;
264 
265 #if GPR_LINUX == 1
266 // For Linux, it will be detected to support TCP_USER_TIMEOUT
267 #ifndef TCP_USER_TIMEOUT
268 #define TCP_USER_TIMEOUT 18
269 #endif
270 #define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT 0
271 #else
272 // For non-Linux, TCP_USER_TIMEOUT will be used if TCP_USER_TIMEOUT is defined.
273 #ifdef TCP_USER_TIMEOUT
274 #define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT 0
275 #else
276 #define TCP_USER_TIMEOUT 0
277 #define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT -1
278 #endif  // TCP_USER_TIMEOUT
279 #endif  // GPR_LINUX == 1
280 
281 // Whether the socket supports TCP_USER_TIMEOUT option.
282 // (0: don't know, 1: support, -1: not support)
283 static std::atomic<int> g_socket_supports_tcp_user_timeout(
284     SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT);
285 
config_default_tcp_user_timeout(bool enable,int timeout,bool is_client)286 void config_default_tcp_user_timeout(bool enable, int timeout, bool is_client) {
287   if (is_client) {
288     g_default_client_tcp_user_timeout_enabled = enable;
289     if (timeout > 0) {
290       g_default_client_tcp_user_timeout_ms = timeout;
291     }
292   } else {
293     g_default_server_tcp_user_timeout_enabled = enable;
294     if (timeout > 0) {
295       g_default_server_tcp_user_timeout_ms = timeout;
296     }
297   }
298 }
299 
300 // Set TCP_USER_TIMEOUT
grpc_set_socket_tcp_user_timeout(int fd,const grpc_core::PosixTcpOptions & options,bool is_client)301 grpc_error_handle grpc_set_socket_tcp_user_timeout(
302     int fd, const grpc_core::PosixTcpOptions& options, bool is_client) {
303   // Use conditionally-important parameter to avoid warning
304   (void)fd;
305   (void)is_client;
306   extern grpc_core::TraceFlag grpc_tcp_trace;
307   if (g_socket_supports_tcp_user_timeout.load() >= 0) {
308     bool enable;
309     int timeout;
310     if (is_client) {
311       enable = g_default_client_tcp_user_timeout_enabled;
312       timeout = g_default_client_tcp_user_timeout_ms;
313     } else {
314       enable = g_default_server_tcp_user_timeout_enabled;
315       timeout = g_default_server_tcp_user_timeout_ms;
316     }
317     int value = options.keep_alive_time_ms;
318     if (value > 0) {
319       enable = value != INT_MAX;
320     }
321     value = options.keep_alive_timeout_ms;
322     if (value > 0) {
323       timeout = value;
324     }
325     if (enable) {
326       int newval;
327       socklen_t len = sizeof(newval);
328       // If this is the first time to use TCP_USER_TIMEOUT, try to check
329       // if it is available.
330       if (g_socket_supports_tcp_user_timeout.load() == 0) {
331         if (0 != getsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) {
332           gpr_log(GPR_INFO,
333                   "TCP_USER_TIMEOUT is not available. TCP_USER_TIMEOUT won't "
334                   "be used thereafter");
335           g_socket_supports_tcp_user_timeout.store(-1);
336         } else {
337           gpr_log(GPR_INFO,
338                   "TCP_USER_TIMEOUT is available. TCP_USER_TIMEOUT will be "
339                   "used thereafter");
340           g_socket_supports_tcp_user_timeout.store(1);
341         }
342       }
343       if (g_socket_supports_tcp_user_timeout.load() > 0) {
344         if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
345           gpr_log(GPR_INFO, "Enabling TCP_USER_TIMEOUT with a timeout of %d ms",
346                   timeout);
347         }
348         if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout,
349                             sizeof(timeout))) {
350           gpr_log(GPR_ERROR, "setsockopt(TCP_USER_TIMEOUT) %s",
351                   grpc_core::StrError(errno).c_str());
352           return absl::OkStatus();
353         }
354         if (0 != getsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) {
355           gpr_log(GPR_ERROR, "getsockopt(TCP_USER_TIMEOUT) %s",
356                   grpc_core::StrError(errno).c_str());
357           return absl::OkStatus();
358         }
359         if (newval != timeout) {
360           gpr_log(GPR_INFO,
361                   "Setting TCP_USER_TIMEOUT to value %d ms. Actual "
362                   "TCP_USER_TIMEOUT value is %d ms",
363                   timeout, newval);
364           return absl::OkStatus();
365         }
366       }
367     }
368   } else {
369     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
370       gpr_log(GPR_INFO, "TCP_USER_TIMEOUT not supported for this platform");
371     }
372   }
373   return absl::OkStatus();
374 }
375 
376 // set a socket using a grpc_socket_mutator
grpc_set_socket_with_mutator(int fd,grpc_fd_usage usage,grpc_socket_mutator * mutator)377 grpc_error_handle grpc_set_socket_with_mutator(int fd, grpc_fd_usage usage,
378                                                grpc_socket_mutator* mutator) {
379   GPR_ASSERT(mutator);
380   if (!grpc_socket_mutator_mutate_fd(mutator, fd, usage)) {
381     return GRPC_ERROR_CREATE("grpc_socket_mutator failed.");
382   }
383   return absl::OkStatus();
384 }
385 
grpc_apply_socket_mutator_in_args(int fd,grpc_fd_usage usage,const grpc_core::PosixTcpOptions & options)386 grpc_error_handle grpc_apply_socket_mutator_in_args(
387     int fd, grpc_fd_usage usage, const grpc_core::PosixTcpOptions& options) {
388   if (options.socket_mutator == nullptr) {
389     return absl::OkStatus();
390   }
391   return grpc_set_socket_with_mutator(fd, usage, options.socket_mutator);
392 }
393 
394 static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT;
395 static int g_ipv6_loopback_available;
396 
probe_ipv6_once(void)397 static void probe_ipv6_once(void) {
398   int fd = socket(AF_INET6, SOCK_STREAM, 0);
399   g_ipv6_loopback_available = 0;
400   if (fd < 0) {
401     gpr_log(GPR_INFO, "Disabling AF_INET6 sockets because socket() failed.");
402   } else {
403     grpc_sockaddr_in6 addr;
404     memset(&addr, 0, sizeof(addr));
405     addr.sin6_family = AF_INET6;
406     addr.sin6_addr.s6_addr[15] = 1;  // [::1]:0
407     if (bind(fd, reinterpret_cast<grpc_sockaddr*>(&addr), sizeof(addr)) == 0) {
408       g_ipv6_loopback_available = 1;
409     } else {
410       gpr_log(GPR_INFO,
411               "Disabling AF_INET6 sockets because ::1 is not available.");
412     }
413     close(fd);
414   }
415 }
416 
grpc_ipv6_loopback_available(void)417 int grpc_ipv6_loopback_available(void) {
418   gpr_once_init(&g_probe_ipv6_once, probe_ipv6_once);
419   return g_ipv6_loopback_available;
420 }
421 
error_for_fd(int fd,const grpc_resolved_address * addr)422 static grpc_error_handle error_for_fd(int fd,
423                                       const grpc_resolved_address* addr) {
424   if (fd >= 0) return absl::OkStatus();
425   auto addr_str = grpc_sockaddr_to_string(addr, false);
426   grpc_error_handle err = grpc_error_set_str(
427       GRPC_OS_ERROR(errno, "socket"),
428       grpc_core::StatusStrProperty::kTargetAddress,
429       addr_str.ok() ? addr_str.value() : addr_str.status().ToString());
430   return err;
431 }
432 
grpc_create_dualstack_socket(const grpc_resolved_address * resolved_addr,int type,int protocol,grpc_dualstack_mode * dsmode,int * newfd)433 grpc_error_handle grpc_create_dualstack_socket(
434     const grpc_resolved_address* resolved_addr, int type, int protocol,
435     grpc_dualstack_mode* dsmode, int* newfd) {
436   return grpc_create_dualstack_socket_using_factory(
437       nullptr, resolved_addr, type, protocol, dsmode, newfd);
438 }
439 
create_socket(grpc_socket_factory * factory,int domain,int type,int protocol)440 static int create_socket(grpc_socket_factory* factory, int domain, int type,
441                          int protocol) {
442   int res = (factory != nullptr)
443                 ? grpc_socket_factory_socket(factory, domain, type, protocol)
444                 : socket(domain, type, protocol);
445   if (res < 0 && errno == EMFILE) {
446     int saved_errno = errno;
447     GRPC_LOG_EVERY_N_SEC(
448         10, GPR_ERROR,
449         "socket(%d, %d, %d) returned %d with error: |%s|. This process "
450         "might not have a sufficient file descriptor limit for the number "
451         "of connections grpc wants to open (which is generally a function of "
452         "the number of grpc channels, the lb policy of each channel, and the "
453         "number of backends each channel is load balancing across).",
454         domain, type, protocol, res, grpc_core::StrError(errno).c_str());
455     errno = saved_errno;
456   }
457   return res;
458 }
459 
grpc_create_dualstack_socket_using_factory(grpc_socket_factory * factory,const grpc_resolved_address * resolved_addr,int type,int protocol,grpc_dualstack_mode * dsmode,int * newfd)460 grpc_error_handle grpc_create_dualstack_socket_using_factory(
461     grpc_socket_factory* factory, const grpc_resolved_address* resolved_addr,
462     int type, int protocol, grpc_dualstack_mode* dsmode, int* newfd) {
463   const grpc_sockaddr* addr =
464       reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
465   int family = addr->sa_family;
466   if (family == AF_INET6) {
467     if (grpc_ipv6_loopback_available()) {
468       *newfd = create_socket(factory, family, type, protocol);
469     } else {
470       *newfd = -1;
471       errno = EAFNOSUPPORT;
472     }
473     // Check if we've got a valid dualstack socket.
474     if (*newfd >= 0 && grpc_set_socket_dualstack(*newfd)) {
475       *dsmode = GRPC_DSMODE_DUALSTACK;
476       return absl::OkStatus();
477     }
478     // If this isn't an IPv4 address, then return whatever we've got.
479     if (!grpc_sockaddr_is_v4mapped(resolved_addr, nullptr)) {
480       *dsmode = GRPC_DSMODE_IPV6;
481       return error_for_fd(*newfd, resolved_addr);
482     }
483     // Fall back to AF_INET.
484     if (*newfd >= 0) {
485       close(*newfd);
486     }
487     family = AF_INET;
488   }
489   *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE;
490   *newfd = create_socket(factory, family, type, protocol);
491   return error_for_fd(*newfd, resolved_addr);
492 }
493 
494 #endif
495