1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/chttp2/server/chttp2_server.h"
22 
23 #include <inttypes.h>
24 #include <string.h>
25 
26 #include <algorithm>
27 #include <initializer_list>
28 #include <map>
29 #include <memory>
30 #include <string>
31 #include <utility>
32 #include <vector>
33 
34 #include "absl/base/thread_annotations.h"
35 #include "absl/status/status.h"
36 #include "absl/status/statusor.h"
37 #include "absl/strings/str_cat.h"
38 #include "absl/strings/str_format.h"
39 #include "absl/strings/string_view.h"
40 #include "absl/strings/strip.h"
41 #include "absl/types/optional.h"
42 
43 #include <grpc/event_engine/event_engine.h>
44 #include <grpc/grpc.h>
45 #include <grpc/grpc_posix.h>
46 #include <grpc/slice_buffer.h>
47 #include <grpc/support/alloc.h>
48 #include <grpc/support/log.h>
49 
50 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
51 #include "src/core/ext/transport/chttp2/transport/frame.h"
52 #include "src/core/ext/transport/chttp2/transport/internal.h"
53 #include "src/core/lib/address_utils/sockaddr_utils.h"
54 #include "src/core/lib/channel/channel_args.h"
55 #include "src/core/lib/channel/channelz.h"
56 #include "src/core/lib/config/core_configuration.h"
57 #include "src/core/lib/debug/trace.h"
58 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
59 #include "src/core/lib/gprpp/debug_location.h"
60 #include "src/core/lib/gprpp/orphanable.h"
61 #include "src/core/lib/gprpp/ref_counted_ptr.h"
62 #include "src/core/lib/gprpp/status_helper.h"
63 #include "src/core/lib/gprpp/sync.h"
64 #include "src/core/lib/gprpp/time.h"
65 #include "src/core/lib/gprpp/unique_type_name.h"
66 #include "src/core/lib/iomgr/closure.h"
67 #include "src/core/lib/iomgr/endpoint.h"
68 #include "src/core/lib/iomgr/iomgr_fwd.h"
69 #include "src/core/lib/iomgr/pollset_set.h"
70 #include "src/core/lib/iomgr/resolve_address.h"
71 #include "src/core/lib/iomgr/resolved_address.h"
72 #include "src/core/lib/iomgr/tcp_server.h"
73 #include "src/core/lib/iomgr/unix_sockets_posix.h"
74 #include "src/core/lib/resource_quota/memory_quota.h"
75 #include "src/core/lib/resource_quota/resource_quota.h"
76 #include "src/core/lib/security/credentials/credentials.h"
77 #include "src/core/lib/security/credentials/insecure/insecure_credentials.h"
78 #include "src/core/lib/security/security_connector/security_connector.h"
79 #include "src/core/lib/surface/api_trace.h"
80 #include "src/core/lib/surface/server.h"
81 #include "src/core/lib/transport/error_utils.h"
82 #include "src/core/lib/transport/handshaker.h"
83 #include "src/core/lib/transport/handshaker_registry.h"
84 #include "src/core/lib/transport/transport.h"
85 #include "src/core/lib/transport/transport_fwd.h"
86 #include "src/core/lib/uri/uri_parser.h"
87 
88 #ifdef GPR_SUPPORT_CHANNELS_FROM_FD
89 #include "src/core/lib/iomgr/ev_posix.h"
90 #include "src/core/lib/iomgr/exec_ctx.h"
91 #include "src/core/lib/iomgr/tcp_client_posix.h"
92 #endif  // GPR_SUPPORT_CHANNELS_FROM_FD
93 
94 namespace grpc_core {
95 namespace {
96 
97 using ::grpc_event_engine::experimental::EventEngine;
98 
99 const char kUnixUriPrefix[] = "unix:";
100 const char kUnixAbstractUriPrefix[] = "unix-abstract:";
101 
102 class Chttp2ServerListener : public Server::ListenerInterface {
103  public:
104   static grpc_error_handle Create(Server* server, grpc_resolved_address* addr,
105                                   const ChannelArgs& args,
106                                   Chttp2ServerArgsModifier args_modifier,
107                                   int* port_num);
108 
109   static grpc_error_handle CreateWithAcceptor(
110       Server* server, const char* name, const ChannelArgs& args,
111       Chttp2ServerArgsModifier args_modifier);
112 
113   // Do not instantiate directly.  Use one of the factory methods above.
114   Chttp2ServerListener(Server* server, const ChannelArgs& args,
115                        Chttp2ServerArgsModifier args_modifier);
116   ~Chttp2ServerListener() override;
117 
118   void Start(Server* server,
119              const std::vector<grpc_pollset*>* pollsets) override;
120 
channelz_listen_socket_node() const121   channelz::ListenSocketNode* channelz_listen_socket_node() const override {
122     return channelz_listen_socket_.get();
123   }
124 
125   void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
126 
127   void Orphan() override;
128 
129  private:
130   class ConfigFetcherWatcher
131       : public grpc_server_config_fetcher::WatcherInterface {
132    public:
ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)133     explicit ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)
134         : listener_(std::move(listener)) {}
135 
136     void UpdateConnectionManager(
137         RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
138             connection_manager) override;
139 
140     void StopServing() override;
141 
142    private:
143     RefCountedPtr<Chttp2ServerListener> listener_;
144   };
145 
146   class ActiveConnection : public InternallyRefCounted<ActiveConnection> {
147    public:
148     class HandshakingState : public InternallyRefCounted<HandshakingState> {
149      public:
150       HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,
151                        grpc_pollset* accepting_pollset,
152                        grpc_tcp_server_acceptor* acceptor,
153                        const ChannelArgs& args);
154 
155       ~HandshakingState() override;
156 
157       void Orphan() override;
158 
159       void Start(grpc_endpoint* endpoint, const ChannelArgs& args);
160 
161       // Needed to be able to grab an external ref in
162       // ActiveConnection::Start()
163       using InternallyRefCounted<HandshakingState>::Ref;
164 
165      private:
166       void OnTimeout() ABSL_LOCKS_EXCLUDED(&connection_->mu_);
167       static void OnReceiveSettings(void* arg, grpc_error_handle /* error */);
168       static void OnHandshakeDone(void* arg, grpc_error_handle error);
169       RefCountedPtr<ActiveConnection> const connection_;
170       grpc_pollset* const accepting_pollset_;
171       grpc_tcp_server_acceptor* acceptor_;
172       RefCountedPtr<HandshakeManager> handshake_mgr_
173           ABSL_GUARDED_BY(&connection_->mu_);
174       // State for enforcing handshake timeout on receiving HTTP/2 settings.
175       Timestamp const deadline_;
176       absl::optional<EventEngine::TaskHandle> timer_handle_
177           ABSL_GUARDED_BY(&connection_->mu_);
178       grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_);
179       grpc_pollset_set* const interested_parties_;
180     };
181 
182     ActiveConnection(grpc_pollset* accepting_pollset,
183                      grpc_tcp_server_acceptor* acceptor,
184                      EventEngine* event_engine, const ChannelArgs& args,
185                      MemoryOwner memory_owner);
186     ~ActiveConnection() override;
187 
188     void Orphan() override;
189 
190     void SendGoAway();
191 
192     void Start(RefCountedPtr<Chttp2ServerListener> listener,
193                grpc_endpoint* endpoint, const ChannelArgs& args);
194 
195     // Needed to be able to grab an external ref in
196     // Chttp2ServerListener::OnAccept()
197     using InternallyRefCounted<ActiveConnection>::Ref;
198 
199    private:
200     static void OnClose(void* arg, grpc_error_handle error);
201     void OnDrainGraceTimeExpiry() ABSL_LOCKS_EXCLUDED(&mu_);
202 
203     RefCountedPtr<Chttp2ServerListener> listener_;
204     Mutex mu_ ABSL_ACQUIRED_AFTER(&listener_->mu_);
205     // Set by HandshakingState before the handshaking begins and reset when
206     // handshaking is done.
207     OrphanablePtr<HandshakingState> handshaking_state_ ABSL_GUARDED_BY(&mu_);
208     // Set by HandshakingState when handshaking is done and a valid transport
209     // is created.
210     grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
211     grpc_closure on_close_;
212     absl::optional<EventEngine::TaskHandle> drain_grace_timer_handle_
213         ABSL_GUARDED_BY(&mu_);
214     // Use a raw pointer since this event_engine_ is grabbed from the
215     // ChannelArgs of the listener_.
216     EventEngine* const event_engine_ ABSL_GUARDED_BY(&mu_);
217     bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
218   };
219 
220   // To allow access to RefCounted<> like interface.
221   friend class RefCountedPtr<Chttp2ServerListener>;
222 
223   // Should only be called once so as to start the TCP server.
224   void StartListening();
225 
226   static void OnAccept(void* arg, grpc_endpoint* tcp,
227                        grpc_pollset* accepting_pollset,
228                        grpc_tcp_server_acceptor* acceptor);
229 
230   static void TcpServerShutdownComplete(void* arg, grpc_error_handle error);
231 
232   static void DestroyListener(Server* /*server*/, void* arg,
233                               grpc_closure* destroy_done);
234 
235   // The interface required by RefCountedPtr<> has been manually implemented
236   // here to take a ref on tcp_server_ instead. Note that, the handshaker
237   // needs tcp_server_ to exist for the lifetime of the handshake since it's
238   // needed by acceptor. Sharing refs between the listener and tcp_server_ is
239   // just an optimization to avoid taking additional refs on the listener,
240   // since TcpServerShutdownComplete already holds a ref to the listener.
IncrementRefCount()241   void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); }
IncrementRefCount(const DebugLocation &,const char *)242   void IncrementRefCount(const DebugLocation& /* location */,
243                          const char* /* reason */) {
244     IncrementRefCount();
245   }
246 
Ref()247   RefCountedPtr<Chttp2ServerListener> Ref() GRPC_MUST_USE_RESULT {
248     IncrementRefCount();
249     return RefCountedPtr<Chttp2ServerListener>(this);
250   }
Ref(const DebugLocation &,const char *)251   RefCountedPtr<Chttp2ServerListener> Ref(const DebugLocation& /* location */,
252                                           const char* /* reason */)
253       GRPC_MUST_USE_RESULT {
254     return Ref();
255   }
256 
Unref()257   void Unref() { grpc_tcp_server_unref(tcp_server_); }
Unref(const DebugLocation &,const char *)258   void Unref(const DebugLocation& /* location */, const char* /* reason */) {
259     Unref();
260   }
261 
262   Server* const server_;
263   grpc_tcp_server* tcp_server_;
264   grpc_resolved_address resolved_address_;
265   Chttp2ServerArgsModifier const args_modifier_;
266   ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
267   ChannelArgs args_;
268   Mutex mu_;
269   RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
270       connection_manager_ ABSL_GUARDED_BY(mu_);
271   // Signals whether grpc_tcp_server_start() has been called.
272   bool started_ ABSL_GUARDED_BY(mu_) = false;
273   // Signals whether grpc_tcp_server_start() has completed.
274   CondVar started_cv_ ABSL_GUARDED_BY(mu_);
275   // Signals whether new requests/connections are to be accepted.
276   bool is_serving_ ABSL_GUARDED_BY(mu_) = false;
277   // Signals whether the application has triggered shutdown.
278   bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
279   std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_
280       ABSL_GUARDED_BY(mu_);
281   grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_);
282   grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr;
283   RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
284   MemoryQuotaRefPtr memory_quota_;
285 };
286 
287 //
288 // Chttp2ServerListener::ConfigFetcherWatcher
289 //
290 
UpdateConnectionManager(RefCountedPtr<grpc_server_config_fetcher::ConnectionManager> connection_manager)291 void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
292     RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
293         connection_manager) {
294   RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
295       connection_manager_to_destroy;
296   class GracefulShutdownExistingConnections {
297    public:
298     ~GracefulShutdownExistingConnections() {
299       // Send GOAWAYs on the transports so that they get disconnected when
300       // existing RPCs finish, and so that no new RPC is started on them.
301       for (auto& connection : connections_) {
302         connection.first->SendGoAway();
303       }
304     }
305 
306     void set_connections(
307         std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>>
308             connections) {
309       GPR_ASSERT(connections_.empty());
310       connections_ = std::move(connections);
311     }
312 
313    private:
314     std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_;
315   } connections_to_shutdown;
316   {
317     MutexLock lock(&listener_->mu_);
318     connection_manager_to_destroy = listener_->connection_manager_;
319     listener_->connection_manager_ = std::move(connection_manager);
320     connections_to_shutdown.set_connections(std::move(listener_->connections_));
321     if (listener_->shutdown_) {
322       return;
323     }
324     listener_->is_serving_ = true;
325     if (listener_->started_) return;
326   }
327   int port_temp;
328   grpc_error_handle error = grpc_tcp_server_add_port(
329       listener_->tcp_server_, &listener_->resolved_address_, &port_temp);
330   if (!error.ok()) {
331     gpr_log(GPR_ERROR, "Error adding port to server: %s",
332             StatusToString(error).c_str());
333     // TODO(yashykt): We wouldn't need to assert here if we bound to the
334     // port earlier during AddPort.
335     GPR_ASSERT(0);
336   }
337   listener_->StartListening();
338   {
339     MutexLock lock(&listener_->mu_);
340     listener_->started_ = true;
341     listener_->started_cv_.SignalAll();
342   }
343 }
344 
StopServing()345 void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
346   std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
347   {
348     MutexLock lock(&listener_->mu_);
349     listener_->is_serving_ = false;
350     connections = std::move(listener_->connections_);
351   }
352   // Send GOAWAYs on the transports so that they disconnected when existing
353   // RPCs finish.
354   for (auto& connection : connections) {
355     connection.first->SendGoAway();
356   }
357 }
358 
359 //
360 // Chttp2ServerListener::ActiveConnection::HandshakingState
361 //
362 
GetConnectionDeadline(const ChannelArgs & args)363 Timestamp GetConnectionDeadline(const ChannelArgs& args) {
364   return Timestamp::Now() +
365          std::max(
366              Duration::Milliseconds(1),
367              args.GetDurationFromIntMillis(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS)
368                  .value_or(Duration::Seconds(120)));
369 }
370 
HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor,const ChannelArgs & args)371 Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
372     RefCountedPtr<ActiveConnection> connection_ref,
373     grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
374     const ChannelArgs& args)
375     : connection_(std::move(connection_ref)),
376       accepting_pollset_(accepting_pollset),
377       acceptor_(acceptor),
378       handshake_mgr_(MakeRefCounted<HandshakeManager>()),
379       deadline_(GetConnectionDeadline(args)),
380       interested_parties_(grpc_pollset_set_create()) {
381   grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
382   CoreConfiguration::Get().handshaker_registry().AddHandshakers(
383       HANDSHAKER_SERVER, args, interested_parties_, handshake_mgr_.get());
384 }
385 
~HandshakingState()386 Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
387   grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
388   grpc_pollset_set_destroy(interested_parties_);
389   gpr_free(acceptor_);
390 }
391 
Orphan()392 void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
393   {
394     MutexLock lock(&connection_->mu_);
395     if (handshake_mgr_ != nullptr) {
396       handshake_mgr_->Shutdown(GRPC_ERROR_CREATE("Listener stopped serving."));
397     }
398   }
399   Unref();
400 }
401 
Start(grpc_endpoint * endpoint,const ChannelArgs & channel_args)402 void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
403     grpc_endpoint* endpoint, const ChannelArgs& channel_args) {
404   Ref().release();  // Held by OnHandshakeDone
405   RefCountedPtr<HandshakeManager> handshake_mgr;
406   {
407     MutexLock lock(&connection_->mu_);
408     if (handshake_mgr_ == nullptr) return;
409     handshake_mgr = handshake_mgr_;
410   }
411   handshake_mgr->DoHandshake(endpoint, channel_args, deadline_, acceptor_,
412                              OnHandshakeDone, this);
413 }
414 
OnTimeout()415 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout() {
416   grpc_chttp2_transport* transport = nullptr;
417   {
418     MutexLock lock(&connection_->mu_);
419     if (timer_handle_.has_value()) {
420       transport = connection_->transport_;
421       timer_handle_.reset();
422     }
423   }
424   if (transport != nullptr) {
425     grpc_transport_op* op = grpc_make_transport_op(nullptr);
426     op->disconnect_with_error = GRPC_ERROR_CREATE(
427         "Did not receive HTTP/2 settings before handshake timeout");
428     grpc_transport_perform_op(&transport->base, op);
429   }
430 }
431 
432 void Chttp2ServerListener::ActiveConnection::HandshakingState::
OnReceiveSettings(void * arg,grpc_error_handle)433     OnReceiveSettings(void* arg, grpc_error_handle /* error */) {
434   HandshakingState* self = static_cast<HandshakingState*>(arg);
435   {
436     MutexLock lock(&self->connection_->mu_);
437     if (self->timer_handle_.has_value()) {
438       self->connection_->event_engine_->Cancel(*self->timer_handle_);
439       self->timer_handle_.reset();
440     }
441   }
442   self->Unref();
443 }
444 
OnHandshakeDone(void * arg,grpc_error_handle error)445 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
446     void* arg, grpc_error_handle error) {
447   auto* args = static_cast<HandshakerArgs*>(arg);
448   HandshakingState* self = static_cast<HandshakingState*>(args->user_data);
449   OrphanablePtr<HandshakingState> handshaking_state_ref;
450   RefCountedPtr<HandshakeManager> handshake_mgr;
451   bool cleanup_connection = false;
452   {
453     MutexLock connection_lock(&self->connection_->mu_);
454     if (!error.ok() || self->connection_->shutdown_) {
455       std::string error_str = StatusToString(error);
456       gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str.c_str());
457       cleanup_connection = true;
458       if (error.ok() && args->endpoint != nullptr) {
459         // We were shut down or stopped serving after handshaking completed
460         // successfully, so destroy the endpoint here.
461         // TODO(ctiller): It is currently necessary to shutdown endpoints
462         // before destroying them, even if we know that there are no
463         // pending read/write callbacks.  This should be fixed, at which
464         // point this can be removed.
465         grpc_endpoint_shutdown(args->endpoint, absl::OkStatus());
466         grpc_endpoint_destroy(args->endpoint);
467         grpc_slice_buffer_destroy(args->read_buffer);
468         gpr_free(args->read_buffer);
469       }
470     } else {
471       // If the handshaking succeeded but there is no endpoint, then the
472       // handshaker may have handed off the connection to some external
473       // code, so we can just clean up here without creating a transport.
474       if (args->endpoint != nullptr) {
475         grpc_transport* transport =
476             grpc_create_chttp2_transport(args->args, args->endpoint, false);
477         grpc_error_handle channel_init_err =
478             self->connection_->listener_->server_->SetupTransport(
479                 transport, self->accepting_pollset_, args->args,
480                 grpc_chttp2_transport_get_socket_node(transport));
481         if (channel_init_err.ok()) {
482           // Use notify_on_receive_settings callback to enforce the
483           // handshake deadline.
484           // Note: The reinterpret_cast<>s here are safe, because
485           // grpc_chttp2_transport is a C-style extension of
486           // grpc_transport, so this is morally equivalent of a
487           // static_cast<> to a derived class.
488           // TODO(roth): Change to static_cast<> when we C++-ify the
489           // transport API.
490           self->connection_->transport_ =
491               reinterpret_cast<grpc_chttp2_transport*>(transport);
492           GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_,
493                                     "ActiveConnection");  // Held by connection_
494           self->Ref().release();  // Held by OnReceiveSettings().
495           GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
496                             self, grpc_schedule_on_exec_ctx);
497           // If the listener has been configured with a config fetcher, we
498           // need to watch on the transport being closed so that we can an
499           // updated list of active connections.
500           grpc_closure* on_close = nullptr;
501           if (self->connection_->listener_->config_fetcher_watcher_ !=
502               nullptr) {
503             // Refs helds by OnClose()
504             self->connection_->Ref().release();
505             on_close = &self->connection_->on_close_;
506           } else {
507             // Remove the connection from the connections_ map since OnClose()
508             // will not be invoked when a config fetcher is set.
509             cleanup_connection = true;
510           }
511           grpc_chttp2_transport_start_reading(transport, args->read_buffer,
512                                               &self->on_receive_settings_,
513                                               on_close);
514           self->timer_handle_ = self->connection_->event_engine_->RunAfter(
515               self->deadline_ - Timestamp::Now(),
516               [self = self->Ref()]() mutable {
517                 ApplicationCallbackExecCtx callback_exec_ctx;
518                 ExecCtx exec_ctx;
519                 self->OnTimeout();
520                 // HandshakingState deletion might require an active ExecCtx.
521                 self.reset();
522               });
523         } else {
524           // Failed to create channel from transport. Clean up.
525           gpr_log(GPR_ERROR, "Failed to create channel: %s",
526                   StatusToString(channel_init_err).c_str());
527           grpc_transport_destroy(transport);
528           grpc_slice_buffer_destroy(args->read_buffer);
529           gpr_free(args->read_buffer);
530           cleanup_connection = true;
531         }
532       } else {
533         cleanup_connection = true;
534       }
535     }
536     // Since the handshake manager is done, the connection no longer needs to
537     // shutdown the handshake when the listener needs to stop serving.
538     // Avoid calling the destructor of HandshakeManager and HandshakingState
539     // from within the critical region.
540     handshake_mgr = std::move(self->handshake_mgr_);
541     handshaking_state_ref = std::move(self->connection_->handshaking_state_);
542   }
543   gpr_free(self->acceptor_);
544   self->acceptor_ = nullptr;
545   OrphanablePtr<ActiveConnection> connection;
546   if (cleanup_connection) {
547     MutexLock listener_lock(&self->connection_->listener_->mu_);
548     auto it = self->connection_->listener_->connections_.find(
549         self->connection_.get());
550     if (it != self->connection_->listener_->connections_.end()) {
551       connection = std::move(it->second);
552       self->connection_->listener_->connections_.erase(it);
553     }
554   }
555   self->Unref();
556 }
557 
558 //
559 // Chttp2ServerListener::ActiveConnection
560 //
561 
ActiveConnection(grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor,EventEngine * event_engine,const ChannelArgs & args,MemoryOwner memory_owner)562 Chttp2ServerListener::ActiveConnection::ActiveConnection(
563     grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
564     EventEngine* event_engine, const ChannelArgs& args,
565     MemoryOwner memory_owner)
566     : handshaking_state_(memory_owner.MakeOrphanable<HandshakingState>(
567           Ref(), accepting_pollset, acceptor, args)),
568       event_engine_(event_engine) {
569   GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this,
570                     grpc_schedule_on_exec_ctx);
571 }
572 
~ActiveConnection()573 Chttp2ServerListener::ActiveConnection::~ActiveConnection() {
574   if (transport_ != nullptr) {
575     GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection");
576   }
577 }
578 
Orphan()579 void Chttp2ServerListener::ActiveConnection::Orphan() {
580   OrphanablePtr<HandshakingState> handshaking_state;
581   {
582     MutexLock lock(&mu_);
583     shutdown_ = true;
584     // Reset handshaking_state_ since we have been orphaned by the listener
585     // signaling that the listener has stopped serving.
586     handshaking_state = std::move(handshaking_state_);
587   }
588   Unref();
589 }
590 
SendGoAway()591 void Chttp2ServerListener::ActiveConnection::SendGoAway() {
592   grpc_chttp2_transport* transport = nullptr;
593   {
594     MutexLock lock(&mu_);
595     if (transport_ != nullptr && !shutdown_) {
596       transport = transport_;
597       drain_grace_timer_handle_ = event_engine_->RunAfter(
598           std::max(Duration::Zero(),
599                    listener_->args_
600                        .GetDurationFromIntMillis(
601                            GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS)
602                        .value_or(Duration::Minutes(10))),
603           [self = Ref(DEBUG_LOCATION, "drain_grace_timer")]() mutable {
604             ApplicationCallbackExecCtx callback_exec_ctx;
605             ExecCtx exec_ctx;
606             self->OnDrainGraceTimeExpiry();
607             self.reset(DEBUG_LOCATION, "drain_grace_timer");
608           });
609       shutdown_ = true;
610     }
611   }
612   if (transport != nullptr) {
613     grpc_transport_op* op = grpc_make_transport_op(nullptr);
614     op->goaway_error =
615         GRPC_ERROR_CREATE("Server is stopping to serve requests.");
616     grpc_transport_perform_op(&transport->base, op);
617   }
618 }
619 
Start(RefCountedPtr<Chttp2ServerListener> listener,grpc_endpoint * endpoint,const ChannelArgs & args)620 void Chttp2ServerListener::ActiveConnection::Start(
621     RefCountedPtr<Chttp2ServerListener> listener, grpc_endpoint* endpoint,
622     const ChannelArgs& args) {
623   RefCountedPtr<HandshakingState> handshaking_state_ref;
624   listener_ = std::move(listener);
625   {
626     MutexLock lock(&mu_);
627     if (shutdown_) return;
628     // Hold a ref to HandshakingState to allow starting the handshake outside
629     // the critical region.
630     handshaking_state_ref = handshaking_state_->Ref();
631   }
632   handshaking_state_ref->Start(endpoint, args);
633 }
634 
OnClose(void * arg,grpc_error_handle)635 void Chttp2ServerListener::ActiveConnection::OnClose(
636     void* arg, grpc_error_handle /* error */) {
637   ActiveConnection* self = static_cast<ActiveConnection*>(arg);
638   OrphanablePtr<ActiveConnection> connection;
639   {
640     MutexLock listener_lock(&self->listener_->mu_);
641     MutexLock connection_lock(&self->mu_);
642     // The node was already deleted from the connections_ list if the
643     // connection is shutdown.
644     if (!self->shutdown_) {
645       auto it = self->listener_->connections_.find(self);
646       if (it != self->listener_->connections_.end()) {
647         connection = std::move(it->second);
648         self->listener_->connections_.erase(it);
649       }
650       self->shutdown_ = true;
651     }
652     // Cancel the drain_grace_timer_ if needed.
653     if (self->drain_grace_timer_handle_.has_value()) {
654       self->event_engine_->Cancel(*self->drain_grace_timer_handle_);
655       self->drain_grace_timer_handle_.reset();
656     }
657   }
658   self->Unref();
659 }
660 
OnDrainGraceTimeExpiry()661 void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry() {
662   grpc_chttp2_transport* transport = nullptr;
663   // If the drain_grace_timer_ was not cancelled, disconnect the transport
664   // immediately.
665   {
666     MutexLock lock(&mu_);
667     if (drain_grace_timer_handle_.has_value()) {
668       transport = transport_;
669       drain_grace_timer_handle_.reset();
670     }
671   }
672   if (transport != nullptr) {
673     grpc_transport_op* op = grpc_make_transport_op(nullptr);
674     op->disconnect_with_error = GRPC_ERROR_CREATE(
675         "Drain grace time expired. Closing connection immediately.");
676     grpc_transport_perform_op(&transport->base, op);
677   }
678 }
679 
680 //
681 // Chttp2ServerListener
682 //
683 
Create(Server * server,grpc_resolved_address * addr,const ChannelArgs & args,Chttp2ServerArgsModifier args_modifier,int * port_num)684 grpc_error_handle Chttp2ServerListener::Create(
685     Server* server, grpc_resolved_address* addr, const ChannelArgs& args,
686     Chttp2ServerArgsModifier args_modifier, int* port_num) {
687   Chttp2ServerListener* listener = nullptr;
688   // The bulk of this method is inside of a lambda to make cleanup
689   // easier without using goto.
690   grpc_error_handle error = [&]() {
691     grpc_error_handle error;
692     // Create Chttp2ServerListener.
693     listener = new Chttp2ServerListener(server, args, args_modifier);
694     error = grpc_tcp_server_create(
695         &listener->tcp_server_shutdown_complete_,
696         grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
697         OnAccept, listener, &listener->tcp_server_);
698     if (!error.ok()) return error;
699     if (server->config_fetcher() != nullptr) {
700       listener->resolved_address_ = *addr;
701       // TODO(yashykt): Consider binding so as to be able to return the port
702       // number.
703     } else {
704       error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
705       if (!error.ok()) return error;
706     }
707     // Create channelz node.
708     if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
709             .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
710       auto string_address = grpc_sockaddr_to_uri(addr);
711       if (!string_address.ok()) {
712         return GRPC_ERROR_CREATE(string_address.status().ToString());
713       }
714       listener->channelz_listen_socket_ =
715           MakeRefCounted<channelz::ListenSocketNode>(
716               *string_address,
717               absl::StrCat("chttp2 listener ", *string_address));
718     }
719     // Register with the server only upon success
720     server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
721     return absl::OkStatus();
722   }();
723   if (!error.ok()) {
724     if (listener != nullptr) {
725       if (listener->tcp_server_ != nullptr) {
726         // listener is deleted when tcp_server_ is shutdown.
727         grpc_tcp_server_unref(listener->tcp_server_);
728       } else {
729         delete listener;
730       }
731     }
732   }
733   return error;
734 }
735 
CreateWithAcceptor(Server * server,const char * name,const ChannelArgs & args,Chttp2ServerArgsModifier args_modifier)736 grpc_error_handle Chttp2ServerListener::CreateWithAcceptor(
737     Server* server, const char* name, const ChannelArgs& args,
738     Chttp2ServerArgsModifier args_modifier) {
739   Chttp2ServerListener* listener =
740       new Chttp2ServerListener(server, args, args_modifier);
741   grpc_error_handle error = grpc_tcp_server_create(
742       &listener->tcp_server_shutdown_complete_,
743       grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
744       OnAccept, listener, &listener->tcp_server_);
745   if (!error.ok()) {
746     delete listener;
747     return error;
748   }
749   // TODO(yangg) channelz
750   TcpServerFdHandler** arg_val = args.GetPointer<TcpServerFdHandler*>(name);
751   *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
752   server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
753   return absl::OkStatus();
754 }
755 
Chttp2ServerListener(Server * server,const ChannelArgs & args,Chttp2ServerArgsModifier args_modifier)756 Chttp2ServerListener::Chttp2ServerListener(
757     Server* server, const ChannelArgs& args,
758     Chttp2ServerArgsModifier args_modifier)
759     : server_(server),
760       args_modifier_(args_modifier),
761       args_(args),
762       memory_quota_(args.GetObject<ResourceQuota>()->memory_quota()) {
763   GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
764                     this, grpc_schedule_on_exec_ctx);
765 }
766 
~Chttp2ServerListener()767 Chttp2ServerListener::~Chttp2ServerListener() {
768   // Flush queued work before destroying handshaker factory, since that
769   // may do a synchronous unref.
770   ExecCtx::Get()->Flush();
771   if (on_destroy_done_ != nullptr) {
772     ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, absl::OkStatus());
773     ExecCtx::Get()->Flush();
774   }
775 }
776 
777 // Server callback: start listening on our ports
Start(Server *,const std::vector<grpc_pollset * > *)778 void Chttp2ServerListener::Start(
779     Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
780   if (server_->config_fetcher() != nullptr) {
781     auto watcher = std::make_unique<ConfigFetcherWatcher>(Ref());
782     config_fetcher_watcher_ = watcher.get();
783     server_->config_fetcher()->StartWatch(
784         grpc_sockaddr_to_string(&resolved_address_, false).value(),
785         std::move(watcher));
786   } else {
787     {
788       MutexLock lock(&mu_);
789       started_ = true;
790       is_serving_ = true;
791     }
792     StartListening();
793   }
794 }
795 
StartListening()796 void Chttp2ServerListener::StartListening() {
797   grpc_tcp_server_start(tcp_server_, &server_->pollsets());
798 }
799 
SetOnDestroyDone(grpc_closure * on_destroy_done)800 void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
801   MutexLock lock(&mu_);
802   on_destroy_done_ = on_destroy_done;
803 }
804 
OnAccept(void * arg,grpc_endpoint * tcp,grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor)805 void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
806                                     grpc_pollset* accepting_pollset,
807                                     grpc_tcp_server_acceptor* acceptor) {
808   Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
809   ChannelArgs args = self->args_;
810   RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
811       connection_manager;
812   {
813     MutexLock lock(&self->mu_);
814     connection_manager = self->connection_manager_;
815   }
816   auto endpoint_cleanup = [&](grpc_error_handle error) {
817     grpc_endpoint_shutdown(tcp, error);
818     grpc_endpoint_destroy(tcp);
819     gpr_free(acceptor);
820   };
821   if (self->server_->config_fetcher() != nullptr) {
822     if (connection_manager == nullptr) {
823       grpc_error_handle error = GRPC_ERROR_CREATE(
824           "No ConnectionManager configured. Closing connection.");
825       endpoint_cleanup(error);
826       return;
827     }
828     absl::StatusOr<ChannelArgs> args_result =
829         connection_manager->UpdateChannelArgsForConnection(args, tcp);
830     if (!args_result.ok()) {
831       gpr_log(GPR_DEBUG, "Closing connection: %s",
832               args_result.status().ToString().c_str());
833       endpoint_cleanup(GRPC_ERROR_CREATE(args_result.status().ToString()));
834       return;
835     }
836     grpc_error_handle error;
837     args = self->args_modifier_(*args_result, &error);
838     if (!error.ok()) {
839       gpr_log(GPR_DEBUG, "Closing connection: %s",
840               StatusToString(error).c_str());
841       endpoint_cleanup(error);
842       return;
843     }
844   }
845   auto memory_owner = self->memory_quota_->CreateMemoryOwner(
846       absl::StrCat(grpc_endpoint_get_peer(tcp), ":server_channel"));
847   EventEngine* const event_engine = self->args_.GetObject<EventEngine>();
848   auto connection = memory_owner.MakeOrphanable<ActiveConnection>(
849       accepting_pollset, acceptor, event_engine, args, std::move(memory_owner));
850   // We no longer own acceptor
851   acceptor = nullptr;
852   // Hold a ref to connection to allow starting handshake outside the
853   // critical region
854   RefCountedPtr<ActiveConnection> connection_ref = connection->Ref();
855   RefCountedPtr<Chttp2ServerListener> listener_ref;
856   {
857     MutexLock lock(&self->mu_);
858     // Shutdown the the connection if listener's stopped serving or if the
859     // connection manager has changed.
860     if (!self->shutdown_ && self->is_serving_ &&
861         connection_manager == self->connection_manager_) {
862       // This ref needs to be taken in the critical region after having made
863       // sure that the listener has not been Orphaned, so as to avoid
864       // heap-use-after-free issues where `Ref()` is invoked when the ref of
865       // tcp_server_ has already reached 0. (Ref() implementation of
866       // Chttp2ServerListener is grpc_tcp_server_ref().)
867       listener_ref = self->Ref();
868       self->connections_.emplace(connection.get(), std::move(connection));
869     }
870   }
871   if (connection != nullptr) {
872     endpoint_cleanup(absl::OkStatus());
873   } else {
874     connection_ref->Start(std::move(listener_ref), tcp, args);
875   }
876 }
877 
TcpServerShutdownComplete(void * arg,grpc_error_handle)878 void Chttp2ServerListener::TcpServerShutdownComplete(
879     void* arg, grpc_error_handle /*error*/) {
880   Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
881   self->channelz_listen_socket_.reset();
882   delete self;
883 }
884 
885 // Server callback: destroy the tcp listener (so we don't generate further
886 // callbacks)
Orphan()887 void Chttp2ServerListener::Orphan() {
888   // Cancel the watch before shutting down so as to avoid holding a ref to the
889   // listener in the watcher.
890   if (config_fetcher_watcher_ != nullptr) {
891     server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
892   }
893   std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
894   grpc_tcp_server* tcp_server;
895   {
896     MutexLock lock(&mu_);
897     shutdown_ = true;
898     is_serving_ = false;
899     // Orphan the connections so that they can start cleaning up.
900     connections = std::move(connections_);
901     // If the listener is currently set to be serving but has not been started
902     // yet, it means that `grpc_tcp_server_start` is in progress. Wait for the
903     // operation to finish to avoid causing races.
904     while (is_serving_ && !started_) {
905       started_cv_.Wait(&mu_);
906     }
907     tcp_server = tcp_server_;
908   }
909   grpc_tcp_server_shutdown_listeners(tcp_server);
910   grpc_tcp_server_unref(tcp_server);
911 }
912 
913 }  // namespace
914 
915 //
916 // Chttp2ServerAddPort()
917 //
918 
Chttp2ServerAddPort(Server * server,const char * addr,const ChannelArgs & args,Chttp2ServerArgsModifier args_modifier,int * port_num)919 grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
920                                       const ChannelArgs& args,
921                                       Chttp2ServerArgsModifier args_modifier,
922                                       int* port_num) {
923   if (addr == nullptr) {
924     return GRPC_ERROR_CREATE("Invalid address: addr cannot be a nullptr.");
925   }
926   if (strncmp(addr, "external:", 9) == 0) {
927     return Chttp2ServerListener::CreateWithAcceptor(server, addr, args,
928                                                     args_modifier);
929   }
930   *port_num = -1;
931   absl::StatusOr<std::vector<grpc_resolved_address>> resolved_or;
932   std::vector<grpc_error_handle> error_list;
933   std::string parsed_addr = URI::PercentDecode(addr);
934   absl::string_view parsed_addr_unprefixed{parsed_addr};
935   // Using lambda to avoid use of goto.
936   grpc_error_handle error = [&]() {
937     grpc_error_handle error;
938     if (absl::ConsumePrefix(&parsed_addr_unprefixed, kUnixUriPrefix)) {
939       resolved_or = grpc_resolve_unix_domain_address(parsed_addr_unprefixed);
940     } else if (absl::ConsumePrefix(&parsed_addr_unprefixed,
941                                    kUnixAbstractUriPrefix)) {
942       resolved_or =
943           grpc_resolve_unix_abstract_domain_address(parsed_addr_unprefixed);
944     } else {
945       resolved_or =
946           GetDNSResolver()->LookupHostnameBlocking(parsed_addr, "https");
947     }
948     if (!resolved_or.ok()) {
949       return absl_status_to_grpc_error(resolved_or.status());
950     }
951     // Create a listener for each resolved address.
952     for (auto& addr : *resolved_or) {
953       // If address has a wildcard port (0), use the same port as a previous
954       // listener.
955       if (*port_num != -1 && grpc_sockaddr_get_port(&addr) == 0) {
956         grpc_sockaddr_set_port(&addr, *port_num);
957       }
958       int port_temp = -1;
959       error = Chttp2ServerListener::Create(server, &addr, args, args_modifier,
960                                            &port_temp);
961       if (!error.ok()) {
962         error_list.push_back(error);
963       } else {
964         if (*port_num == -1) {
965           *port_num = port_temp;
966         } else {
967           GPR_ASSERT(*port_num == port_temp);
968         }
969       }
970     }
971     if (error_list.size() == resolved_or->size()) {
972       std::string msg = absl::StrFormat(
973           "No address added out of total %" PRIuPTR " resolved for '%s'",
974           resolved_or->size(), addr);
975       return GRPC_ERROR_CREATE_REFERENCING(msg.c_str(), error_list.data(),
976                                            error_list.size());
977     } else if (!error_list.empty()) {
978       std::string msg = absl::StrFormat(
979           "Only %" PRIuPTR " addresses added out of total %" PRIuPTR
980           " resolved",
981           resolved_or->size() - error_list.size(), resolved_or->size());
982       error = GRPC_ERROR_CREATE_REFERENCING(msg.c_str(), error_list.data(),
983                                             error_list.size());
984       gpr_log(GPR_INFO, "WARNING: %s", StatusToString(error).c_str());
985       // we managed to bind some addresses: continue without error
986     }
987     return absl::OkStatus();
988   }();  // lambda end
989   if (!error.ok()) *port_num = 0;
990   return error;
991 }
992 
993 namespace {
994 
ModifyArgsForConnection(const ChannelArgs & args,grpc_error_handle * error)995 ChannelArgs ModifyArgsForConnection(const ChannelArgs& args,
996                                     grpc_error_handle* error) {
997   auto* server_credentials = args.GetObject<grpc_server_credentials>();
998   if (server_credentials == nullptr) {
999     *error = GRPC_ERROR_CREATE("Could not find server credentials");
1000     return args;
1001   }
1002   auto security_connector = server_credentials->create_security_connector(args);
1003   if (security_connector == nullptr) {
1004     *error = GRPC_ERROR_CREATE(
1005         absl::StrCat("Unable to create secure server with credentials of type ",
1006                      server_credentials->type().name()));
1007     return args;
1008   }
1009   return args.SetObject(security_connector);
1010 }
1011 
1012 }  // namespace
1013 }  // namespace grpc_core
1014 
grpc_server_add_http2_port(grpc_server * server,const char * addr,grpc_server_credentials * creds)1015 int grpc_server_add_http2_port(grpc_server* server, const char* addr,
1016                                grpc_server_credentials* creds) {
1017   grpc_core::ExecCtx exec_ctx;
1018   grpc_error_handle err;
1019   grpc_core::RefCountedPtr<grpc_server_security_connector> sc;
1020   int port_num = 0;
1021   grpc_core::Server* core_server = grpc_core::Server::FromC(server);
1022   grpc_core::ChannelArgs args = core_server->channel_args();
1023   GRPC_API_TRACE("grpc_server_add_http2_port(server=%p, addr=%s, creds=%p)", 3,
1024                  (server, addr, creds));
1025   // Create security context.
1026   if (creds == nullptr) {
1027     err = GRPC_ERROR_CREATE(
1028         "No credentials specified for secure server port (creds==NULL)");
1029     goto done;
1030   }
1031   // TODO(yashykt): Ideally, we would not want to have different behavior here
1032   // based on whether a config fetcher is configured or not. Currently, we have
1033   // a feature for SSL credentials reloading with an application callback that
1034   // assumes that there is a single security connector. If we delay the creation
1035   // of the security connector to after the creation of the listener(s), we
1036   // would have potentially multiple security connectors which breaks the
1037   // assumption for SSL creds reloading. When the API for SSL creds reloading is
1038   // rewritten, we would be able to make this workaround go away by removing
1039   // that assumption. As an immediate drawback of this workaround, config
1040   // fetchers need to be registered before adding ports to the server.
1041   if (core_server->config_fetcher() != nullptr) {
1042     // Create channel args.
1043     args = args.SetObject(creds->Ref());
1044   } else {
1045     sc = creds->create_security_connector(grpc_core::ChannelArgs());
1046     if (sc == nullptr) {
1047       err = GRPC_ERROR_CREATE(absl::StrCat(
1048           "Unable to create secure server with credentials of type ",
1049           creds->type().name()));
1050       goto done;
1051     }
1052     args = args.SetObject(creds->Ref()).SetObject(sc);
1053   }
1054   // Add server port.
1055   err = grpc_core::Chttp2ServerAddPort(
1056       core_server, addr, args, grpc_core::ModifyArgsForConnection, &port_num);
1057 done:
1058   sc.reset(DEBUG_LOCATION, "server");
1059   if (!err.ok()) {
1060     gpr_log(GPR_ERROR, "%s", grpc_core::StatusToString(err).c_str());
1061   }
1062   return port_num;
1063 }
1064 
1065 #ifdef GPR_SUPPORT_CHANNELS_FROM_FD
grpc_server_add_channel_from_fd(grpc_server * server,int fd,grpc_server_credentials * creds)1066 void grpc_server_add_channel_from_fd(grpc_server* server, int fd,
1067                                      grpc_server_credentials* creds) {
1068   // For now, we only support insecure server credentials
1069   if (creds == nullptr ||
1070       creds->type() != grpc_core::InsecureServerCredentials::Type()) {
1071     gpr_log(GPR_ERROR, "Failed to create channel due to invalid creds");
1072     return;
1073   }
1074   grpc_core::ExecCtx exec_ctx;
1075   grpc_core::Server* core_server = grpc_core::Server::FromC(server);
1076 
1077   grpc_core::ChannelArgs server_args = core_server->channel_args();
1078   std::string name = absl::StrCat("fd:", fd);
1079   auto memory_quota =
1080       server_args.GetObject<grpc_core::ResourceQuota>()->memory_quota();
1081   grpc_endpoint* server_endpoint = grpc_tcp_create_from_fd(
1082       grpc_fd_create(fd, name.c_str(), true),
1083       grpc_event_engine::experimental::ChannelArgsEndpointConfig(server_args),
1084       name);
1085   grpc_transport* transport = grpc_create_chttp2_transport(
1086       server_args, server_endpoint, false  // is_client
1087   );
1088   grpc_error_handle error =
1089       core_server->SetupTransport(transport, nullptr, server_args, nullptr);
1090   if (error.ok()) {
1091     for (grpc_pollset* pollset : core_server->pollsets()) {
1092       grpc_endpoint_add_to_pollset(server_endpoint, pollset);
1093     }
1094     grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
1095   } else {
1096     gpr_log(GPR_ERROR, "Failed to create channel: %s",
1097             grpc_core::StatusToString(error).c_str());
1098     grpc_transport_destroy(transport);
1099   }
1100 }
1101 
1102 #else  // !GPR_SUPPORT_CHANNELS_FROM_FD
1103 
grpc_server_add_channel_from_fd(grpc_server *,int,grpc_server_credentials *)1104 void grpc_server_add_channel_from_fd(grpc_server* /* server */, int /* fd */,
1105                                      grpc_server_credentials* /* creds */) {
1106   GPR_ASSERT(0);
1107 }
1108 
1109 #endif  // GPR_SUPPORT_CHANNELS_FROM_FD
1110