1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include <utility>
22
23 #include <grpc/support/atm.h>
24
25 // FIXME: "posix" files shouldn't be depending on _GNU_SOURCE
26 #ifndef _GNU_SOURCE
27 #define _GNU_SOURCE
28 #include <grpc/event_engine/event_engine.h>
29 #endif
30
31 #include "src/core/lib/iomgr/port.h"
32
33 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
34
35 #include <errno.h>
36 #include <fcntl.h>
37 #include <inttypes.h>
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
40 #include <string.h>
41 #include <sys/socket.h>
42 #include <sys/stat.h>
43 #include <sys/types.h>
44 #include <unistd.h>
45
46 #include <string>
47
48 #include "absl/strings/str_cat.h"
49 #include "absl/strings/str_format.h"
50
51 #include <grpc/byte_buffer.h>
52 #include <grpc/event_engine/endpoint_config.h>
53 #include <grpc/support/alloc.h>
54 #include <grpc/support/log.h>
55 #include <grpc/support/sync.h>
56 #include <grpc/support/time.h>
57
58 #include "src/core/lib/address_utils/sockaddr_utils.h"
59 #include "src/core/lib/event_engine/default_event_engine.h"
60 #include "src/core/lib/event_engine/memory_allocator_factory.h"
61 #include "src/core/lib/event_engine/resolved_address_internal.h"
62 #include "src/core/lib/event_engine/shim.h"
63 #include "src/core/lib/event_engine/tcp_socket_utils.h"
64 #include "src/core/lib/gpr/string.h"
65 #include "src/core/lib/gprpp/crash.h"
66 #include "src/core/lib/gprpp/memory.h"
67 #include "src/core/lib/gprpp/strerror.h"
68 #include "src/core/lib/iomgr/event_engine_shims/closure.h"
69 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
70 #include "src/core/lib/iomgr/exec_ctx.h"
71 #include "src/core/lib/iomgr/resolve_address.h"
72 #include "src/core/lib/iomgr/sockaddr.h"
73 #include "src/core/lib/iomgr/socket_utils_posix.h"
74 #include "src/core/lib/iomgr/systemd_utils.h"
75 #include "src/core/lib/iomgr/tcp_posix.h"
76 #include "src/core/lib/iomgr/tcp_server.h"
77 #include "src/core/lib/iomgr/tcp_server_utils_posix.h"
78 #include "src/core/lib/iomgr/unix_sockets_posix.h"
79 #include "src/core/lib/resource_quota/api.h"
80 #include "src/core/lib/transport/error_utils.h"
81
// Process-wide count of accepted connections that on_read() closed right away
// because the server's memory quota reported high memory pressure. Only used
// to rate-limit the corresponding log message.
static std::atomic<int64_t> num_dropped_connections{0};
// Delay added to Timestamp::Now() when on_read() arms a listener's retry
// timer after accept4() fails with EMFILE.
static constexpr grpc_core::Duration kRetryAcceptWaitTime{
    grpc_core::Duration::Seconds(1)};
