1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include <grpc/support/atm.h>
22
23 #include "src/core/lib/iomgr/port.h"
24
25 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
26
27 #include <errno.h>
28 #include <limits.h>
29 #include <stdio.h>
30 #include <string.h>
31 #include <sys/socket.h>
32
33 #include <string>
34
35 #include "absl/strings/str_cat.h"
36
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/log.h>
39 #include <grpc/support/sync.h>
40
41 #include "src/core/lib/address_utils/sockaddr_utils.h"
42 #include "src/core/lib/gprpp/crash.h"
43 #include "src/core/lib/iomgr/error.h"
44 #include "src/core/lib/iomgr/sockaddr.h"
45 #include "src/core/lib/iomgr/tcp_server_utils_posix.h"
46 #include "src/core/lib/iomgr/unix_sockets_posix.h"
47
// Accept-queue sizes below this value are reported in the log when the
// queue size is first computed.
#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100

// Guards the one-time computation of s_max_accept_queue_size.
static gpr_once s_init_max_accept_queue_size = GPR_ONCE_INIT;
// Backlog value handed to listen(); written by init_max_accept_queue_size().
static int s_max_accept_queue_size;
52
53 // get max listen queue size on linux
init_max_accept_queue_size(void)54 static void init_max_accept_queue_size(void) {
55 int n = SOMAXCONN;
56 char buf[64];
57 FILE* fp = fopen("/proc/sys/net/core/somaxconn", "r");
58 if (fp == nullptr) {
59 // 2.4 kernel.
60 s_max_accept_queue_size = SOMAXCONN;
61 return;
62 }
63 if (fgets(buf, sizeof buf, fp)) {
64 char* end;
65 long i = strtol(buf, &end, 10);
66 if (i > 0 && i <= INT_MAX && end && *end == '\n') {
67 n = static_cast<int>(i);
68 }
69 }
70 fclose(fp);
71 s_max_accept_queue_size = n;
72
73 if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) {
74 gpr_log(GPR_INFO,
75 "Suspiciously small accept queue (%d) will probably lead to "
76 "connection drops",
77 s_max_accept_queue_size);
78 }
79 }
80
get_max_accept_queue_size(void)81 static int get_max_accept_queue_size(void) {
82 gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size);
83 return s_max_accept_queue_size;
84 }
85
listener_retry_timer_cb(void * arg,grpc_error_handle err)86 static void listener_retry_timer_cb(void* arg, grpc_error_handle err) {
87 // Do nothing if cancelled.
88 if (!err.ok()) return;
89 grpc_tcp_listener* listener = static_cast<grpc_tcp_listener*>(arg);
90 gpr_atm_no_barrier_store(&listener->retry_timer_armed, false);
91 if (!grpc_fd_is_shutdown(listener->emfd)) {
92 grpc_fd_set_readable(listener->emfd);
93 }
94 }
95
grpc_tcp_server_listener_initialize_retry_timer(grpc_tcp_listener * listener)96 void grpc_tcp_server_listener_initialize_retry_timer(
97 grpc_tcp_listener* listener) {
98 gpr_atm_no_barrier_store(&listener->retry_timer_armed, false);
99 grpc_timer_init_unset(&listener->retry_timer);
100 GRPC_CLOSURE_INIT(&listener->retry_closure, listener_retry_timer_cb, listener,
101 grpc_schedule_on_exec_ctx);
102 }
103
add_socket_to_server(grpc_tcp_server * s,int fd,const grpc_resolved_address * addr,unsigned port_index,unsigned fd_index,grpc_tcp_listener ** listener)104 static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd,
105 const grpc_resolved_address* addr,
106 unsigned port_index,
107 unsigned fd_index,
108 grpc_tcp_listener** listener) {
109 *listener = nullptr;
110 int port = -1;
111
112 grpc_error_handle err =
113 grpc_tcp_server_prepare_socket(s, fd, addr, s->so_reuseport, &port);
114 if (!err.ok()) return err;
115 GPR_ASSERT(port > 0);
116 absl::StatusOr<std::string> addr_str = grpc_sockaddr_to_string(addr, true);
117 if (!addr_str.ok()) {
118 return GRPC_ERROR_CREATE(addr_str.status().ToString());
119 }
120 std::string name = absl::StrCat("tcp-server-listener:", addr_str.value());
121 gpr_mu_lock(&s->mu);
122 s->nports++;
123 grpc_tcp_listener* sp =
124 static_cast<grpc_tcp_listener*>(gpr_malloc(sizeof(grpc_tcp_listener)));
125 sp->next = nullptr;
126 if (s->head == nullptr) {
127 s->head = sp;
128 } else {
129 s->tail->next = sp;
130 }
131 s->tail = sp;
132 sp->server = s;
133 sp->fd = fd;
134 sp->emfd = grpc_fd_create(fd, name.c_str(), true);
135 grpc_tcp_server_listener_initialize_retry_timer(sp);
136
137 // Check and set fd as prellocated
138 if (grpc_tcp_server_pre_allocated_fd(s) == fd) {
139 grpc_fd_set_pre_allocated(sp->emfd);
140 }
141
142 memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
143 sp->port = port;
144 sp->port_index = port_index;
145 sp->fd_index = fd_index;
146 sp->is_sibling = 0;
147 sp->sibling = nullptr;
148 GPR_ASSERT(sp->emfd);
149 gpr_mu_unlock(&s->mu);
150
151 *listener = sp;
152 return err;
153 }
154
155 // If successful, add a listener to s for addr, set *dsmode for the socket, and
156 // return the *listener.
grpc_tcp_server_add_addr(grpc_tcp_server * s,const grpc_resolved_address * addr,unsigned port_index,unsigned fd_index,grpc_dualstack_mode * dsmode,grpc_tcp_listener ** listener)157 grpc_error_handle grpc_tcp_server_add_addr(grpc_tcp_server* s,
158 const grpc_resolved_address* addr,
159 unsigned port_index,
160 unsigned fd_index,
161 grpc_dualstack_mode* dsmode,
162 grpc_tcp_listener** listener) {
163 int fd;
164 fd = grpc_tcp_server_pre_allocated_fd(s);
165
166 // Check if FD has been pre-allocated
167 if (fd > 0) {
168 int family = grpc_sockaddr_get_family(addr);
169 // Set dsmode value
170 if (family == AF_INET6) {
171 const int off = 0;
172 if (setsockopt(fd, 0, IPV6_V6ONLY, &off, sizeof(off)) == 0) {
173 *dsmode = GRPC_DSMODE_DUALSTACK;
174 } else if (!grpc_sockaddr_is_v4mapped(addr, nullptr)) {
175 *dsmode = GRPC_DSMODE_IPV6;
176 } else {
177 *dsmode = GRPC_DSMODE_IPV4;
178 }
179 } else {
180 *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE;
181 }
182
183 grpc_resolved_address addr4_copy;
184 if (*dsmode == GRPC_DSMODE_IPV4 &&
185 grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
186 addr = &addr4_copy;
187 }
188
189 return add_socket_to_server(s, fd, addr, port_index, fd_index, listener);
190 }
191
192 grpc_resolved_address addr4_copy;
193 grpc_error_handle err =
194 grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd);
195 if (!err.ok()) {
196 return err;
197 }
198 if (*dsmode == GRPC_DSMODE_IPV4 &&
199 grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
200 addr = &addr4_copy;
201 }
202 return add_socket_to_server(s, fd, addr, port_index, fd_index, listener);
203 }
204
205 // Prepare a recently-created socket for listening.
grpc_tcp_server_prepare_socket(grpc_tcp_server * s,int fd,const grpc_resolved_address * addr,bool so_reuseport,int * port)206 grpc_error_handle grpc_tcp_server_prepare_socket(
207 grpc_tcp_server* s, int fd, const grpc_resolved_address* addr,
208 bool so_reuseport, int* port) {
209 grpc_resolved_address sockname_temp;
210 grpc_error_handle err;
211
212 GPR_ASSERT(fd >= 0);
213
214 if (so_reuseport && !grpc_is_unix_socket(addr)) {
215 err = grpc_set_socket_reuse_port(fd, 1);
216 if (!err.ok()) goto error;
217 }
218
219 #ifdef GRPC_LINUX_ERRQUEUE
220 err = grpc_set_socket_zerocopy(fd);
221 if (!err.ok()) {
222 // it's not fatal, so just log it.
223 gpr_log(GPR_DEBUG, "Node does not support SO_ZEROCOPY, continuing.");
224 }
225 #endif
226 err = grpc_set_socket_nonblocking(fd, 1);
227 if (!err.ok()) goto error;
228 err = grpc_set_socket_cloexec(fd, 1);
229 if (!err.ok()) goto error;
230 if (!grpc_is_unix_socket(addr)) {
231 err = grpc_set_socket_low_latency(fd, 1);
232 if (!err.ok()) goto error;
233 err = grpc_set_socket_reuse_addr(fd, 1);
234 if (!err.ok()) goto error;
235 err =
236 grpc_set_socket_tcp_user_timeout(fd, s->options, false /* is_client */);
237 if (!err.ok()) goto error;
238 }
239 err = grpc_set_socket_no_sigpipe_if_possible(fd);
240 if (!err.ok()) goto error;
241
242 err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_SERVER_LISTENER_USAGE,
243 s->options);
244 if (!err.ok()) goto error;
245
246 // Only bind/listen if fd has not been already preallocated
247 if (grpc_tcp_server_pre_allocated_fd(s) != fd) {
248 if (bind(fd,
249 reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr)),
250 addr->len) < 0) {
251 err = GRPC_OS_ERROR(errno, "bind");
252 goto error;
253 }
254
255 if (listen(fd, get_max_accept_queue_size()) < 0) {
256 err = GRPC_OS_ERROR(errno, "listen");
257 goto error;
258 }
259 }
260
261 sockname_temp.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
262
263 if (getsockname(fd, reinterpret_cast<grpc_sockaddr*>(sockname_temp.addr),
264 &sockname_temp.len) < 0) {
265 err = GRPC_OS_ERROR(errno, "getsockname");
266 goto error;
267 }
268
269 *port = grpc_sockaddr_get_port(&sockname_temp);
270 return absl::OkStatus();
271
272 error:
273 GPR_ASSERT(!err.ok());
274 if (fd >= 0) {
275 close(fd);
276 }
277 grpc_error_handle ret = grpc_error_set_int(
278 GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1),
279 grpc_core::StatusIntProperty::kFd, fd);
280 return ret;
281 }
282
283 #endif // GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
284