1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <utility>
22 
23 #include <grpc/support/atm.h>
24 
25 // FIXME: "posix" files shouldn't be depending on _GNU_SOURCE
26 #ifndef _GNU_SOURCE
27 #define _GNU_SOURCE
28 #include <grpc/event_engine/event_engine.h>
29 #endif
30 
31 #include "src/core/lib/iomgr/port.h"
32 
33 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
34 
35 #include <errno.h>
36 #include <fcntl.h>
37 #include <inttypes.h>
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
40 #include <string.h>
41 #include <sys/socket.h>
42 #include <sys/stat.h>
43 #include <sys/types.h>
44 #include <unistd.h>
45 
46 #include <string>
47 
48 #include "absl/strings/str_cat.h"
49 #include "absl/strings/str_format.h"
50 
51 #include <grpc/byte_buffer.h>
52 #include <grpc/event_engine/endpoint_config.h>
53 #include <grpc/support/alloc.h>
54 #include <grpc/support/log.h>
55 #include <grpc/support/sync.h>
56 #include <grpc/support/time.h>
57 
58 #include "src/core/lib/address_utils/sockaddr_utils.h"
59 #include "src/core/lib/event_engine/default_event_engine.h"
60 #include "src/core/lib/event_engine/memory_allocator_factory.h"
61 #include "src/core/lib/event_engine/resolved_address_internal.h"
62 #include "src/core/lib/event_engine/shim.h"
63 #include "src/core/lib/event_engine/tcp_socket_utils.h"
64 #include "src/core/lib/gpr/string.h"
65 #include "src/core/lib/gprpp/crash.h"
66 #include "src/core/lib/gprpp/memory.h"
67 #include "src/core/lib/gprpp/strerror.h"
68 #include "src/core/lib/iomgr/event_engine_shims/closure.h"
69 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
70 #include "src/core/lib/iomgr/exec_ctx.h"
71 #include "src/core/lib/iomgr/resolve_address.h"
72 #include "src/core/lib/iomgr/sockaddr.h"
73 #include "src/core/lib/iomgr/socket_utils_posix.h"
74 #include "src/core/lib/iomgr/systemd_utils.h"
75 #include "src/core/lib/iomgr/tcp_posix.h"
76 #include "src/core/lib/iomgr/tcp_server.h"
77 #include "src/core/lib/iomgr/tcp_server_utils_posix.h"
78 #include "src/core/lib/iomgr/unix_sockets_posix.h"
79 #include "src/core/lib/resource_quota/api.h"
80 #include "src/core/lib/transport/error_utils.h"
81 
82 static std::atomic<int64_t> num_dropped_connections{0};
83 static constexpr grpc_core::Duration kRetryAcceptWaitTime{
84     grpc_core::Duration::Seconds(1)};
85 
86 using ::grpc_event_engine::experimental::EndpointConfig;
87 using ::grpc_event_engine::experimental::EventEngine;
88 using ::grpc_event_engine::experimental::MemoryAllocator;
89 using ::grpc_event_engine::experimental::MemoryQuotaBasedMemoryAllocatorFactory;
90 using ::grpc_event_engine::experimental::PosixEventEngineWithFdSupport;
91 using ::grpc_event_engine::experimental::SliceBuffer;
92 
CreateEventEngineListener(grpc_tcp_server * s,grpc_closure * shutdown_complete,const EndpointConfig & config,grpc_tcp_server ** server)93 static grpc_error_handle CreateEventEngineListener(
94     grpc_tcp_server* s, grpc_closure* shutdown_complete,
95     const EndpointConfig& config, grpc_tcp_server** server) {
96   absl::StatusOr<std::unique_ptr<EventEngine::Listener>> listener;
97   if (grpc_event_engine::experimental::EventEngineSupportsFd()) {
98     PosixEventEngineWithFdSupport::PosixAcceptCallback accept_cb =
99         [s](int listener_fd, std::unique_ptr<EventEngine::Endpoint> ep,
100             bool is_external, MemoryAllocator /*allocator*/,
101             SliceBuffer* pending_data) {
102           grpc_core::ApplicationCallbackExecCtx app_ctx;
103           grpc_core::ExecCtx exec_ctx;
104           grpc_tcp_server_acceptor* acceptor =
105               static_cast<grpc_tcp_server_acceptor*>(
106                   gpr_malloc(sizeof(*acceptor)));
107           acceptor->from_server = s;
108           acceptor->port_index = -1;
109           acceptor->fd_index = -1;
110           if (!is_external) {
111             auto it = s->listen_fd_to_index_map.find(listener_fd);
112             if (it != s->listen_fd_to_index_map.end()) {
113               acceptor->port_index = std::get<0>(it->second);
114               acceptor->fd_index = std::get<1>(it->second);
115             }
116           } else {
117             // External connection handling.
118             grpc_resolved_address addr;
119             memset(&addr, 0, sizeof(addr));
120             addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
121             // Get the fd of the socket connected to peer.
122             int fd = reinterpret_cast<grpc_event_engine::experimental::
123                                           PosixEndpointWithFdSupport*>(ep.get())
124                          ->GetWrappedFd();
125             if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
126                             &(addr.len)) < 0) {
127               gpr_log(GPR_ERROR, "Failed getpeername: %s",
128                       grpc_core::StrError(errno).c_str());
129               close(fd);
130               return;
131             }
132             (void)grpc_set_socket_no_sigpipe_if_possible(fd);
133             auto addr_uri = grpc_sockaddr_to_uri(&addr);
134             if (!addr_uri.ok()) {
135               gpr_log(GPR_ERROR, "Invalid address: %s",
136                       addr_uri.status().ToString().c_str());
137               return;
138             }
139             if (grpc_tcp_trace.enabled()) {
140               gpr_log(GPR_INFO,
141                       "SERVER_CONNECT: incoming external connection: %s",
142                       addr_uri->c_str());
143             }
144           }
145           grpc_pollset* read_notifier_pollset =
146               (*(s->pollsets))[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
147                                    &s->next_pollset_to_assign, 1)) %
148                                s->pollsets->size()];
149           acceptor->external_connection = is_external;
150           acceptor->listener_fd = listener_fd;
151           grpc_byte_buffer* buf = nullptr;
152           if (pending_data != nullptr && pending_data->Length() > 0) {
153             buf = grpc_raw_byte_buffer_create(nullptr, 0);
154             grpc_slice_buffer_swap(&buf->data.raw.slice_buffer,
155                                    pending_data->c_slice_buffer());
156             pending_data->Clear();
157           }
158           acceptor->pending_data = buf;
159           s->on_accept_cb(s->on_accept_cb_arg,
160                           grpc_event_engine::experimental::
161                               grpc_event_engine_endpoint_create(std::move(ep)),
162                           read_notifier_pollset, acceptor);
163         };
164     PosixEventEngineWithFdSupport* engine_ptr =
165         reinterpret_cast<PosixEventEngineWithFdSupport*>(
166             config.GetVoidPointer(GRPC_INTERNAL_ARG_EVENT_ENGINE));
167     // Keeps the engine alive for some tests that have not otherwise
168     // instantiated an EventEngine
169     std::shared_ptr<EventEngine> keeper;
170     if (engine_ptr == nullptr) {
171       keeper = grpc_event_engine::experimental::GetDefaultEventEngine();
172       engine_ptr =
173           reinterpret_cast<PosixEventEngineWithFdSupport*>(keeper.get());
174     }
175     listener = engine_ptr->CreatePosixListener(
176         std::move(accept_cb),
177         [s, shutdown_complete](absl::Status status) {
178           grpc_event_engine::experimental::RunEventEngineClosure(
179               shutdown_complete, absl_status_to_grpc_error(status));
180           delete s->fd_handler;
181           delete s;
182         },
183         config,
184         std::make_unique<MemoryQuotaBasedMemoryAllocatorFactory>(
185             s->memory_quota));
186   } else {
187     EventEngine::Listener::AcceptCallback accept_cb =
188         [s](std::unique_ptr<EventEngine::Endpoint> ep, MemoryAllocator) {
189           s->on_accept_cb(s->on_accept_cb_arg,
190                           grpc_event_engine::experimental::
191                               grpc_event_engine_endpoint_create(std::move(ep)),
192                           nullptr, nullptr);
193         };
194     auto ee = grpc_event_engine::experimental::GetDefaultEventEngine();
195     listener = ee->CreateListener(
196         std::move(accept_cb),
197         [s, ee, shutdown_complete](absl::Status status) {
198           GPR_ASSERT(gpr_atm_no_barrier_load(&s->refs.count) == 0);
199           grpc_event_engine::experimental::RunEventEngineClosure(
200               shutdown_complete, absl_status_to_grpc_error(status));
201           delete s->fd_handler;
202           delete s;
203         },
204         config,
205         std::make_unique<MemoryQuotaBasedMemoryAllocatorFactory>(
206             s->memory_quota));
207   }
208   if (!listener.ok()) {
209     delete s;
210     *server = nullptr;
211     return listener.status();
212   }
213   s->ee_listener = std::move(*listener);
214   return absl::OkStatus();
215 }
216 
tcp_server_create(grpc_closure * shutdown_complete,const EndpointConfig & config,grpc_tcp_server_cb on_accept_cb,void * on_accept_cb_arg,grpc_tcp_server ** server)217 static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
218                                            const EndpointConfig& config,
219                                            grpc_tcp_server_cb on_accept_cb,
220                                            void* on_accept_cb_arg,
221                                            grpc_tcp_server** server) {
222   grpc_tcp_server* s = new grpc_tcp_server;
223   s->so_reuseport = grpc_is_socket_reuse_port_supported();
224   s->expand_wildcard_addrs = false;
225   auto value = config.GetInt(GRPC_ARG_ALLOW_REUSEPORT);
226   if (value.has_value()) {
227     s->so_reuseport = (grpc_is_socket_reuse_port_supported() && *value != 0);
228   }
229   value = config.GetInt(GRPC_ARG_EXPAND_WILDCARD_ADDRS);
230   if (value.has_value()) {
231     s->expand_wildcard_addrs = (*value != 0);
232   }
233   gpr_ref_init(&s->refs, 1);
234   gpr_mu_init(&s->mu);
235   s->active_ports = 0;
236   s->destroyed_ports = 0;
237   s->shutdown = false;
238   s->shutdown_starting.head = nullptr;
239   s->shutdown_starting.tail = nullptr;
240   if (!grpc_event_engine::experimental::UseEventEngineListener()) {
241     s->shutdown_complete = shutdown_complete;
242   } else {
243     s->shutdown_complete = nullptr;
244   }
245   s->on_accept_cb = on_accept_cb;
246   s->on_accept_cb_arg = on_accept_cb_arg;
247   s->head = nullptr;
248   s->tail = nullptr;
249   s->nports = 0;
250   s->options = TcpOptionsFromEndpointConfig(config);
251   s->fd_handler = nullptr;
252   GPR_ASSERT(s->options.resource_quota != nullptr);
253   GPR_ASSERT(s->on_accept_cb);
254   s->memory_quota = s->options.resource_quota->memory_quota();
255   s->pre_allocated_fd = -1;
256   gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
257   s->n_bind_ports = 0;
258   new (&s->listen_fd_to_index_map)
259       absl::flat_hash_map<int, std::tuple<int, int>>();
260   *server = s;
261   if (grpc_event_engine::experimental::UseEventEngineListener()) {
262     return CreateEventEngineListener(s, shutdown_complete, config, server);
263   }
264   return absl::OkStatus();
265 }
266 
finish_shutdown(grpc_tcp_server * s)267 static void finish_shutdown(grpc_tcp_server* s) {
268   gpr_mu_lock(&s->mu);
269   GPR_ASSERT(s->shutdown);
270   gpr_mu_unlock(&s->mu);
271   if (s->shutdown_complete != nullptr) {
272     grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
273                             absl::OkStatus());
274   }
275   gpr_mu_destroy(&s->mu);
276   while (s->head) {
277     grpc_tcp_listener* sp = s->head;
278     s->head = sp->next;
279     gpr_free(sp);
280   }
281   if (grpc_event_engine::experimental::UseEventEngineListener()) {
282     // This will trigger asynchronous execution of the on_shutdown_complete
283     // callback when appropriate. That callback will delete the server
284     s->ee_listener.reset();
285   } else {
286     delete s->fd_handler;
287     delete s;
288   }
289 }
290 
destroyed_port(void * server,grpc_error_handle)291 static void destroyed_port(void* server, grpc_error_handle /*error*/) {
292   grpc_tcp_server* s = static_cast<grpc_tcp_server*>(server);
293   gpr_mu_lock(&s->mu);
294   s->destroyed_ports++;
295   if (s->destroyed_ports == s->nports) {
296     gpr_mu_unlock(&s->mu);
297     finish_shutdown(s);
298   } else {
299     GPR_ASSERT(s->destroyed_ports < s->nports);
300     gpr_mu_unlock(&s->mu);
301   }
302 }
303 
304 // called when all listening endpoints have been shutdown, so no further
305 // events will be received on them - at this point it's safe to destroy
306 // things
deactivated_all_ports(grpc_tcp_server * s)307 static void deactivated_all_ports(grpc_tcp_server* s) {
308   // delete ALL the things
309   gpr_mu_lock(&s->mu);
310 
311   GPR_ASSERT(s->shutdown);
312 
313   if (s->head) {
314     grpc_tcp_listener* sp;
315     for (sp = s->head; sp; sp = sp->next) {
316       // Do not unlink if there is a pre-allocated FD
317       if (grpc_tcp_server_pre_allocated_fd(s) <= 0) {
318         grpc_unlink_if_unix_domain_socket(&sp->addr);
319       }
320       GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
321                         grpc_schedule_on_exec_ctx);
322       grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
323                      "tcp_listener_shutdown");
324     }
325     gpr_mu_unlock(&s->mu);
326   } else {
327     gpr_mu_unlock(&s->mu);
328     finish_shutdown(s);
329   }
330 }
331 
tcp_server_destroy(grpc_tcp_server * s)332 static void tcp_server_destroy(grpc_tcp_server* s) {
333   gpr_mu_lock(&s->mu);
334   GPR_ASSERT(!s->shutdown);
335   s->shutdown = true;
336   // shutdown all fd's
337   if (s->active_ports) {
338     grpc_tcp_listener* sp;
339     for (sp = s->head; sp; sp = sp->next) {
340       grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE("Server destroyed"));
341     }
342     gpr_mu_unlock(&s->mu);
343   } else {
344     gpr_mu_unlock(&s->mu);
345     deactivated_all_ports(s);
346   }
347 }
348 
349 // event manager callback when reads are ready
on_read(void * arg,grpc_error_handle err)350 static void on_read(void* arg, grpc_error_handle err) {
351   grpc_tcp_listener* sp = static_cast<grpc_tcp_listener*>(arg);
352   grpc_pollset* read_notifier_pollset;
353   if (!err.ok()) {
354     goto error;
355   }
356 
357   // loop until accept4 returns EAGAIN, and then re-arm notification
358   for (;;) {
359     grpc_resolved_address addr;
360     memset(&addr, 0, sizeof(addr));
361     addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
362     // Note: If we ever decide to return this address to the user, remember to
363     // strip off the ::ffff:0.0.0.0/96 prefix first.
364     int fd = grpc_accept4(sp->fd, &addr, 1, 1);
365     if (fd < 0) {
366       if (errno == EINTR) {
367         continue;
368       }
369       // When the process runs out of fds, accept4() returns EMFILE. When this
370       // happens, the connection is left in the accept queue until either a
371       // read event triggers the on_read callback, or time has passed and the
372       // accept should be re-tried regardless. This callback is not cancelled,
373       // so a spurious wakeup may occur even when there's nothing to accept.
374       // This is not a performant code path, but if an fd limit has been
375       // reached, the system is likely in an unhappy state regardless.
376       if (errno == EMFILE) {
377         GRPC_LOG_EVERY_N_SEC(1, GPR_ERROR, "%s",
378                              "File descriptor limit reached. Retrying.");
379         grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
380         if (gpr_atm_full_xchg(&sp->retry_timer_armed, true)) return;
381         grpc_timer_init(&sp->retry_timer,
382                         grpc_core::Timestamp::Now() + kRetryAcceptWaitTime,
383                         &sp->retry_closure);
384         return;
385       }
386       if (errno == EAGAIN || errno == ECONNABORTED || errno == EWOULDBLOCK) {
387         grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
388         return;
389       }
390       gpr_mu_lock(&sp->server->mu);
391       if (!sp->server->shutdown_listeners) {
392         gpr_log(GPR_ERROR, "Failed accept4: %s",
393                 grpc_core::StrError(errno).c_str());
394       } else {
395         // if we have shutdown listeners, accept4 could fail, and we
396         // needn't notify users
397       }
398       gpr_mu_unlock(&sp->server->mu);
399       goto error;
400     }
401 
402     if (sp->server->memory_quota->IsMemoryPressureHigh()) {
403       int64_t dropped_connections_count =
404           num_dropped_connections.fetch_add(1, std::memory_order_relaxed) + 1;
405       if (dropped_connections_count % 1000 == 1) {
406         gpr_log(GPR_INFO,
407                 "Dropped >= %" PRId64
408                 " new connection attempts due to high memory pressure",
409                 dropped_connections_count);
410       }
411       close(fd);
412       continue;
413     }
414 
415     // For UNIX sockets, the accept call might not fill up the member sun_path
416     // of sockaddr_un, so explicitly call getpeername to get it.
417     if (grpc_is_unix_socket(&addr)) {
418       memset(&addr, 0, sizeof(addr));
419       addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
420       if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
421                       &(addr.len)) < 0) {
422         gpr_log(GPR_ERROR, "Failed getpeername: %s",
423                 grpc_core::StrError(errno).c_str());
424         close(fd);
425         goto error;
426       }
427     }
428 
429     (void)grpc_set_socket_no_sigpipe_if_possible(fd);
430 
431     err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_SERVER_CONNECTION_USAGE,
432                                             sp->server->options);
433     if (!err.ok()) {
434       goto error;
435     }
436 
437     auto addr_uri = grpc_sockaddr_to_uri(&addr);
438     if (!addr_uri.ok()) {
439       gpr_log(GPR_ERROR, "Invalid address: %s",
440               addr_uri.status().ToString().c_str());
441       goto error;
442     }
443     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
444       gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s",
445               addr_uri->c_str());
446     }
447 
448     std::string name = absl::StrCat("tcp-server-connection:", addr_uri.value());
449     grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
450 
451     read_notifier_pollset = (*(sp->server->pollsets))
452         [static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
453              &sp->server->next_pollset_to_assign, 1)) %
454          sp->server->pollsets->size()];
455 
456     grpc_pollset_add_fd(read_notifier_pollset, fdobj);
457 
458     // Create acceptor.
459     grpc_tcp_server_acceptor* acceptor =
460         static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
461     acceptor->from_server = sp->server;
462     acceptor->port_index = sp->port_index;
463     acceptor->fd_index = sp->fd_index;
464     acceptor->external_connection = false;
465     sp->server->on_accept_cb(
466         sp->server->on_accept_cb_arg,
467         grpc_tcp_create(fdobj, sp->server->options, addr_uri.value()),
468         read_notifier_pollset, acceptor);
469   }
470 
471   GPR_UNREACHABLE_CODE(return);
472 
473 error:
474   gpr_mu_lock(&sp->server->mu);
475   if (0 == --sp->server->active_ports && sp->server->shutdown) {
476     gpr_mu_unlock(&sp->server->mu);
477     deactivated_all_ports(sp->server);
478   } else {
479     gpr_mu_unlock(&sp->server->mu);
480   }
481 }
482 
483 // Treat :: or 0.0.0.0 as a family-agnostic wildcard.
add_wildcard_addrs_to_server(grpc_tcp_server * s,unsigned port_index,int requested_port,int * out_port)484 static grpc_error_handle add_wildcard_addrs_to_server(grpc_tcp_server* s,
485                                                       unsigned port_index,
486                                                       int requested_port,
487                                                       int* out_port) {
488   grpc_resolved_address wild4;
489   grpc_resolved_address wild6;
490   unsigned fd_index = 0;
491   grpc_dualstack_mode dsmode;
492   grpc_tcp_listener* sp = nullptr;
493   grpc_tcp_listener* sp2 = nullptr;
494   grpc_error_handle v6_err;
495   grpc_error_handle v4_err;
496   *out_port = -1;
497 
498   if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) {
499     return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port,
500                                                out_port);
501   }
502 
503   grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
504   // Try listening on IPv6 first.
505   if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index,
506                                          &dsmode, &sp)) == absl::OkStatus()) {
507     ++fd_index;
508     requested_port = *out_port = sp->port;
509     if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) {
510       return absl::OkStatus();
511     }
512   }
513   // If we got a v6-only socket or nothing, try adding 0.0.0.0.
514   grpc_sockaddr_set_port(&wild4, requested_port);
515   if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index,
516                                          &dsmode, &sp2)) == absl::OkStatus()) {
517     *out_port = sp2->port;
518     if (sp != nullptr) {
519       sp2->is_sibling = 1;
520       sp->sibling = sp2;
521     }
522   }
523   if (*out_port > 0) {
524     if (!v6_err.ok()) {
525       gpr_log(GPR_INFO,
526               "Failed to add :: listener, "
527               "the environment may not support IPv6: %s",
528               grpc_core::StatusToString(v6_err).c_str());
529     }
530     if (!v4_err.ok()) {
531       gpr_log(GPR_INFO,
532               "Failed to add 0.0.0.0 listener, "
533               "the environment may not support IPv4: %s",
534               grpc_core::StatusToString(v4_err).c_str());
535     }
536     return absl::OkStatus();
537   } else {
538     grpc_error_handle root_err =
539         GRPC_ERROR_CREATE("Failed to add any wildcard listeners");
540     GPR_ASSERT(!v6_err.ok() && !v4_err.ok());
541     root_err = grpc_error_add_child(root_err, v6_err);
542     root_err = grpc_error_add_child(root_err, v4_err);
543     return root_err;
544   }
545 }
546 
clone_port(grpc_tcp_listener * listener,unsigned count)547 static grpc_error_handle clone_port(grpc_tcp_listener* listener,
548                                     unsigned count) {
549   grpc_tcp_listener* sp = nullptr;
550   absl::StatusOr<std::string> addr_str;
551   grpc_error_handle err;
552 
553   for (grpc_tcp_listener* l = listener->next; l && l->is_sibling; l = l->next) {
554     l->fd_index += count;
555   }
556 
557   for (unsigned i = 0; i < count; i++) {
558     int fd = -1;
559     int port = -1;
560     grpc_dualstack_mode dsmode;
561     err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode,
562                                        &fd);
563     if (!err.ok()) return err;
564     err = grpc_tcp_server_prepare_socket(listener->server, fd, &listener->addr,
565                                          true, &port);
566     if (!err.ok()) return err;
567     listener->server->nports++;
568     addr_str = grpc_sockaddr_to_string(&listener->addr, true);
569     if (!addr_str.ok()) {
570       return GRPC_ERROR_CREATE(addr_str.status().ToString());
571     }
572     sp = static_cast<grpc_tcp_listener*>(gpr_malloc(sizeof(grpc_tcp_listener)));
573     sp->next = listener->next;
574     listener->next = sp;
575     // sp (the new listener) is a sibling of 'listener' (the original
576     // listener).
577     sp->is_sibling = 1;
578     sp->sibling = listener->sibling;
579     listener->sibling = sp;
580     sp->server = listener->server;
581     sp->fd = fd;
582     sp->emfd = grpc_fd_create(
583         fd,
584         absl::StrFormat("tcp-server-listener:%s/clone-%d", *addr_str, i)
585             .c_str(),
586         true);
587     memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
588     sp->port = port;
589     sp->port_index = listener->port_index;
590     sp->fd_index = listener->fd_index + count - i;
591     GPR_ASSERT(sp->emfd);
592     grpc_tcp_server_listener_initialize_retry_timer(sp);
593     while (listener->server->tail->next != nullptr) {
594       listener->server->tail = listener->server->tail->next;
595     }
596   }
597 
598   return absl::OkStatus();
599 }
600 
tcp_server_add_port(grpc_tcp_server * s,const grpc_resolved_address * addr,int * out_port)601 static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
602                                              const grpc_resolved_address* addr,
603                                              int* out_port) {
604   if (grpc_event_engine::experimental::UseEventEngineListener()) {
605     gpr_mu_lock(&s->mu);
606     if (s->shutdown_listeners) {
607       gpr_mu_unlock(&s->mu);
608       return absl::UnknownError("Server already shutdown");
609     }
610     int fd_index = 0;
611     absl::StatusOr<int> port;
612     if (grpc_event_engine::experimental::EventEngineSupportsFd()) {
613       port =
614           static_cast<
615               grpc_event_engine::experimental::PosixListenerWithFdSupport*>(
616               s->ee_listener.get())
617               ->BindWithFd(
618                   grpc_event_engine::experimental::CreateResolvedAddress(*addr),
619                   [s, &fd_index](absl::StatusOr<int> listen_fd) {
620                     if (!listen_fd.ok()) {
621                       return;
622                     }
623                     GPR_DEBUG_ASSERT(*listen_fd > 0);
624                     s->listen_fd_to_index_map.insert_or_assign(
625                         *listen_fd,
626                         std::make_tuple(s->n_bind_ports, fd_index++));
627                   });
628     } else {
629       port = s->ee_listener->Bind(
630           grpc_event_engine::experimental::CreateResolvedAddress(*addr));
631     }
632     if (port.ok()) {
633       s->n_bind_ports++;
634       *out_port = *port;
635     }
636     gpr_mu_unlock(&s->mu);
637     return port.status();
638   }
639   GPR_ASSERT(addr->len <= GRPC_MAX_SOCKADDR_SIZE);
640   grpc_tcp_listener* sp;
641   grpc_resolved_address sockname_temp;
642   grpc_resolved_address addr6_v4mapped;
643   int requested_port = grpc_sockaddr_get_port(addr);
644   unsigned port_index = 0;
645   grpc_dualstack_mode dsmode;
646   grpc_error_handle err;
647   *out_port = -1;
648   if (s->tail != nullptr) {
649     port_index = s->tail->port_index + 1;
650   }
651 
652   // Check if this is a wildcard port, and if so, try to keep the port the same
653   // as some previously created listener.
654   if (requested_port == 0) {
655     for (sp = s->head; sp; sp = sp->next) {
656       sockname_temp.len =
657           static_cast<socklen_t>(sizeof(struct sockaddr_storage));
658       if (0 ==
659           getsockname(sp->fd,
660                       reinterpret_cast<grpc_sockaddr*>(&sockname_temp.addr),
661                       &sockname_temp.len)) {
662         int used_port = grpc_sockaddr_get_port(&sockname_temp);
663         if (used_port > 0) {
664           memcpy(&sockname_temp, addr, sizeof(grpc_resolved_address));
665           grpc_sockaddr_set_port(&sockname_temp, used_port);
666           requested_port = used_port;
667           addr = &sockname_temp;
668           break;
669         }
670       }
671     }
672   }
673 
674   /* Check if systemd has pre-allocated valid FDs */
675   set_matching_sd_fds(s, addr, requested_port);
676 
677   /* Do not unlink if there are pre-allocated FDs, or it will stop
678      working after the first client connects */
679   if (grpc_tcp_server_pre_allocated_fd(s) <= 0) {
680     grpc_unlink_if_unix_domain_socket(addr);
681   }
682 
683   if (grpc_sockaddr_is_wildcard(addr, &requested_port)) {
684     return add_wildcard_addrs_to_server(s, port_index, requested_port,
685                                         out_port);
686   }
687   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
688     addr = &addr6_v4mapped;
689   }
690   if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) ==
691       absl::OkStatus()) {
692     *out_port = sp->port;
693   }
694   return err;
695 }
696 
697 // Return listener at port_index or NULL. Should only be called with s->mu
698 // locked.
get_port_index(grpc_tcp_server * s,unsigned port_index)699 static grpc_tcp_listener* get_port_index(grpc_tcp_server* s,
700                                          unsigned port_index) {
701   unsigned num_ports = 0;
702   grpc_tcp_listener* sp;
703   for (sp = s->head; sp; sp = sp->next) {
704     if (!sp->is_sibling) {
705       if (++num_ports > port_index) {
706         return sp;
707       }
708     }
709   }
710   return nullptr;
711 }
712 
tcp_server_port_fd_count(grpc_tcp_server * s,unsigned port_index)713 unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) {
714   unsigned num_fds = 0;
715   gpr_mu_lock(&s->mu);
716   if (grpc_event_engine::experimental::UseEventEngineListener()) {
717     // This doesn't need to be very fast. Used in tests.
718     for (auto it = s->listen_fd_to_index_map.begin();
719          it != s->listen_fd_to_index_map.end(); it++) {
720       if (std::get<0>(it->second) == static_cast<int>(port_index)) {
721         num_fds++;
722       }
723     }
724     gpr_mu_unlock(&s->mu);
725     return num_fds;
726   }
727   grpc_tcp_listener* sp = get_port_index(s, port_index);
728   for (; sp; sp = sp->sibling) {
729     ++num_fds;
730   }
731   gpr_mu_unlock(&s->mu);
732   return num_fds;
733 }
734 
tcp_server_port_fd(grpc_tcp_server * s,unsigned port_index,unsigned fd_index)735 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
736                               unsigned fd_index) {
737   gpr_mu_lock(&s->mu);
738   if (grpc_event_engine::experimental::UseEventEngineListener()) {
739     // This doesn't need to be very fast. Used in tests.
740     for (auto it = s->listen_fd_to_index_map.begin();
741          it != s->listen_fd_to_index_map.end(); it++) {
742       if (std::get<0>(it->second) == static_cast<int>(port_index) &&
743           std::get<1>(it->second) == static_cast<int>(fd_index)) {
744         gpr_mu_unlock(&s->mu);
745         return it->first;
746       }
747     }
748     gpr_mu_unlock(&s->mu);
749     return -1;
750   }
751   grpc_tcp_listener* sp = get_port_index(s, port_index);
752   for (; sp; sp = sp->sibling, --fd_index) {
753     if (fd_index == 0) {
754       gpr_mu_unlock(&s->mu);
755       return sp->fd;
756     }
757   }
758   gpr_mu_unlock(&s->mu);
759   return -1;
760 }
761 
tcp_server_start(grpc_tcp_server * s,const std::vector<grpc_pollset * > * pollsets)762 static void tcp_server_start(grpc_tcp_server* s,
763                              const std::vector<grpc_pollset*>* pollsets) {
764   size_t i;
765   grpc_tcp_listener* sp;
766   gpr_mu_lock(&s->mu);
767   GPR_ASSERT(s->on_accept_cb);
768   GPR_ASSERT(s->active_ports == 0);
769   s->pollsets = pollsets;
770   if (grpc_event_engine::experimental::UseEventEngineListener()) {
771     GPR_ASSERT(!s->shutdown_listeners);
772     GPR_ASSERT(GRPC_LOG_IF_ERROR("listener_start", s->ee_listener->Start()));
773     gpr_mu_unlock(&s->mu);
774     return;
775   }
776   sp = s->head;
777   while (sp != nullptr) {
778     if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) &&
779         pollsets->size() > 1) {
780       GPR_ASSERT(GRPC_LOG_IF_ERROR(
781           "clone_port", clone_port(sp, (unsigned)(pollsets->size() - 1))));
782       for (i = 0; i < pollsets->size(); i++) {
783         grpc_pollset_add_fd((*pollsets)[i], sp->emfd);
784         GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
785                           grpc_schedule_on_exec_ctx);
786         grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
787         s->active_ports++;
788         sp = sp->next;
789       }
790     } else {
791       for (i = 0; i < pollsets->size(); i++) {
792         grpc_pollset_add_fd((*pollsets)[i], sp->emfd);
793       }
794       GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
795                         grpc_schedule_on_exec_ctx);
796       grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
797       s->active_ports++;
798       sp = sp->next;
799     }
800   }
801   gpr_mu_unlock(&s->mu);
802 }
803 
tcp_server_ref(grpc_tcp_server * s)804 grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
805   gpr_ref_non_zero(&s->refs);
806   return s;
807 }
808 
tcp_server_shutdown_starting_add(grpc_tcp_server * s,grpc_closure * shutdown_starting)809 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
810                                              grpc_closure* shutdown_starting) {
811   gpr_mu_lock(&s->mu);
812   grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
813                            absl::OkStatus());
814   gpr_mu_unlock(&s->mu);
815 }
816 
tcp_server_unref(grpc_tcp_server * s)817 static void tcp_server_unref(grpc_tcp_server* s) {
818   if (gpr_unref(&s->refs)) {
819     grpc_tcp_server_shutdown_listeners(s);
820     gpr_mu_lock(&s->mu);
821     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
822     gpr_mu_unlock(&s->mu);
823     tcp_server_destroy(s);
824   }
825 }
826 
tcp_server_shutdown_listeners(grpc_tcp_server * s)827 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
828   gpr_mu_lock(&s->mu);
829   s->shutdown_listeners = true;
830   if (grpc_event_engine::experimental::UseEventEngineListener()) {
831     if (grpc_event_engine::experimental::EventEngineSupportsFd()) {
832       static_cast<grpc_event_engine::experimental::PosixListenerWithFdSupport*>(
833           s->ee_listener.get())
834           ->ShutdownListeningFds();
835     }
836   }
837   /* shutdown all fd's */
838   if (s->active_ports) {
839     grpc_tcp_listener* sp;
840     for (sp = s->head; sp; sp = sp->next) {
841       grpc_timer_cancel(&sp->retry_timer);
842       grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE("Server shutdown"));
843     }
844   }
845   gpr_mu_unlock(&s->mu);
846 }
847 
tcp_server_pre_allocated_fd(grpc_tcp_server * s)848 static int tcp_server_pre_allocated_fd(grpc_tcp_server* s) {
849   return s->pre_allocated_fd;
850 }
851 
tcp_server_set_pre_allocated_fd(grpc_tcp_server * s,int fd)852 static void tcp_server_set_pre_allocated_fd(grpc_tcp_server* s, int fd) {
853   gpr_mu_lock(&s->mu);
854   s->pre_allocated_fd = fd;
855   gpr_mu_unlock(&s->mu);
856 }
857 
858 namespace {
859 class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
860  public:
ExternalConnectionHandler(grpc_tcp_server * s)861   explicit ExternalConnectionHandler(grpc_tcp_server* s) : s_(s) {}
862 
863   // TODO(yangg) resolve duplicate code with on_read
Handle(int listener_fd,int fd,grpc_byte_buffer * buf)864   void Handle(int listener_fd, int fd, grpc_byte_buffer* buf) override {
865     if (grpc_event_engine::experimental::UseEventEngineListener()) {
866       GPR_ASSERT(grpc_event_engine::experimental::EventEngineSupportsFd());
867       grpc_event_engine::experimental::SliceBuffer pending_data;
868       if (buf != nullptr) {
869         pending_data =
870             grpc_event_engine::experimental::SliceBuffer::TakeCSliceBuffer(
871                 buf->data.raw.slice_buffer);
872       }
873       GPR_ASSERT(GRPC_LOG_IF_ERROR(
874           "listener_handle_external_connection",
875           static_cast<
876               grpc_event_engine::experimental::PosixListenerWithFdSupport*>(
877               s_->ee_listener.get())
878               ->HandleExternalConnection(listener_fd, fd, &pending_data)));
879       return;
880     }
881     grpc_pollset* read_notifier_pollset;
882     grpc_resolved_address addr;
883     memset(&addr, 0, sizeof(addr));
884     addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
885     grpc_core::ExecCtx exec_ctx;
886 
887     if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
888                     &(addr.len)) < 0) {
889       gpr_log(GPR_ERROR, "Failed getpeername: %s",
890               grpc_core::StrError(errno).c_str());
891       close(fd);
892       return;
893     }
894     (void)grpc_set_socket_no_sigpipe_if_possible(fd);
895     auto addr_uri = grpc_sockaddr_to_uri(&addr);
896     if (!addr_uri.ok()) {
897       gpr_log(GPR_ERROR, "Invalid address: %s",
898               addr_uri.status().ToString().c_str());
899       return;
900     }
901     if (grpc_tcp_trace.enabled()) {
902       gpr_log(GPR_INFO, "SERVER_CONNECT: incoming external connection: %s",
903               addr_uri->c_str());
904     }
905     std::string name = absl::StrCat("tcp-server-connection:", addr_uri.value());
906     grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
907     read_notifier_pollset =
908         (*(s_->pollsets))[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
909                               &s_->next_pollset_to_assign, 1)) %
910                           s_->pollsets->size()];
911     grpc_pollset_add_fd(read_notifier_pollset, fdobj);
912     grpc_tcp_server_acceptor* acceptor =
913         static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
914     acceptor->from_server = s_;
915     acceptor->port_index = -1;
916     acceptor->fd_index = -1;
917     acceptor->external_connection = true;
918     acceptor->listener_fd = listener_fd;
919     acceptor->pending_data = buf;
920     s_->on_accept_cb(s_->on_accept_cb_arg,
921                      grpc_tcp_create(fdobj, s_->options, addr_uri.value()),
922                      read_notifier_pollset, acceptor);
923   }
924 
925  private:
926   grpc_tcp_server* s_;
927 };
928 }  // namespace
929 
tcp_server_create_fd_handler(grpc_tcp_server * s)930 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
931     grpc_tcp_server* s) {
932   s->fd_handler = new ExternalConnectionHandler(s);
933   return s->fd_handler;
934 }
935 
// Function table for the POSIX TCP server implementation above. It is
// presumably dispatched to by the generic grpc_tcp_server_* wrappers.
// The initializer is positional, so the entry order must match the member
// order of grpc_tcp_server_vtable (declared elsewhere).
grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
    tcp_server_create,
    tcp_server_start,
    tcp_server_add_port,
    tcp_server_create_fd_handler,
    tcp_server_port_fd_count,
    tcp_server_port_fd,
    tcp_server_ref,
    tcp_server_shutdown_starting_add,
    tcp_server_unref,
    tcp_server_shutdown_listeners,
    tcp_server_pre_allocated_fd,
    tcp_server_set_pre_allocated_fd};
949 
950 #endif  // GRPC_POSIX_SOCKET_TCP_SERVER
951