85
86 using ::grpc_event_engine::experimental::EndpointConfig;
87 using ::grpc_event_engine::experimental::EventEngine;
88 using ::grpc_event_engine::experimental::MemoryAllocator;
89 using ::grpc_event_engine::experimental::MemoryQuotaBasedMemoryAllocatorFactory;
90 using ::grpc_event_engine::experimental::PosixEventEngineWithFdSupport;
91 using ::grpc_event_engine::experimental::SliceBuffer;
92
CreateEventEngineListener(grpc_tcp_server * s,grpc_closure * shutdown_complete,const EndpointConfig & config,grpc_tcp_server ** server)93 static grpc_error_handle CreateEventEngineListener(
94 grpc_tcp_server* s, grpc_closure* shutdown_complete,
95 const EndpointConfig& config, grpc_tcp_server** server) {
96 absl::StatusOr<std::unique_ptr<EventEngine::Listener>> listener;
97 if (grpc_event_engine::experimental::EventEngineSupportsFd()) {
98 PosixEventEngineWithFdSupport::PosixAcceptCallback accept_cb =
99 [s](int listener_fd, std::unique_ptr<EventEngine::Endpoint> ep,
100 bool is_external, MemoryAllocator /*allocator*/,
101 SliceBuffer* pending_data) {
102 grpc_core::ApplicationCallbackExecCtx app_ctx;
103 grpc_core::ExecCtx exec_ctx;
104 grpc_tcp_server_acceptor* acceptor =
105 static_cast<grpc_tcp_server_acceptor*>(
106 gpr_malloc(sizeof(*acceptor)));
107 acceptor->from_server = s;
108 acceptor->port_index = -1;
109 acceptor->fd_index = -1;
110 if (!is_external) {
111 auto it = s->listen_fd_to_index_map.find(listener_fd);
112 if (it != s->listen_fd_to_index_map.end()) {
113 acceptor->port_index = std::get<0>(it->second);
114 acceptor->fd_index = std::get<1>(it->second);
115 }
116 } else {
117 // External connection handling.
118 grpc_resolved_address addr;
119 memset(&addr, 0, sizeof(addr));
120 addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
121 // Get the fd of the socket connected to peer.
122 int fd = reinterpret_cast<grpc_event_engine::experimental::
123 PosixEndpointWithFdSupport*>(ep.get())
124 ->GetWrappedFd();
125 if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
126 &(addr.len)) < 0) {
127 gpr_log(GPR_ERROR, "Failed getpeername: %s",
128 grpc_core::StrError(errno).c_str());
129 close(fd);
130 return;
131 }
132 (void)grpc_set_socket_no_sigpipe_if_possible(fd);
133 auto addr_uri = grpc_sockaddr_to_uri(&addr);
134 if (!addr_uri.ok()) {
135 gpr_log(GPR_ERROR, "Invalid address: %s",
136 addr_uri.status().ToString().c_str());
137 return;
138 }
139 if (grpc_tcp_trace.enabled()) {
140 gpr_log(GPR_INFO,
141 "SERVER_CONNECT: incoming external connection: %s",
142 addr_uri->c_str());
143 }
144 }
145 grpc_pollset* read_notifier_pollset =
146 (*(s->pollsets))[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
147 &s->next_pollset_to_assign, 1)) %
148 s->pollsets->size()];
149 acceptor->external_connection = is_external;
150 acceptor->listener_fd = listener_fd;
151 grpc_byte_buffer* buf = nullptr;
152 if (pending_data != nullptr && pending_data->Length() > 0) {
153 buf = grpc_raw_byte_buffer_create(nullptr, 0);
154 grpc_slice_buffer_swap(&buf->data.raw.slice_buffer,
155 pending_data->c_slice_buffer());
156 pending_data->Clear();
157 }
158 acceptor->pending_data = buf;
159 s->on_accept_cb(s->on_accept_cb_arg,
160 grpc_event_engine::experimental::
161 grpc_event_engine_endpoint_create(std::move(ep)),
162 read_notifier_pollset, acceptor);
163 };
164 PosixEventEngineWithFdSupport* engine_ptr =
165 reinterpret_cast<PosixEventEngineWithFdSupport*>(
166 config.GetVoidPointer(GRPC_INTERNAL_ARG_EVENT_ENGINE));
167 // Keeps the engine alive for some tests that have not otherwise
168 // instantiated an EventEngine
169 std::shared_ptr<EventEngine> keeper;
170 if (engine_ptr == nullptr) {
171 keeper = grpc_event_engine::experimental::GetDefaultEventEngine();
172 engine_ptr =
173 reinterpret_cast<PosixEventEngineWithFdSupport*>(keeper.get());
174 }
175 listener = engine_ptr->CreatePosixListener(
176 std::move(accept_cb),
177 [s, shutdown_complete](absl::Status status) {
178 grpc_event_engine::experimental::RunEventEngineClosure(
179 shutdown_complete, absl_status_to_grpc_error(status));
180 delete s->fd_handler;
181 delete s;
182 },
183 config,
184 std::make_unique<MemoryQuotaBasedMemoryAllocatorFactory>(
185 s->memory_quota));
186 } else {
187 EventEngine::Listener::AcceptCallback accept_cb =
188 [s](std::unique_ptr<EventEngine::Endpoint> ep, MemoryAllocator) {
189 s->on_accept_cb(s->on_accept_cb_arg,
190 grpc_event_engine::experimental::
191 grpc_event_engine_endpoint_create(std::move(ep)),
192 nullptr, nullptr);
193 };
194 auto ee = grpc_event_engine::experimental::GetDefaultEventEngine();
195 listener = ee->CreateListener(
196 std::move(accept_cb),
197 [s, ee, shutdown_complete](absl::Status status) {
198 GPR_ASSERT(gpr_atm_no_barrier_load(&s->refs.count) == 0);
199 grpc_event_engine::experimental::RunEventEngineClosure(
200 shutdown_complete, absl_status_to_grpc_error(status));
201 delete s->fd_handler;
202 delete s;
203 },
204 config,
205 std::make_unique<MemoryQuotaBasedMemoryAllocatorFactory>(
206 s->memory_quota));
207 }
208 if (!listener.ok()) {
209 delete s;
210 *server = nullptr;
211 return listener.status();
212 }
213 s->ee_listener = std::move(*listener);
214 return absl::OkStatus();
215 }
216
tcp_server_create(grpc_closure * shutdown_complete,const EndpointConfig & config,grpc_tcp_server_cb on_accept_cb,void * on_accept_cb_arg,grpc_tcp_server ** server)217 static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
218 const EndpointConfig& config,
219 grpc_tcp_server_cb on_accept_cb,
220 void* on_accept_cb_arg,
221 grpc_tcp_server** server) {
222 grpc_tcp_server* s = new grpc_tcp_server;
223 s->so_reuseport = grpc_is_socket_reuse_port_supported();
224 s->expand_wildcard_addrs = false;
225 auto value = config.GetInt(GRPC_ARG_ALLOW_REUSEPORT);
226 if (value.has_value()) {
227 s->so_reuseport = (grpc_is_socket_reuse_port_supported() && *value != 0);
228 }
229 value = config.GetInt(GRPC_ARG_EXPAND_WILDCARD_ADDRS);
230 if (value.has_value()) {
231 s->expand_wildcard_addrs = (*value != 0);
232 }
233 gpr_ref_init(&s->refs, 1);
234 gpr_mu_init(&s->mu);
235 s->active_ports = 0;
236 s->destroyed_ports = 0;
237 s->shutdown = false;
238 s->shutdown_starting.head = nullptr;
239 s->shutdown_starting.tail = nullptr;
240 if (!grpc_event_engine::experimental::UseEventEngineListener()) {
241 s->shutdown_complete = shutdown_complete;
242 } else {
243 s->shutdown_complete = nullptr;
244 }
245 s->on_accept_cb = on_accept_cb;
246 s->on_accept_cb_arg = on_accept_cb_arg;
247 s->head = nullptr;
248 s->tail = nullptr;
249 s->nports = 0;
250 s->options = TcpOptionsFromEndpointConfig(config);
251 s->fd_handler = nullptr;
252 GPR_ASSERT(s->options.resource_quota != nullptr);
253 GPR_ASSERT(s->on_accept_cb);
254 s->memory_quota = s->options.resource_quota->memory_quota();
255 s->pre_allocated_fd = -1;
256 gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
257 s->n_bind_ports = 0;
258 new (&s->listen_fd_to_index_map)
259 absl::flat_hash_map<int, std::tuple<int, int>>();
260 *server = s;
261 if (grpc_event_engine::experimental::UseEventEngineListener()) {
262 return CreateEventEngineListener(s, shutdown_complete, config, server);
263 }
264 return absl::OkStatus();
265 }
266
finish_shutdown(grpc_tcp_server * s)267 static void finish_shutdown(grpc_tcp_server* s) {
268 gpr_mu_lock(&s->mu);
269 GPR_ASSERT(s->shutdown);
270 gpr_mu_unlock(&s->mu);
271 if (s->shutdown_complete != nullptr) {
272 grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
273 absl::OkStatus());
274 }
275 gpr_mu_destroy(&s->mu);
276 while (s->head) {
277 grpc_tcp_listener* sp = s->head;
278 s->head = sp->next;
279 gpr_free(sp);
280 }
281 if (grpc_event_engine::experimental::UseEventEngineListener()) {
282 // This will trigger asynchronous execution of the on_shutdown_complete
283 // callback when appropriate. That callback will delete the server
284 s->ee_listener.reset();
285 } else {
286 delete s->fd_handler;
287 delete s;
288 }
289 }
290
destroyed_port(void * server,grpc_error_handle)291 static void destroyed_port(void* server, grpc_error_handle /*error*/) {
292 grpc_tcp_server* s = static_cast<grpc_tcp_server*>(server);
293 gpr_mu_lock(&s->mu);
294 s->destroyed_ports++;
295 if (s->destroyed_ports == s->nports) {
296 gpr_mu_unlock(&s->mu);
297 finish_shutdown(s);
298 } else {
299 GPR_ASSERT(s->destroyed_ports < s->nports);
300 gpr_mu_unlock(&s->mu);
301 }
302 }
303
304 // called when all listening endpoints have been shutdown, so no further
305 // events will be received on them - at this point it's safe to destroy
306 // things
deactivated_all_ports(grpc_tcp_server * s)307 static void deactivated_all_ports(grpc_tcp_server* s) {
308 // delete ALL the things
309 gpr_mu_lock(&s->mu);
310
311 GPR_ASSERT(s->shutdown);
312
313 if (s->head) {
314 grpc_tcp_listener* sp;
315 for (sp = s->head; sp; sp = sp->next) {
316 // Do not unlink if there is a pre-allocated FD
317 if (grpc_tcp_server_pre_allocated_fd(s) <= 0) {
318 grpc_unlink_if_unix_domain_socket(&sp->addr);
319 }
320 GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
321 grpc_schedule_on_exec_ctx);
322 grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
323 "tcp_listener_shutdown");
324 }
325 gpr_mu_unlock(&s->mu);
326 } else {
327 gpr_mu_unlock(&s->mu);
328 finish_shutdown(s);
329 }
330 }
331
tcp_server_destroy(grpc_tcp_server * s)332 static void tcp_server_destroy(grpc_tcp_server* s) {
333 gpr_mu_lock(&s->mu);
334 GPR_ASSERT(!s->shutdown);
335 s->shutdown = true;
336 // shutdown all fd's
337 if (s->active_ports) {
338 grpc_tcp_listener* sp;
339 for (sp = s->head; sp; sp = sp->next) {
340 grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE("Server destroyed"));
341 }
342 gpr_mu_unlock(&s->mu);
343 } else {
344 gpr_mu_unlock(&s->mu);
345 deactivated_all_ports(s);
346 }
347 }
348
349 // event manager callback when reads are ready
on_read(void * arg,grpc_error_handle err)350 static void on_read(void* arg, grpc_error_handle err) {
351 grpc_tcp_listener* sp = static_cast<grpc_tcp_listener*>(arg);
352 grpc_pollset* read_notifier_pollset;
353 if (!err.ok()) {
354 goto error;
355 }
356
357 // loop until accept4 returns EAGAIN, and then re-arm notification
358 for (;;) {
359 grpc_resolved_address addr;
360 memset(&addr, 0, sizeof(addr));
361 addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
362 // Note: If we ever decide to return this address to the user, remember to
363 // strip off the ::ffff:0.0.0.0/96 prefix first.
364 int fd = grpc_accept4(sp->fd, &addr, 1, 1);
365 if (fd < 0) {
366 if (errno == EINTR) {
367 continue;
368 }
369 // When the process runs out of fds, accept4() returns EMFILE. When this
370 // happens, the connection is left in the accept queue until either a
371 // read event triggers the on_read callback, or time has passed and the
372 // accept should be re-tried regardless. This callback is not cancelled,
373 // so a spurious wakeup may occur even when there's nothing to accept.
374 // This is not a performant code path, but if an fd limit has been
375 // reached, the system is likely in an unhappy state regardless.
376 if (errno == EMFILE) {
377 GRPC_LOG_EVERY_N_SEC(1, GPR_ERROR, "%s",
378 "File descriptor limit reached. Retrying.");
379 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
380 if (gpr_atm_full_xchg(&sp->retry_timer_armed, true)) return;
381 grpc_timer_init(&sp->retry_timer,
382 grpc_core::Timestamp::Now() + kRetryAcceptWaitTime,
383 &sp->retry_closure);
384 return;
385 }
386 if (errno == EAGAIN || errno == ECONNABORTED || errno == EWOULDBLOCK) {
387 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
388 return;
389 }
390 gpr_mu_lock(&sp->server->mu);
391 if (!sp->server->shutdown_listeners) {
392 gpr_log(GPR_ERROR, "Failed accept4: %s",
393 grpc_core::StrError(errno).c_str());
394 } else {
395 // if we have shutdown listeners, accept4 could fail, and we
396 // needn't notify users
397 }
398 gpr_mu_unlock(&sp->server->mu);
399 goto error;
400 }
401
402 if (sp->server->memory_quota->IsMemoryPressureHigh()) {
403 int64_t dropped_connections_count =
404 num_dropped_connections.fetch_add(1, std::memory_order_relaxed) + 1;
405 if (dropped_connections_count % 1000 == 1) {
406 gpr_log(GPR_INFO,
407 "Dropped >= %" PRId64
408 " new connection attempts due to high memory pressure",
409 dropped_connections_count);
410 }
411 close(fd);
412 continue;
413 }
414
415 // For UNIX sockets, the accept call might not fill up the member sun_path
416 // of sockaddr_un, so explicitly call getpeername to get it.
417 if (grpc_is_unix_socket(&addr)) {
418 memset(&addr, 0, sizeof(addr));
419 addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
420 if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
421 &(addr.len)) < 0) {
422 gpr_log(GPR_ERROR, "Failed getpeername: %s",
423 grpc_core::StrError(errno).c_str());
424 close(fd);
425 goto error;
426 }
427 }
428
429 (void)grpc_set_socket_no_sigpipe_if_possible(fd);
430
431 err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_SERVER_CONNECTION_USAGE,
432 sp->server->options);
433 if (!err.ok()) {
434 goto error;
435 }
436
437 auto addr_uri = grpc_sockaddr_to_uri(&addr);
438 if (!addr_uri.ok()) {
439 gpr_log(GPR_ERROR, "Invalid address: %s",
440 addr_uri.status().ToString().c_str());
441 goto error;
442 }
443 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
444 gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s",
445 addr_uri->c_str());
446 }
447
448 std::string name = absl::StrCat("tcp-server-connection:", addr_uri.value());
449 grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
450
451 read_notifier_pollset = (*(sp->server->pollsets))
452 [static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
453 &sp->server->next_pollset_to_assign, 1)) %
454 sp->server->pollsets->size()];
455
456 grpc_pollset_add_fd(read_notifier_pollset, fdobj);
457
458 // Create acceptor.
459 grpc_tcp_server_acceptor* acceptor =
460 static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
461 acceptor->from_server = sp->server;
462 acceptor->port_index = sp->port_index;
463 acceptor->fd_index = sp->fd_index;
464 acceptor->external_connection = false;
465 sp->server->on_accept_cb(
466 sp->server->on_accept_cb_arg,
467 grpc_tcp_create(fdobj, sp->server->options, addr_uri.value()),
468 read_notifier_pollset, acceptor);
469 }
470
471 GPR_UNREACHABLE_CODE(return);
472
473 error:
474 gpr_mu_lock(&sp->server->mu);
475 if (0 == --sp->server->active_ports && sp->server->shutdown) {
476 gpr_mu_unlock(&sp->server->mu);
477 deactivated_all_ports(sp->server);
478 } else {
479 gpr_mu_unlock(&sp->server->mu);
480 }
481 }
482
483 // Treat :: or 0.0.0.0 as a family-agnostic wildcard.
add_wildcard_addrs_to_server(grpc_tcp_server * s,unsigned port_index,int requested_port,int * out_port)484 static grpc_error_handle add_wildcard_addrs_to_server(grpc_tcp_server* s,
485 unsigned port_index,
486 int requested_port,
487 int* out_port) {
488 grpc_resolved_address wild4;
489 grpc_resolved_address wild6;
490 unsigned fd_index = 0;
491 grpc_dualstack_mode dsmode;
492 grpc_tcp_listener* sp = nullptr;
493 grpc_tcp_listener* sp2 = nullptr;
494 grpc_error_handle v6_err;
495 grpc_error_handle v4_err;
496 *out_port = -1;
497
498 if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) {
499 return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port,
500 out_port);
501 }
502
503 grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
504 // Try listening on IPv6 first.
505 if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index,
506 &dsmode, &sp)) == absl::OkStatus()) {
507 ++fd_index;
508 requested_port = *out_port = sp->port;
509 if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) {
510 return absl::OkStatus();
511 }
512 }
513 // If we got a v6-only socket or nothing, try adding 0.0.0.0.
514 grpc_sockaddr_set_port(&wild4, requested_port);
515 if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index,
516 &dsmode, &sp2)) == absl::OkStatus()) {
517 *out_port = sp2->port;
518 if (sp != nullptr) {
519 sp2->is_sibling = 1;
520 sp->sibling = sp2;
521 }
522 }
523 if (*out_port > 0) {
524 if (!v6_err.ok()) {
525 gpr_log(GPR_INFO,
526 "Failed to add :: listener, "
527 "the environment may not support IPv6: %s",
528 grpc_core::StatusToString(v6_err).c_str());
529 }
530 if (!v4_err.ok()) {
531 gpr_log(GPR_INFO,
532 "Failed to add 0.0.0.0 listener, "
533 "the environment may not support IPv4: %s",
534 grpc_core::StatusToString(v4_err).c_str());
535 }
536 return absl::OkStatus();
537 } else {
538 grpc_error_handle root_err =
539 GRPC_ERROR_CREATE("Failed to add any wildcard listeners");
540 GPR_ASSERT(!v6_err.ok() && !v4_err.ok());
541 root_err = grpc_error_add_child(root_err, v6_err);
542 root_err = grpc_error_add_child(root_err, v4_err);
543 return root_err;
544 }
545 }
546
clone_port(grpc_tcp_listener * listener,unsigned count)547 static grpc_error_handle clone_port(grpc_tcp_listener* listener,
548 unsigned count) {
549 grpc_tcp_listener* sp = nullptr;
550 absl::StatusOr<std::string> addr_str;
551 grpc_error_handle err;
552
553 for (grpc_tcp_listener* l = listener->next; l && l->is_sibling; l = l->next) {
554 l->fd_index += count;
555 }
556
557 for (unsigned i = 0; i < count; i++) {
558 int fd = -1;
559 int port = -1;
560 grpc_dualstack_mode dsmode;
561 err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode,
562 &fd);
563 if (!err.ok()) return err;
564 err = grpc_tcp_server_prepare_socket(listener->server, fd, &listener->addr,
565 true, &port);
566 if (!err.ok()) return err;
567 listener->server->nports++;
568 addr_str = grpc_sockaddr_to_string(&listener->addr, true);
569 if (!addr_str.ok()) {
570 return GRPC_ERROR_CREATE(addr_str.status().ToString());
571 }
572 sp = static_cast<grpc_tcp_listener*>(gpr_malloc(sizeof(grpc_tcp_listener)));
573 sp->next = listener->next;
574 listener->next = sp;
575 // sp (the new listener) is a sibling of 'listener' (the original
576 // listener).
577 sp->is_sibling = 1;
578 sp->sibling = listener->sibling;
579 listener->sibling = sp;
580 sp->server = listener->server;
581 sp->fd = fd;
582 sp->emfd = grpc_fd_create(
583 fd,
584 absl::StrFormat("tcp-server-listener:%s/clone-%d", *addr_str, i)
585 .c_str(),
586 true);
587 memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
588 sp->port = port;
589 sp->port_index = listener->port_index;
590 sp->fd_index = listener->fd_index + count - i;
591 GPR_ASSERT(sp->emfd);
592 grpc_tcp_server_listener_initialize_retry_timer(sp);
593 while (listener->server->tail->next != nullptr) {
594 listener->server->tail = listener->server->tail->next;
595 }
596 }
597
598 return absl::OkStatus();
599 }
600
tcp_server_add_port(grpc_tcp_server * s,const grpc_resolved_address * addr,int * out_port)601 static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
602 const grpc_resolved_address* addr,
603 int* out_port) {
604 if (grpc_event_engine::experimental::UseEventEngineListener()) {
605 gpr_mu_lock(&s->mu);
606 if (s->shutdown_listeners) {
607 gpr_mu_unlock(&s->mu);
608 return absl::UnknownError("Server already shutdown");
609 }
610 int fd_index = 0;
611 absl::StatusOr<int> port;
612 if (grpc_event_engine::experimental::EventEngineSupportsFd()) {
613 port =
614 static_cast<
615 grpc_event_engine::experimental::PosixListenerWithFdSupport*>(
616 s->ee_listener.get())
617 ->BindWithFd(
618 grpc_event_engine::experimental::CreateResolvedAddress(*addr),
619 [s, &fd_index](absl::StatusOr<int> listen_fd) {
620 if (!listen_fd.ok()) {
621 return;
622 }
623 GPR_DEBUG_ASSERT(*listen_fd > 0);
624 s->listen_fd_to_index_map.insert_or_assign(
625 *listen_fd,
626 std::make_tuple(s->n_bind_ports, fd_index++));
627 });
628 } else {
629 port = s->ee_listener->Bind(
630 grpc_event_engine::experimental::CreateResolvedAddress(*addr));
631 }
632 if (port.ok()) {
633 s->n_bind_ports++;
634 *out_port = *port;
635 }
636 gpr_mu_unlock(&s->mu);
637 return port.status();
638 }
639 GPR_ASSERT(addr->len <= GRPC_MAX_SOCKADDR_SIZE);
640 grpc_tcp_listener* sp;
641 grpc_resolved_address sockname_temp;
642 grpc_resolved_address addr6_v4mapped;
643 int requested_port = grpc_sockaddr_get_port(addr);
644 unsigned port_index = 0;
645 grpc_dualstack_mode dsmode;
646 grpc_error_handle err;
647 *out_port = -1;
648 if (s->tail != nullptr) {
649 port_index = s->tail->port_index + 1;
650 }
651
652 // Check if this is a wildcard port, and if so, try to keep the port the same
653 // as some previously created listener.
654 if (requested_port == 0) {
655 for (sp = s->head; sp; sp = sp->next) {
656 sockname_temp.len =
657 static_cast<socklen_t>(sizeof(struct sockaddr_storage));
658 if (0 ==
659 getsockname(sp->fd,
660 reinterpret_cast<grpc_sockaddr*>(&sockname_temp.addr),
661 &sockname_temp.len)) {
662 int used_port = grpc_sockaddr_get_port(&sockname_temp);
663 if (used_port > 0) {
664 memcpy(&sockname_temp, addr, sizeof(grpc_resolved_address));
665 grpc_sockaddr_set_port(&sockname_temp, used_port);
666 requested_port = used_port;
667 addr = &sockname_temp;
668 break;
669 }
670 }
671 }
672 }
673
674 /* Check if systemd has pre-allocated valid FDs */
675 set_matching_sd_fds(s, addr, requested_port);
676
677 /* Do not unlink if there are pre-allocated FDs, or it will stop
678 working after the first client connects */
679 if (grpc_tcp_server_pre_allocated_fd(s) <= 0) {
680 grpc_unlink_if_unix_domain_socket(addr);
681 }
682
683 if (grpc_sockaddr_is_wildcard(addr, &requested_port)) {
684 return add_wildcard_addrs_to_server(s, port_index, requested_port,
685 out_port);
686 }
687 if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
688 addr = &addr6_v4mapped;
689 }
690 if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) ==
691 absl::OkStatus()) {
692 *out_port = sp->port;
693 }
694 return err;
695 }
696
697 // Return listener at port_index or NULL. Should only be called with s->mu
698 // locked.
get_port_index(grpc_tcp_server * s,unsigned port_index)699 static grpc_tcp_listener* get_port_index(grpc_tcp_server* s,
700 unsigned port_index) {
701 unsigned num_ports = 0;
702 grpc_tcp_listener* sp;
703 for (sp = s->head; sp; sp = sp->next) {
704 if (!sp->is_sibling) {
705 if (++num_ports > port_index) {
706 return sp;
707 }
708 }
709 }
710 return nullptr;
711 }
712
tcp_server_port_fd_count(grpc_tcp_server * s,unsigned port_index)713 unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) {
714 unsigned num_fds = 0;
715 gpr_mu_lock(&s->mu);
716 if (grpc_event_engine::experimental::UseEventEngineListener()) {
717 // This doesn't need to be very fast. Used in tests.
718 for (auto it = s->listen_fd_to_index_map.begin();
719 it != s->listen_fd_to_index_map.end(); it++) {
720 if (std::get<0>(it->second) == static_cast<int>(port_index)) {
721 num_fds++;
722 }
723 }
724 gpr_mu_unlock(&s->mu);
725 return num_fds;
726 }
727 grpc_tcp_listener* sp = get_port_index(s, port_index);
728 for (; sp; sp = sp->sibling) {
729 ++num_fds;
730 }
731 gpr_mu_unlock(&s->mu);
732 return num_fds;
733 }
734
tcp_server_port_fd(grpc_tcp_server * s,unsigned port_index,unsigned fd_index)735 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
736 unsigned fd_index) {
737 gpr_mu_lock(&s->mu);
738 if (grpc_event_engine::experimental::UseEventEngineListener()) {
739 // This doesn't need to be very fast. Used in tests.
740 for (auto it = s->listen_fd_to_index_map.begin();
741 it != s->listen_fd_to_index_map.end(); it++) {
742 if (std::get<0>(it->second) == static_cast<int>(port_index) &&
743 std::get<1>(it->second) == static_cast<int>(fd_index)) {
744 gpr_mu_unlock(&s->mu);
745 return it->first;
746 }
747 }
748 gpr_mu_unlock(&s->mu);
749 return -1;
750 }
751 grpc_tcp_listener* sp = get_port_index(s, port_index);
752 for (; sp; sp = sp->sibling, --fd_index) {
753 if (fd_index == 0) {
754 gpr_mu_unlock(&s->mu);
755 return sp->fd;
756 }
757 }
758 gpr_mu_unlock(&s->mu);
759 return -1;
760 }
761
tcp_server_start(grpc_tcp_server * s,const std::vector<grpc_pollset * > * pollsets)762 static void tcp_server_start(grpc_tcp_server* s,
763 const std::vector<grpc_pollset*>* pollsets) {
764 size_t i;
765 grpc_tcp_listener* sp;
766 gpr_mu_lock(&s->mu);
767 GPR_ASSERT(s->on_accept_cb);
768 GPR_ASSERT(s->active_ports == 0);
769 s->pollsets = pollsets;
770 if (grpc_event_engine::experimental::UseEventEngineListener()) {
771 GPR_ASSERT(!s->shutdown_listeners);
772 GPR_ASSERT(GRPC_LOG_IF_ERROR("listener_start", s->ee_listener->Start()));
773 gpr_mu_unlock(&s->mu);
774 return;
775 }
776 sp = s->head;
777 while (sp != nullptr) {
778 if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) &&
779 pollsets->size() > 1) {
780 GPR_ASSERT(GRPC_LOG_IF_ERROR(
781 "clone_port", clone_port(sp, (unsigned)(pollsets->size() - 1))));
782 for (i = 0; i < pollsets->size(); i++) {
783 grpc_pollset_add_fd((*pollsets)[i], sp->emfd);
784 GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
785 grpc_schedule_on_exec_ctx);
786 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
787 s->active_ports++;
788 sp = sp->next;
789 }
790 } else {
791 for (i = 0; i < pollsets->size(); i++) {
792 grpc_pollset_add_fd((*pollsets)[i], sp->emfd);
793 }
794 GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
795 grpc_schedule_on_exec_ctx);
796 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
797 s->active_ports++;
798 sp = sp->next;
799 }
800 }
801 gpr_mu_unlock(&s->mu);
802 }
803
// Takes an additional reference on `s` via gpr_ref_non_zero() and returns `s`
// so the call can be used inline. Paired with tcp_server_unref().
grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
  gpr_ref_non_zero(&s->refs);
  return s;
}
808
// Appends `shutdown_starting` to the server's `shutdown_starting` closure
// list under `s->mu`. The list is run by tcp_server_unref() when the last
// reference is dropped.
static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
                                             grpc_closure* shutdown_starting) {
  gpr_mu_lock(&s->mu);
  grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
                           absl::OkStatus());
  gpr_mu_unlock(&s->mu);
}
816
tcp_server_unref(grpc_tcp_server * s)817 static void tcp_server_unref(grpc_tcp_server* s) {
818 if (gpr_unref(&s->refs)) {
819 grpc_tcp_server_shutdown_listeners(s);
820 gpr_mu_lock(&s->mu);
821 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
822 gpr_mu_unlock(&s->mu);
823 tcp_server_destroy(s);
824 }
825 }
826
tcp_server_shutdown_listeners(grpc_tcp_server * s)827 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
828 gpr_mu_lock(&s->mu);
829 s->shutdown_listeners = true;
830 if (grpc_event_engine::experimental::UseEventEngineListener()) {
831 if (grpc_event_engine::experimental::EventEngineSupportsFd()) {
832 static_cast<grpc_event_engine::experimental::PosixListenerWithFdSupport*>(
833 s->ee_listener.get())
834 ->ShutdownListeningFds();
835 }
836 }
837 /* shutdown all fd's */
838 if (s->active_ports) {
839 grpc_tcp_listener* sp;
840 for (sp = s->head; sp; sp = sp->next) {
841 grpc_timer_cancel(&sp->retry_timer);
842 grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE("Server shutdown"));
843 }
844 }
845 gpr_mu_unlock(&s->mu);
846 }
847
// Returns `s->pre_allocated_fd` (initialised to -1 by tcp_server_create()).
// The value is read without taking `s->mu`.
// NOTE(review): deactivated_all_ports() queries the pre-allocated fd through
// grpc_tcp_server_pre_allocated_fd() while holding `s->mu`; presumably that
// wrapper ends up here, in which case adding a lock would self-deadlock -
// confirm before changing.
static int tcp_server_pre_allocated_fd(grpc_tcp_server* s) {
  return s->pre_allocated_fd;
}
851
// Stores `fd` as the server's pre-allocated fd, under `s->mu`.
static void tcp_server_set_pre_allocated_fd(grpc_tcp_server* s, int fd) {
  gpr_mu_lock(&s->mu);
  s->pre_allocated_fd = fd;
  gpr_mu_unlock(&s->mu);
}
857
858 namespace {
859 class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
860 public:
ExternalConnectionHandler(grpc_tcp_server * s)861 explicit ExternalConnectionHandler(grpc_tcp_server* s) : s_(s) {}
862
863 // TODO(yangg) resolve duplicate code with on_read
Handle(int listener_fd,int fd,grpc_byte_buffer * buf)864 void Handle(int listener_fd, int fd, grpc_byte_buffer* buf) override {
865 if (grpc_event_engine::experimental::UseEventEngineListener()) {
866 GPR_ASSERT(grpc_event_engine::experimental::EventEngineSupportsFd());
867 grpc_event_engine::experimental::SliceBuffer pending_data;
868 if (buf != nullptr) {
869 pending_data =
870 grpc_event_engine::experimental::SliceBuffer::TakeCSliceBuffer(
871 buf->data.raw.slice_buffer);
872 }
873 GPR_ASSERT(GRPC_LOG_IF_ERROR(
874 "listener_handle_external_connection",
875 static_cast<
876 grpc_event_engine::experimental::PosixListenerWithFdSupport*>(
877 s_->ee_listener.get())
878 ->HandleExternalConnection(listener_fd, fd, &pending_data)));
879 return;
880 }
881 grpc_pollset* read_notifier_pollset;
882 grpc_resolved_address addr;
883 memset(&addr, 0, sizeof(addr));
884 addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
885 grpc_core::ExecCtx exec_ctx;
886
887 if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
888 &(addr.len)) < 0) {
889 gpr_log(GPR_ERROR, "Failed getpeername: %s",
890 grpc_core::StrError(errno).c_str());
891 close(fd);
892 return;
893 }
894 (void)grpc_set_socket_no_sigpipe_if_possible(fd);
895 auto addr_uri = grpc_sockaddr_to_uri(&addr);
896 if (!addr_uri.ok()) {
897 gpr_log(GPR_ERROR, "Invalid address: %s",
898 addr_uri.status().ToString().c_str());
899 return;
900 }
901 if (grpc_tcp_trace.enabled()) {
902 gpr_log(GPR_INFO, "SERVER_CONNECT: incoming external connection: %s",
903 addr_uri->c_str());
904 }
905 std::string name = absl::StrCat("tcp-server-connection:", addr_uri.value());
906 grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
907 read_notifier_pollset =
908 (*(s_->pollsets))[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
909 &s_->next_pollset_to_assign, 1)) %
910 s_->pollsets->size()];
911 grpc_pollset_add_fd(read_notifier_pollset, fdobj);
912 grpc_tcp_server_acceptor* acceptor =
913 static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
914 acceptor->from_server = s_;
915 acceptor->port_index = -1;
916 acceptor->fd_index = -1;
917 acceptor->external_connection = true;
918 acceptor->listener_fd = listener_fd;
919 acceptor->pending_data = buf;
920 s_->on_accept_cb(s_->on_accept_cb_arg,
921 grpc_tcp_create(fdobj, s_->options, addr_uri.value()),
922 read_notifier_pollset, acceptor);
923 }
924
925 private:
926 grpc_tcp_server* s_;
927 };
928 } // namespace
929
// Creates an ExternalConnectionHandler for `s`, stores it in `s->fd_handler`
// and returns it. The server keeps ownership: the handler is deleted together
// with the server during shutdown.
// NOTE(review): a handler already stored in `s->fd_handler` is overwritten
// without being deleted - presumably this is called at most once per server;
// verify against callers.
static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
    grpc_tcp_server* s) {
  s->fd_handler = new ExternalConnectionHandler(s);
  return s->fd_handler;
}
935
// Function table exposing the POSIX TCP server implementation defined in this
// file. Entries are positional, so their order must match the member order of
// grpc_tcp_server_vtable.
grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
    tcp_server_create,
    tcp_server_start,
    tcp_server_add_port,
    tcp_server_create_fd_handler,
    tcp_server_port_fd_count,
    tcp_server_port_fd,
    tcp_server_ref,
    tcp_server_shutdown_starting_add,
    tcp_server_unref,
    tcp_server_shutdown_listeners,
    tcp_server_pre_allocated_fd,
    tcp_server_set_pre_allocated_fd};
949
950 #endif // GRPC_POSIX_SOCKET_TCP_SERVER
951