1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
24
25 #include <arpa/inet.h>
26 #include <errno.h>
27 #include <fcntl.h>
28 #include <limits.h>
29 #include <netinet/in.h>
30
31 #include "src/core/lib/iomgr/socket_utils.h"
32 #include "src/core/lib/iomgr/socket_utils_posix.h"
33 #ifdef GRPC_LINUX_TCP_H
34 #include <linux/tcp.h>
35 #else
36 #include <netinet/tcp.h>
37 #endif
38 #include <stdio.h>
39 #include <string.h>
40 #include <sys/socket.h>
41 #include <sys/types.h>
42 #include <unistd.h>
43
44 #include <string>
45
46 #include <grpc/event_engine/endpoint_config.h>
47 #include <grpc/support/alloc.h>
48 #include <grpc/support/log.h>
49 #include <grpc/support/sync.h>
50
51 #include "src/core/lib/address_utils/sockaddr_utils.h"
52 #include "src/core/lib/gpr/string.h"
53 #include "src/core/lib/gprpp/crash.h"
54 #include "src/core/lib/gprpp/strerror.h"
55 #include "src/core/lib/iomgr/sockaddr.h"
56
57 // set a socket to use zerocopy
grpc_set_socket_zerocopy(int fd)58 grpc_error_handle grpc_set_socket_zerocopy(int fd) {
59 #ifdef GRPC_LINUX_ERRQUEUE
60 const int enable = 1;
61 auto err = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable));
62 if (err != 0) {
63 return GRPC_OS_ERROR(errno, "setsockopt(SO_ZEROCOPY)");
64 }
65 return absl::OkStatus();
66 #else
67 (void)fd;
68 return GRPC_OS_ERROR(ENOSYS, "setsockopt(SO_ZEROCOPY)");
69 #endif
70 }
71
72 // set a socket to non blocking mode
grpc_set_socket_nonblocking(int fd,int non_blocking)73 grpc_error_handle grpc_set_socket_nonblocking(int fd, int non_blocking) {
74 int oldflags = fcntl(fd, F_GETFL, 0);
75 if (oldflags < 0) {
76 return GRPC_OS_ERROR(errno, "fcntl");
77 }
78
79 if (non_blocking) {
80 oldflags |= O_NONBLOCK;
81 } else {
82 oldflags &= ~O_NONBLOCK;
83 }
84
85 if (fcntl(fd, F_SETFL, oldflags) != 0) {
86 return GRPC_OS_ERROR(errno, "fcntl");
87 }
88
89 return absl::OkStatus();
90 }
91
grpc_set_socket_no_sigpipe_if_possible(int fd)92 grpc_error_handle grpc_set_socket_no_sigpipe_if_possible(int fd) {
93 #ifdef GRPC_HAVE_SO_NOSIGPIPE
94 int val = 1;
95 int newval;
96 socklen_t intlen = sizeof(newval);
97 if (0 != setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) {
98 return GRPC_OS_ERROR(errno, "setsockopt(SO_NOSIGPIPE)");
99 }
100 if (0 != getsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) {
101 return GRPC_OS_ERROR(errno, "getsockopt(SO_NOSIGPIPE)");
102 }
103 if ((newval != 0) != (val != 0)) {
104 return GRPC_ERROR_CREATE("Failed to set SO_NOSIGPIPE");
105 }
106 #else
107 // Avoid unused parameter warning for conditional parameter
108 (void)fd;
109 #endif
110 return absl::OkStatus();
111 }
112
grpc_set_socket_ip_pktinfo_if_possible(int fd)113 grpc_error_handle grpc_set_socket_ip_pktinfo_if_possible(int fd) {
114 // Use conditionally-important parameter to avoid warning
115 (void)fd;
116 #ifdef GRPC_HAVE_IP_PKTINFO
117 int get_local_ip = 1;
118 if (0 != setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
119 sizeof(get_local_ip))) {
120 return GRPC_OS_ERROR(errno, "setsockopt(IP_PKTINFO)");
121 }
122 #endif
123 return absl::OkStatus();
124 }
125
grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd)126 grpc_error_handle grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) {
127 // Use conditionally-important parameter to avoid warning
128 (void)fd;
129 #ifdef GRPC_HAVE_IPV6_RECVPKTINFO
130 int get_local_ip = 1;
131 if (0 != setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
132 sizeof(get_local_ip))) {
133 return GRPC_OS_ERROR(errno, "setsockopt(IPV6_RECVPKTINFO)");
134 }
135 #endif
136 return absl::OkStatus();
137 }
138
grpc_set_socket_sndbuf(int fd,int buffer_size_bytes)139 grpc_error_handle grpc_set_socket_sndbuf(int fd, int buffer_size_bytes) {
140 return 0 == setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes,
141 sizeof(buffer_size_bytes))
142 ? absl::OkStatus()
143 : GRPC_OS_ERROR(errno, "setsockopt(SO_SNDBUF)");
144 }
145
grpc_set_socket_rcvbuf(int fd,int buffer_size_bytes)146 grpc_error_handle grpc_set_socket_rcvbuf(int fd, int buffer_size_bytes) {
147 return 0 == setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buffer_size_bytes,
148 sizeof(buffer_size_bytes))
149 ? absl::OkStatus()
150 : GRPC_OS_ERROR(errno, "setsockopt(SO_RCVBUF)");
151 }
152
153 // set a socket to close on exec
grpc_set_socket_cloexec(int fd,int close_on_exec)154 grpc_error_handle grpc_set_socket_cloexec(int fd, int close_on_exec) {
155 int oldflags = fcntl(fd, F_GETFD, 0);
156 if (oldflags < 0) {
157 return GRPC_OS_ERROR(errno, "fcntl");
158 }
159
160 if (close_on_exec) {
161 oldflags |= FD_CLOEXEC;
162 } else {
163 oldflags &= ~FD_CLOEXEC;
164 }
165
166 if (fcntl(fd, F_SETFD, oldflags) != 0) {
167 return GRPC_OS_ERROR(errno, "fcntl");
168 }
169
170 return absl::OkStatus();
171 }
172
173 // set a socket to reuse old addresses
grpc_set_socket_reuse_addr(int fd,int reuse)174 grpc_error_handle grpc_set_socket_reuse_addr(int fd, int reuse) {
175 int val = (reuse != 0);
176 int newval;
177 socklen_t intlen = sizeof(newval);
178 if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) {
179 return GRPC_OS_ERROR(errno, "setsockopt(SO_REUSEADDR)");
180 }
181 if (0 != getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen)) {
182 return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEADDR)");
183 }
184 if ((newval != 0) != val) {
185 return GRPC_ERROR_CREATE("Failed to set SO_REUSEADDR");
186 }
187
188 return absl::OkStatus();
189 }
190
191 // set a socket to reuse old addresses
grpc_set_socket_reuse_port(int fd,int reuse)192 grpc_error_handle grpc_set_socket_reuse_port(int fd, int reuse) {
193 #ifndef SO_REUSEPORT
194 return GRPC_ERROR_CREATE("SO_REUSEPORT unavailable on compiling system");
195 #else
196 int val = (reuse != 0);
197 int newval;
198 socklen_t intlen = sizeof(newval);
199 if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val))) {
200 return GRPC_OS_ERROR(errno, "setsockopt(SO_REUSEPORT)");
201 }
202 if (0 != getsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &newval, &intlen)) {
203 return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEPORT)");
204 }
205 if ((newval != 0) != val) {
206 return GRPC_ERROR_CREATE("Failed to set SO_REUSEPORT");
207 }
208
209 return absl::OkStatus();
210 #endif
211 }
212
213 static gpr_once g_probe_so_reuesport_once = GPR_ONCE_INIT;
214 static int g_support_so_reuseport = false;
215
probe_so_reuseport_once(void)216 void probe_so_reuseport_once(void) {
217 int s = socket(AF_INET, SOCK_STREAM, 0);
218 if (s < 0) {
219 // This might be an ipv6-only environment in which case 'socket(AF_INET,..)'
220 // call would fail. Try creating IPv6 socket in that case
221 s = socket(AF_INET6, SOCK_STREAM, 0);
222 }
223 if (s >= 0) {
224 g_support_so_reuseport = GRPC_LOG_IF_ERROR(
225 "check for SO_REUSEPORT", grpc_set_socket_reuse_port(s, 1));
226 close(s);
227 }
228 }
229
grpc_is_socket_reuse_port_supported()230 bool grpc_is_socket_reuse_port_supported() {
231 gpr_once_init(&g_probe_so_reuesport_once, probe_so_reuseport_once);
232 return g_support_so_reuseport;
233 }
234
235 // disable nagle
grpc_set_socket_low_latency(int fd,int low_latency)236 grpc_error_handle grpc_set_socket_low_latency(int fd, int low_latency) {
237 int val = (low_latency != 0);
238 int newval;
239 socklen_t intlen = sizeof(newval);
240 if (0 != setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
241 return GRPC_OS_ERROR(errno, "setsockopt(TCP_NODELAY)");
242 }
243 if (0 != getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen)) {
244 return GRPC_OS_ERROR(errno, "getsockopt(TCP_NODELAY)");
245 }
246 if ((newval != 0) != val) {
247 return GRPC_ERROR_CREATE("Failed to set TCP_NODELAY");
248 }
249 return absl::OkStatus();
250 }
251
252 // The default values for TCP_USER_TIMEOUT are currently configured to be in
253 // line with the default values of KEEPALIVE_TIMEOUT as proposed in
254 // https://github.com/grpc/proposal/blob/master/A18-tcp-user-timeout.md
255 #define DEFAULT_CLIENT_TCP_USER_TIMEOUT_MS 20000 // 20 seconds
256 #define DEFAULT_SERVER_TCP_USER_TIMEOUT_MS 20000 // 20 seconds
257
258 static int g_default_client_tcp_user_timeout_ms =
259 DEFAULT_CLIENT_TCP_USER_TIMEOUT_MS;
260 static int g_default_server_tcp_user_timeout_ms =
261 DEFAULT_SERVER_TCP_USER_TIMEOUT_MS;
262 static bool g_default_client_tcp_user_timeout_enabled = false;
263 static bool g_default_server_tcp_user_timeout_enabled = true;
264
265 #if GPR_LINUX == 1
266 // For Linux, it will be detected to support TCP_USER_TIMEOUT
267 #ifndef TCP_USER_TIMEOUT
268 #define TCP_USER_TIMEOUT 18
269 #endif
270 #define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT 0
271 #else
272 // For non-Linux, TCP_USER_TIMEOUT will be used if TCP_USER_TIMEOUT is defined.
273 #ifdef TCP_USER_TIMEOUT
274 #define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT 0
275 #else
276 #define TCP_USER_TIMEOUT 0
277 #define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT -1
278 #endif // TCP_USER_TIMEOUT
279 #endif // GPR_LINUX == 1
280
281 // Whether the socket supports TCP_USER_TIMEOUT option.
282 // (0: don't know, 1: support, -1: not support)
283 static std::atomic<int> g_socket_supports_tcp_user_timeout(
284 SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT);
285
config_default_tcp_user_timeout(bool enable,int timeout,bool is_client)286 void config_default_tcp_user_timeout(bool enable, int timeout, bool is_client) {
287 if (is_client) {
288 g_default_client_tcp_user_timeout_enabled = enable;
289 if (timeout > 0) {
290 g_default_client_tcp_user_timeout_ms = timeout;
291 }
292 } else {
293 g_default_server_tcp_user_timeout_enabled = enable;
294 if (timeout > 0) {
295 g_default_server_tcp_user_timeout_ms = timeout;
296 }
297 }
298 }
299
300 // Set TCP_USER_TIMEOUT
grpc_set_socket_tcp_user_timeout(int fd,const grpc_core::PosixTcpOptions & options,bool is_client)301 grpc_error_handle grpc_set_socket_tcp_user_timeout(
302 int fd, const grpc_core::PosixTcpOptions& options, bool is_client) {
303 // Use conditionally-important parameter to avoid warning
304 (void)fd;
305 (void)is_client;
306 extern grpc_core::TraceFlag grpc_tcp_trace;
307 if (g_socket_supports_tcp_user_timeout.load() >= 0) {
308 bool enable;
309 int timeout;
310 if (is_client) {
311 enable = g_default_client_tcp_user_timeout_enabled;
312 timeout = g_default_client_tcp_user_timeout_ms;
313 } else {
314 enable = g_default_server_tcp_user_timeout_enabled;
315 timeout = g_default_server_tcp_user_timeout_ms;
316 }
317 int value = options.keep_alive_time_ms;
318 if (value > 0) {
319 enable = value != INT_MAX;
320 }
321 value = options.keep_alive_timeout_ms;
322 if (value > 0) {
323 timeout = value;
324 }
325 if (enable) {
326 int newval;
327 socklen_t len = sizeof(newval);
328 // If this is the first time to use TCP_USER_TIMEOUT, try to check
329 // if it is available.
330 if (g_socket_supports_tcp_user_timeout.load() == 0) {
331 if (0 != getsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) {
332 gpr_log(GPR_INFO,
333 "TCP_USER_TIMEOUT is not available. TCP_USER_TIMEOUT won't "
334 "be used thereafter");
335 g_socket_supports_tcp_user_timeout.store(-1);
336 } else {
337 gpr_log(GPR_INFO,
338 "TCP_USER_TIMEOUT is available. TCP_USER_TIMEOUT will be "
339 "used thereafter");
340 g_socket_supports_tcp_user_timeout.store(1);
341 }
342 }
343 if (g_socket_supports_tcp_user_timeout.load() > 0) {
344 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
345 gpr_log(GPR_INFO, "Enabling TCP_USER_TIMEOUT with a timeout of %d ms",
346 timeout);
347 }
348 if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout,
349 sizeof(timeout))) {
350 gpr_log(GPR_ERROR, "setsockopt(TCP_USER_TIMEOUT) %s",
351 grpc_core::StrError(errno).c_str());
352 return absl::OkStatus();
353 }
354 if (0 != getsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) {
355 gpr_log(GPR_ERROR, "getsockopt(TCP_USER_TIMEOUT) %s",
356 grpc_core::StrError(errno).c_str());
357 return absl::OkStatus();
358 }
359 if (newval != timeout) {
360 gpr_log(GPR_INFO,
361 "Setting TCP_USER_TIMEOUT to value %d ms. Actual "
362 "TCP_USER_TIMEOUT value is %d ms",
363 timeout, newval);
364 return absl::OkStatus();
365 }
366 }
367 }
368 } else {
369 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
370 gpr_log(GPR_INFO, "TCP_USER_TIMEOUT not supported for this platform");
371 }
372 }
373 return absl::OkStatus();
374 }
375
376 // set a socket using a grpc_socket_mutator
grpc_set_socket_with_mutator(int fd,grpc_fd_usage usage,grpc_socket_mutator * mutator)377 grpc_error_handle grpc_set_socket_with_mutator(int fd, grpc_fd_usage usage,
378 grpc_socket_mutator* mutator) {
379 GPR_ASSERT(mutator);
380 if (!grpc_socket_mutator_mutate_fd(mutator, fd, usage)) {
381 return GRPC_ERROR_CREATE("grpc_socket_mutator failed.");
382 }
383 return absl::OkStatus();
384 }
385
grpc_apply_socket_mutator_in_args(int fd,grpc_fd_usage usage,const grpc_core::PosixTcpOptions & options)386 grpc_error_handle grpc_apply_socket_mutator_in_args(
387 int fd, grpc_fd_usage usage, const grpc_core::PosixTcpOptions& options) {
388 if (options.socket_mutator == nullptr) {
389 return absl::OkStatus();
390 }
391 return grpc_set_socket_with_mutator(fd, usage, options.socket_mutator);
392 }
393
394 static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT;
395 static int g_ipv6_loopback_available;
396
probe_ipv6_once(void)397 static void probe_ipv6_once(void) {
398 int fd = socket(AF_INET6, SOCK_STREAM, 0);
399 g_ipv6_loopback_available = 0;
400 if (fd < 0) {
401 gpr_log(GPR_INFO, "Disabling AF_INET6 sockets because socket() failed.");
402 } else {
403 grpc_sockaddr_in6 addr;
404 memset(&addr, 0, sizeof(addr));
405 addr.sin6_family = AF_INET6;
406 addr.sin6_addr.s6_addr[15] = 1; // [::1]:0
407 if (bind(fd, reinterpret_cast<grpc_sockaddr*>(&addr), sizeof(addr)) == 0) {
408 g_ipv6_loopback_available = 1;
409 } else {
410 gpr_log(GPR_INFO,
411 "Disabling AF_INET6 sockets because ::1 is not available.");
412 }
413 close(fd);
414 }
415 }
416
grpc_ipv6_loopback_available(void)417 int grpc_ipv6_loopback_available(void) {
418 gpr_once_init(&g_probe_ipv6_once, probe_ipv6_once);
419 return g_ipv6_loopback_available;
420 }
421
error_for_fd(int fd,const grpc_resolved_address * addr)422 static grpc_error_handle error_for_fd(int fd,
423 const grpc_resolved_address* addr) {
424 if (fd >= 0) return absl::OkStatus();
425 auto addr_str = grpc_sockaddr_to_string(addr, false);
426 grpc_error_handle err = grpc_error_set_str(
427 GRPC_OS_ERROR(errno, "socket"),
428 grpc_core::StatusStrProperty::kTargetAddress,
429 addr_str.ok() ? addr_str.value() : addr_str.status().ToString());
430 return err;
431 }
432
grpc_create_dualstack_socket(const grpc_resolved_address * resolved_addr,int type,int protocol,grpc_dualstack_mode * dsmode,int * newfd)433 grpc_error_handle grpc_create_dualstack_socket(
434 const grpc_resolved_address* resolved_addr, int type, int protocol,
435 grpc_dualstack_mode* dsmode, int* newfd) {
436 return grpc_create_dualstack_socket_using_factory(
437 nullptr, resolved_addr, type, protocol, dsmode, newfd);
438 }
439
create_socket(grpc_socket_factory * factory,int domain,int type,int protocol)440 static int create_socket(grpc_socket_factory* factory, int domain, int type,
441 int protocol) {
442 int res = (factory != nullptr)
443 ? grpc_socket_factory_socket(factory, domain, type, protocol)
444 : socket(domain, type, protocol);
445 if (res < 0 && errno == EMFILE) {
446 int saved_errno = errno;
447 GRPC_LOG_EVERY_N_SEC(
448 10, GPR_ERROR,
449 "socket(%d, %d, %d) returned %d with error: |%s|. This process "
450 "might not have a sufficient file descriptor limit for the number "
451 "of connections grpc wants to open (which is generally a function of "
452 "the number of grpc channels, the lb policy of each channel, and the "
453 "number of backends each channel is load balancing across).",
454 domain, type, protocol, res, grpc_core::StrError(errno).c_str());
455 errno = saved_errno;
456 }
457 return res;
458 }
459
grpc_create_dualstack_socket_using_factory(grpc_socket_factory * factory,const grpc_resolved_address * resolved_addr,int type,int protocol,grpc_dualstack_mode * dsmode,int * newfd)460 grpc_error_handle grpc_create_dualstack_socket_using_factory(
461 grpc_socket_factory* factory, const grpc_resolved_address* resolved_addr,
462 int type, int protocol, grpc_dualstack_mode* dsmode, int* newfd) {
463 const grpc_sockaddr* addr =
464 reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
465 int family = addr->sa_family;
466 if (family == AF_INET6) {
467 if (grpc_ipv6_loopback_available()) {
468 *newfd = create_socket(factory, family, type, protocol);
469 } else {
470 *newfd = -1;
471 errno = EAFNOSUPPORT;
472 }
473 // Check if we've got a valid dualstack socket.
474 if (*newfd >= 0 && grpc_set_socket_dualstack(*newfd)) {
475 *dsmode = GRPC_DSMODE_DUALSTACK;
476 return absl::OkStatus();
477 }
478 // If this isn't an IPv4 address, then return whatever we've got.
479 if (!grpc_sockaddr_is_v4mapped(resolved_addr, nullptr)) {
480 *dsmode = GRPC_DSMODE_IPV6;
481 return error_for_fd(*newfd, resolved_addr);
482 }
483 // Fall back to AF_INET.
484 if (*newfd >= 0) {
485 close(*newfd);
486 }
487 family = AF_INET;
488 }
489 *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE;
490 *newfd = create_socket(factory, family, type, protocol);
491 return error_for_fd(*newfd, resolved_addr);
492 }
493
494 #endif
495