1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/server/chttp2_server.h"
22
23 #include <inttypes.h>
24 #include <string.h>
25
26 #include <algorithm>
27 #include <initializer_list>
28 #include <map>
29 #include <memory>
30 #include <string>
31 #include <utility>
32 #include <vector>
33
34 #include "absl/base/thread_annotations.h"
35 #include "absl/status/status.h"
36 #include "absl/status/statusor.h"
37 #include "absl/strings/str_cat.h"
38 #include "absl/strings/str_format.h"
39 #include "absl/strings/string_view.h"
40 #include "absl/strings/strip.h"
41 #include "absl/types/optional.h"
42
43 #include <grpc/event_engine/event_engine.h>
44 #include <grpc/grpc.h>
45 #include <grpc/grpc_posix.h>
46 #include <grpc/slice_buffer.h>
47 #include <grpc/support/alloc.h>
48 #include <grpc/support/log.h>
49
50 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
51 #include "src/core/ext/transport/chttp2/transport/frame.h"
52 #include "src/core/ext/transport/chttp2/transport/internal.h"
53 #include "src/core/lib/address_utils/sockaddr_utils.h"
54 #include "src/core/lib/channel/channel_args.h"
55 #include "src/core/lib/channel/channelz.h"
56 #include "src/core/lib/config/core_configuration.h"
57 #include "src/core/lib/debug/trace.h"
58 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
59 #include "src/core/lib/gprpp/debug_location.h"
60 #include "src/core/lib/gprpp/orphanable.h"
61 #include "src/core/lib/gprpp/ref_counted_ptr.h"
62 #include "src/core/lib/gprpp/status_helper.h"
63 #include "src/core/lib/gprpp/sync.h"
64 #include "src/core/lib/gprpp/time.h"
65 #include "src/core/lib/gprpp/unique_type_name.h"
66 #include "src/core/lib/iomgr/closure.h"
67 #include "src/core/lib/iomgr/endpoint.h"
68 #include "src/core/lib/iomgr/iomgr_fwd.h"
69 #include "src/core/lib/iomgr/pollset_set.h"
70 #include "src/core/lib/iomgr/resolve_address.h"
71 #include "src/core/lib/iomgr/resolved_address.h"
72 #include "src/core/lib/iomgr/tcp_server.h"
73 #include "src/core/lib/iomgr/unix_sockets_posix.h"
74 #include "src/core/lib/resource_quota/memory_quota.h"
75 #include "src/core/lib/resource_quota/resource_quota.h"
76 #include "src/core/lib/security/credentials/credentials.h"
77 #include "src/core/lib/security/credentials/insecure/insecure_credentials.h"
78 #include "src/core/lib/security/security_connector/security_connector.h"
79 #include "src/core/lib/surface/api_trace.h"
80 #include "src/core/lib/surface/server.h"
81 #include "src/core/lib/transport/error_utils.h"
82 #include "src/core/lib/transport/handshaker.h"
83 #include "src/core/lib/transport/handshaker_registry.h"
84 #include "src/core/lib/transport/transport.h"
85 #include "src/core/lib/transport/transport_fwd.h"
86 #include "src/core/lib/uri/uri_parser.h"
87
88 #ifdef GPR_SUPPORT_CHANNELS_FROM_FD
89 #include "src/core/lib/iomgr/ev_posix.h"
90 #include "src/core/lib/iomgr/exec_ctx.h"
91 #include "src/core/lib/iomgr/tcp_client_posix.h"
92 #endif // GPR_SUPPORT_CHANNELS_FROM_FD
93
94 namespace grpc_core {
95 namespace {
96
97 using ::grpc_event_engine::experimental::EventEngine;
98
// Literal prefix of the "unix:" address form.
constexpr char kUnixUriPrefix[] = "unix:";
// Literal prefix of the "unix-abstract:" address form.
constexpr char kUnixAbstractUriPrefix[] = "unix-abstract:";
101
102 class Chttp2ServerListener : public Server::ListenerInterface {
103 public:
104 static grpc_error_handle Create(Server* server, grpc_resolved_address* addr,
105 const ChannelArgs& args,
106 Chttp2ServerArgsModifier args_modifier,
107 int* port_num);
108
109 static grpc_error_handle CreateWithAcceptor(
110 Server* server, const char* name, const ChannelArgs& args,
111 Chttp2ServerArgsModifier args_modifier);
112
113 // Do not instantiate directly. Use one of the factory methods above.
114 Chttp2ServerListener(Server* server, const ChannelArgs& args,
115 Chttp2ServerArgsModifier args_modifier);
116 ~Chttp2ServerListener() override;
117
118 void Start(Server* server,
119 const std::vector<grpc_pollset*>* pollsets) override;
120
channelz_listen_socket_node() const121 channelz::ListenSocketNode* channelz_listen_socket_node() const override {
122 return channelz_listen_socket_.get();
123 }
124
125 void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
126
127 void Orphan() override;
128
129 private:
130 class ConfigFetcherWatcher
131 : public grpc_server_config_fetcher::WatcherInterface {
132 public:
ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)133 explicit ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)
134 : listener_(std::move(listener)) {}
135
136 void UpdateConnectionManager(
137 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
138 connection_manager) override;
139
140 void StopServing() override;
141
142 private:
143 RefCountedPtr<Chttp2ServerListener> listener_;
144 };
145
146 class ActiveConnection : public InternallyRefCounted<ActiveConnection> {
147 public:
148 class HandshakingState : public InternallyRefCounted<HandshakingState> {
149 public:
150 HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,
151 grpc_pollset* accepting_pollset,
152 grpc_tcp_server_acceptor* acceptor,
153 const ChannelArgs& args);
154
155 ~HandshakingState() override;
156
157 void Orphan() override;
158
159 void Start(grpc_endpoint* endpoint, const ChannelArgs& args);
160
161 // Needed to be able to grab an external ref in
162 // ActiveConnection::Start()
163 using InternallyRefCounted<HandshakingState>::Ref;
164
165 private:
166 void OnTimeout() ABSL_LOCKS_EXCLUDED(&connection_->mu_);
167 static void OnReceiveSettings(void* arg, grpc_error_handle /* error */);
168 static void OnHandshakeDone(void* arg, grpc_error_handle error);
169 RefCountedPtr<ActiveConnection> const connection_;
170 grpc_pollset* const accepting_pollset_;
171 grpc_tcp_server_acceptor* acceptor_;
172 RefCountedPtr<HandshakeManager> handshake_mgr_
173 ABSL_GUARDED_BY(&connection_->mu_);
174 // State for enforcing handshake timeout on receiving HTTP/2 settings.
175 Timestamp const deadline_;
176 absl::optional<EventEngine::TaskHandle> timer_handle_
177 ABSL_GUARDED_BY(&connection_->mu_);
178 grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_);
179 grpc_pollset_set* const interested_parties_;
180 };
181
182 ActiveConnection(grpc_pollset* accepting_pollset,
183 grpc_tcp_server_acceptor* acceptor,
184 EventEngine* event_engine, const ChannelArgs& args,
185 MemoryOwner memory_owner);
186 ~ActiveConnection() override;
187
188 void Orphan() override;
189
190 void SendGoAway();
191
192 void Start(RefCountedPtr<Chttp2ServerListener> listener,
193 grpc_endpoint* endpoint, const ChannelArgs& args);
194
195 // Needed to be able to grab an external ref in
196 // Chttp2ServerListener::OnAccept()
197 using InternallyRefCounted<ActiveConnection>::Ref;
198
199 private:
200 static void OnClose(void* arg, grpc_error_handle error);
201 void OnDrainGraceTimeExpiry() ABSL_LOCKS_EXCLUDED(&mu_);
202
203 RefCountedPtr<Chttp2ServerListener> listener_;
204 Mutex mu_ ABSL_ACQUIRED_AFTER(&listener_->mu_);
205 // Set by HandshakingState before the handshaking begins and reset when
206 // handshaking is done.
207 OrphanablePtr<HandshakingState> handshaking_state_ ABSL_GUARDED_BY(&mu_);
208 // Set by HandshakingState when handshaking is done and a valid transport
209 // is created.
210 grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
211 grpc_closure on_close_;
212 absl::optional<EventEngine::TaskHandle> drain_grace_timer_handle_
213 ABSL_GUARDED_BY(&mu_);
214 // Use a raw pointer since this event_engine_ is grabbed from the
215 // ChannelArgs of the listener_.
216 EventEngine* const event_engine_ ABSL_GUARDED_BY(&mu_);
217 bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
218 };
219
220 // To allow access to RefCounted<> like interface.
221 friend class RefCountedPtr<Chttp2ServerListener>;
222
223 // Should only be called once so as to start the TCP server.
224 void StartListening();
225
226 static void OnAccept(void* arg, grpc_endpoint* tcp,
227 grpc_pollset* accepting_pollset,
228 grpc_tcp_server_acceptor* acceptor);
229
230 static void TcpServerShutdownComplete(void* arg, grpc_error_handle error);
231
232 static void DestroyListener(Server* /*server*/, void* arg,
233 grpc_closure* destroy_done);
234
235 // The interface required by RefCountedPtr<> has been manually implemented
236 // here to take a ref on tcp_server_ instead. Note that, the handshaker
237 // needs tcp_server_ to exist for the lifetime of the handshake since it's
238 // needed by acceptor. Sharing refs between the listener and tcp_server_ is
239 // just an optimization to avoid taking additional refs on the listener,
240 // since TcpServerShutdownComplete already holds a ref to the listener.
IncrementRefCount()241 void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); }
IncrementRefCount(const DebugLocation &,const char *)242 void IncrementRefCount(const DebugLocation& /* location */,
243 const char* /* reason */) {
244 IncrementRefCount();
245 }
246
Ref()247 RefCountedPtr<Chttp2ServerListener> Ref() GRPC_MUST_USE_RESULT {
248 IncrementRefCount();
249 return RefCountedPtr<Chttp2ServerListener>(this);
250 }
Ref(const DebugLocation &,const char *)251 RefCountedPtr<Chttp2ServerListener> Ref(const DebugLocation& /* location */,
252 const char* /* reason */)
253 GRPC_MUST_USE_RESULT {
254 return Ref();
255 }
256
Unref()257 void Unref() { grpc_tcp_server_unref(tcp_server_); }
Unref(const DebugLocation &,const char *)258 void Unref(const DebugLocation& /* location */, const char* /* reason */) {
259 Unref();
260 }
261
262 Server* const server_;
263 grpc_tcp_server* tcp_server_;
264 grpc_resolved_address resolved_address_;
265 Chttp2ServerArgsModifier const args_modifier_;
266 ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
267 ChannelArgs args_;
268 Mutex mu_;
269 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
270 connection_manager_ ABSL_GUARDED_BY(mu_);
271 // Signals whether grpc_tcp_server_start() has been called.
272 bool started_ ABSL_GUARDED_BY(mu_) = false;
273 // Signals whether grpc_tcp_server_start() has completed.
274 CondVar started_cv_ ABSL_GUARDED_BY(mu_);
275 // Signals whether new requests/connections are to be accepted.
276 bool is_serving_ ABSL_GUARDED_BY(mu_) = false;
277 // Signals whether the application has triggered shutdown.
278 bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
279 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_
280 ABSL_GUARDED_BY(mu_);
281 grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_);
282 grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr;
283 RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
284 MemoryQuotaRefPtr memory_quota_;
285 };
286
287 //
288 // Chttp2ServerListener::ConfigFetcherWatcher
289 //
290
UpdateConnectionManager(RefCountedPtr<grpc_server_config_fetcher::ConnectionManager> connection_manager)291 void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
292 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
293 connection_manager) {
294 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
295 connection_manager_to_destroy;
296 class GracefulShutdownExistingConnections {
297 public:
298 ~GracefulShutdownExistingConnections() {
299 // Send GOAWAYs on the transports so that they get disconnected when
300 // existing RPCs finish, and so that no new RPC is started on them.
301 for (auto& connection : connections_) {
302 connection.first->SendGoAway();
303 }
304 }
305
306 void set_connections(
307 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>>
308 connections) {
309 GPR_ASSERT(connections_.empty());
310 connections_ = std::move(connections);
311 }
312
313 private:
314 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_;
315 } connections_to_shutdown;
316 {
317 MutexLock lock(&listener_->mu_);
318 connection_manager_to_destroy = listener_->connection_manager_;
319 listener_->connection_manager_ = std::move(connection_manager);
320 connections_to_shutdown.set_connections(std::move(listener_->connections_));
321 if (listener_->shutdown_) {
322 return;
323 }
324 listener_->is_serving_ = true;
325 if (listener_->started_) return;
326 }
327 int port_temp;
328 grpc_error_handle error = grpc_tcp_server_add_port(
329 listener_->tcp_server_, &listener_->resolved_address_, &port_temp);
330 if (!error.ok()) {
331 gpr_log(GPR_ERROR, "Error adding port to server: %s",
332 StatusToString(error).c_str());
333 // TODO(yashykt): We wouldn't need to assert here if we bound to the
334 // port earlier during AddPort.
335 GPR_ASSERT(0);
336 }
337 listener_->StartListening();
338 {
339 MutexLock lock(&listener_->mu_);
340 listener_->started_ = true;
341 listener_->started_cv_.SignalAll();
342 }
343 }
344
StopServing()345 void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
346 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
347 {
348 MutexLock lock(&listener_->mu_);
349 listener_->is_serving_ = false;
350 connections = std::move(listener_->connections_);
351 }
352 // Send GOAWAYs on the transports so that they disconnected when existing
353 // RPCs finish.
354 for (auto& connection : connections) {
355 connection.first->SendGoAway();
356 }
357 }
358
359 //
360 // Chttp2ServerListener::ActiveConnection::HandshakingState
361 //
362
GetConnectionDeadline(const ChannelArgs & args)363 Timestamp GetConnectionDeadline(const ChannelArgs& args) {
364 return Timestamp::Now() +
365 std::max(
366 Duration::Milliseconds(1),
367 args.GetDurationFromIntMillis(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS)
368 .value_or(Duration::Seconds(120)));
369 }
370
HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor,const ChannelArgs & args)371 Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
372 RefCountedPtr<ActiveConnection> connection_ref,
373 grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
374 const ChannelArgs& args)
375 : connection_(std::move(connection_ref)),
376 accepting_pollset_(accepting_pollset),
377 acceptor_(acceptor),
378 handshake_mgr_(MakeRefCounted<HandshakeManager>()),
379 deadline_(GetConnectionDeadline(args)),
380 interested_parties_(grpc_pollset_set_create()) {
381 grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
382 CoreConfiguration::Get().handshaker_registry().AddHandshakers(
383 HANDSHAKER_SERVER, args, interested_parties_, handshake_mgr_.get());
384 }
385
~HandshakingState()386 Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
387 grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
388 grpc_pollset_set_destroy(interested_parties_);
389 gpr_free(acceptor_);
390 }
391
Orphan()392 void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
393 {
394 MutexLock lock(&connection_->mu_);
395 if (handshake_mgr_ != nullptr) {
396 handshake_mgr_->Shutdown(GRPC_ERROR_CREATE("Listener stopped serving."));
397 }
398 }
399 Unref();
400 }
401
Start(grpc_endpoint * endpoint,const ChannelArgs & channel_args)402 void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
403 grpc_endpoint* endpoint, const ChannelArgs& channel_args) {
404 Ref().release(); // Held by OnHandshakeDone
405 RefCountedPtr<HandshakeManager> handshake_mgr;
406 {
407 MutexLock lock(&connection_->mu_);
408 if (handshake_mgr_ == nullptr) return;
409 handshake_mgr = handshake_mgr_;
410 }
411 handshake_mgr->DoHandshake(endpoint, channel_args, deadline_, acceptor_,
412 OnHandshakeDone, this);
413 }
414
OnTimeout()415 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout() {
416 grpc_chttp2_transport* transport = nullptr;
417 {
418 MutexLock lock(&connection_->mu_);
419 if (timer_handle_.has_value()) {
420 transport = connection_->transport_;
421 timer_handle_.reset();
422 }
423 }
424 if (transport != nullptr) {
425 grpc_transport_op* op = grpc_make_transport_op(nullptr);
426 op->disconnect_with_error = GRPC_ERROR_CREATE(
427 "Did not receive HTTP/2 settings before handshake timeout");
428 grpc_transport_perform_op(&transport->base, op);
429 }
430 }
431
432 void Chttp2ServerListener::ActiveConnection::HandshakingState::
OnReceiveSettings(void * arg,grpc_error_handle)433 OnReceiveSettings(void* arg, grpc_error_handle /* error */) {
434 HandshakingState* self = static_cast<HandshakingState*>(arg);
435 {
436 MutexLock lock(&self->connection_->mu_);
437 if (self->timer_handle_.has_value()) {
438 self->connection_->event_engine_->Cancel(*self->timer_handle_);
439 self->timer_handle_.reset();
440 }
441 }
442 self->Unref();
443 }
444
OnHandshakeDone(void * arg,grpc_error_handle error)445 void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
446 void* arg, grpc_error_handle error) {
447 auto* args = static_cast<HandshakerArgs*>(arg);
448 HandshakingState* self = static_cast<HandshakingState*>(args->user_data);
449 OrphanablePtr<HandshakingState> handshaking_state_ref;
450 RefCountedPtr<HandshakeManager> handshake_mgr;
451 bool cleanup_connection = false;
452 {
453 MutexLock connection_lock(&self->connection_->mu_);
454 if (!error.ok() || self->connection_->shutdown_) {
455 std::string error_str = StatusToString(error);
456 gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str.c_str());
457 cleanup_connection = true;
458 if (error.ok() && args->endpoint != nullptr) {
459 // We were shut down or stopped serving after handshaking completed
460 // successfully, so destroy the endpoint here.
461 // TODO(ctiller): It is currently necessary to shutdown endpoints
462 // before destroying them, even if we know that there are no
463 // pending read/write callbacks. This should be fixed, at which
464 // point this can be removed.
465 grpc_endpoint_shutdown(args->endpoint, absl::OkStatus());
466 grpc_endpoint_destroy(args->endpoint);
467 grpc_slice_buffer_destroy(args->read_buffer);
468 gpr_free(args->read_buffer);
469 }
470 } else {
471 // If the handshaking succeeded but there is no endpoint, then the
472 // handshaker may have handed off the connection to some external
473 // code, so we can just clean up here without creating a transport.
474 if (args->endpoint != nullptr) {
475 grpc_transport* transport =
476 grpc_create_chttp2_transport(args->args, args->endpoint, false);
477 grpc_error_handle channel_init_err =
478 self->connection_->listener_->server_->SetupTransport(
479 transport, self->accepting_pollset_, args->args,
480 grpc_chttp2_transport_get_socket_node(transport));
481 if (channel_init_err.ok()) {
482 // Use notify_on_receive_settings callback to enforce the
483 // handshake deadline.
484 // Note: The reinterpret_cast<>s here are safe, because
485 // grpc_chttp2_transport is a C-style extension of
486 // grpc_transport, so this is morally equivalent of a
487 // static_cast<> to a derived class.
488 // TODO(roth): Change to static_cast<> when we C++-ify the
489 // transport API.
490 self->connection_->transport_ =
491 reinterpret_cast<grpc_chttp2_transport*>(transport);
492 GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_,
493 "ActiveConnection"); // Held by connection_
494 self->Ref().release(); // Held by OnReceiveSettings().
495 GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
496 self, grpc_schedule_on_exec_ctx);
497 // If the listener has been configured with a config fetcher, we
498 // need to watch on the transport being closed so that we can an
499 // updated list of active connections.
500 grpc_closure* on_close = nullptr;
501 if (self->connection_->listener_->config_fetcher_watcher_ !=
502 nullptr) {
503 // Refs helds by OnClose()
504 self->connection_->Ref().release();
505 on_close = &self->connection_->on_close_;
506 } else {
507 // Remove the connection from the connections_ map since OnClose()
508 // will not be invoked when a config fetcher is set.
509 cleanup_connection = true;
510 }
511 grpc_chttp2_transport_start_reading(transport, args->read_buffer,
512 &self->on_receive_settings_,
513 on_close);
514 self->timer_handle_ = self->connection_->event_engine_->RunAfter(
515 self->deadline_ - Timestamp::Now(),
516 [self = self->Ref()]() mutable {
517 ApplicationCallbackExecCtx callback_exec_ctx;
518 ExecCtx exec_ctx;
519 self->OnTimeout();
520 // HandshakingState deletion might require an active ExecCtx.
521 self.reset();
522 });
523 } else {
524 // Failed to create channel from transport. Clean up.
525 gpr_log(GPR_ERROR, "Failed to create channel: %s",
526 StatusToString(channel_init_err).c_str());
527 grpc_transport_destroy(transport);
528 grpc_slice_buffer_destroy(args->read_buffer);
529 gpr_free(args->read_buffer);
530 cleanup_connection = true;
531 }
532 } else {
533 cleanup_connection = true;
534 }
535 }
536 // Since the handshake manager is done, the connection no longer needs to
537 // shutdown the handshake when the listener needs to stop serving.
538 // Avoid calling the destructor of HandshakeManager and HandshakingState
539 // from within the critical region.
540 handshake_mgr = std::move(self->handshake_mgr_);
541 handshaking_state_ref = std::move(self->connection_->handshaking_state_);
542 }
543 gpr_free(self->acceptor_);
544 self->acceptor_ = nullptr;
545 OrphanablePtr<ActiveConnection> connection;
546 if (cleanup_connection) {
547 MutexLock listener_lock(&self->connection_->listener_->mu_);
548 auto it = self->connection_->listener_->connections_.find(
549 self->connection_.get());
550 if (it != self->connection_->listener_->connections_.end()) {
551 connection = std::move(it->second);
552 self->connection_->listener_->connections_.erase(it);
553 }
554 }
555 self->Unref();
556 }
557
558 //
559 // Chttp2ServerListener::ActiveConnection
560 //
561
ActiveConnection(grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor,EventEngine * event_engine,const ChannelArgs & args,MemoryOwner memory_owner)562 Chttp2ServerListener::ActiveConnection::ActiveConnection(
563 grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
564 EventEngine* event_engine, const ChannelArgs& args,
565 MemoryOwner memory_owner)
566 : handshaking_state_(memory_owner.MakeOrphanable<HandshakingState>(
567 Ref(), accepting_pollset, acceptor, args)),
568 event_engine_(event_engine) {
569 GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this,
570 grpc_schedule_on_exec_ctx);
571 }
572
~ActiveConnection()573 Chttp2ServerListener::ActiveConnection::~ActiveConnection() {
574 if (transport_ != nullptr) {
575 GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection");
576 }
577 }
578
Orphan()579 void Chttp2ServerListener::ActiveConnection::Orphan() {
580 OrphanablePtr<HandshakingState> handshaking_state;
581 {
582 MutexLock lock(&mu_);
583 shutdown_ = true;
584 // Reset handshaking_state_ since we have been orphaned by the listener
585 // signaling that the listener has stopped serving.
586 handshaking_state = std::move(handshaking_state_);
587 }
588 Unref();
589 }
590
SendGoAway()591 void Chttp2ServerListener::ActiveConnection::SendGoAway() {
592 grpc_chttp2_transport* transport = nullptr;
593 {
594 MutexLock lock(&mu_);
595 if (transport_ != nullptr && !shutdown_) {
596 transport = transport_;
597 drain_grace_timer_handle_ = event_engine_->RunAfter(
598 std::max(Duration::Zero(),
599 listener_->args_
600 .GetDurationFromIntMillis(
601 GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS)
602 .value_or(Duration::Minutes(10))),
603 [self = Ref(DEBUG_LOCATION, "drain_grace_timer")]() mutable {
604 ApplicationCallbackExecCtx callback_exec_ctx;
605 ExecCtx exec_ctx;
606 self->OnDrainGraceTimeExpiry();
607 self.reset(DEBUG_LOCATION, "drain_grace_timer");
608 });
609 shutdown_ = true;
610 }
611 }
612 if (transport != nullptr) {
613 grpc_transport_op* op = grpc_make_transport_op(nullptr);
614 op->goaway_error =
615 GRPC_ERROR_CREATE("Server is stopping to serve requests.");
616 grpc_transport_perform_op(&transport->base, op);
617 }
618 }
619
Start(RefCountedPtr<Chttp2ServerListener> listener,grpc_endpoint * endpoint,const ChannelArgs & args)620 void Chttp2ServerListener::ActiveConnection::Start(
621 RefCountedPtr<Chttp2ServerListener> listener, grpc_endpoint* endpoint,
622 const ChannelArgs& args) {
623 RefCountedPtr<HandshakingState> handshaking_state_ref;
624 listener_ = std::move(listener);
625 {
626 MutexLock lock(&mu_);
627 if (shutdown_) return;
628 // Hold a ref to HandshakingState to allow starting the handshake outside
629 // the critical region.
630 handshaking_state_ref = handshaking_state_->Ref();
631 }
632 handshaking_state_ref->Start(endpoint, args);
633 }
634
OnClose(void * arg,grpc_error_handle)635 void Chttp2ServerListener::ActiveConnection::OnClose(
636 void* arg, grpc_error_handle /* error */) {
637 ActiveConnection* self = static_cast<ActiveConnection*>(arg);
638 OrphanablePtr<ActiveConnection> connection;
639 {
640 MutexLock listener_lock(&self->listener_->mu_);
641 MutexLock connection_lock(&self->mu_);
642 // The node was already deleted from the connections_ list if the
643 // connection is shutdown.
644 if (!self->shutdown_) {
645 auto it = self->listener_->connections_.find(self);
646 if (it != self->listener_->connections_.end()) {
647 connection = std::move(it->second);
648 self->listener_->connections_.erase(it);
649 }
650 self->shutdown_ = true;
651 }
652 // Cancel the drain_grace_timer_ if needed.
653 if (self->drain_grace_timer_handle_.has_value()) {
654 self->event_engine_->Cancel(*self->drain_grace_timer_handle_);
655 self->drain_grace_timer_handle_.reset();
656 }
657 }
658 self->Unref();
659 }
660
OnDrainGraceTimeExpiry()661 void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry() {
662 grpc_chttp2_transport* transport = nullptr;
663 // If the drain_grace_timer_ was not cancelled, disconnect the transport
664 // immediately.
665 {
666 MutexLock lock(&mu_);
667 if (drain_grace_timer_handle_.has_value()) {
668 transport = transport_;
669 drain_grace_timer_handle_.reset();
670 }
671 }
672 if (transport != nullptr) {
673 grpc_transport_op* op = grpc_make_transport_op(nullptr);
674 op->disconnect_with_error = GRPC_ERROR_CREATE(
675 "Drain grace time expired. Closing connection immediately.");
676 grpc_transport_perform_op(&transport->base, op);
677 }
678 }
679
680 //
681 // Chttp2ServerListener
682 //
683
Create(Server * server,grpc_resolved_address * addr,const ChannelArgs & args,Chttp2ServerArgsModifier args_modifier,int * port_num)684 grpc_error_handle Chttp2ServerListener::Create(
685 Server* server, grpc_resolved_address* addr, const ChannelArgs& args,
686 Chttp2ServerArgsModifier args_modifier, int* port_num) {
687 Chttp2ServerListener* listener = nullptr;
688 // The bulk of this method is inside of a lambda to make cleanup
689 // easier without using goto.
690 grpc_error_handle error = [&]() {
691 grpc_error_handle error;
692 // Create Chttp2ServerListener.
693 listener = new Chttp2ServerListener(server, args, args_modifier);
694 error = grpc_tcp_server_create(
695 &listener->tcp_server_shutdown_complete_,
696 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
697 OnAccept, listener, &listener->tcp_server_);
698 if (!error.ok()) return error;
699 if (server->config_fetcher() != nullptr) {
700 listener->resolved_address_ = *addr;
701 // TODO(yashykt): Consider binding so as to be able to return the port
702 // number.
703 } else {
704 error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
705 if (!error.ok()) return error;
706 }
707 // Create channelz node.
708 if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
709 .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
710 auto string_address = grpc_sockaddr_to_uri(addr);
711 if (!string_address.ok()) {
712 return GRPC_ERROR_CREATE(string_address.status().ToString());
713 }
714 listener->channelz_listen_socket_ =
715 MakeRefCounted<channelz::ListenSocketNode>(
716 *string_address,
717 absl::StrCat("chttp2 listener ", *string_address));
718 }
719 // Register with the server only upon success
720 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
721 return absl::OkStatus();
722 }();
723 if (!error.ok()) {
724 if (listener != nullptr) {
725 if (listener->tcp_server_ != nullptr) {
726 // listener is deleted when tcp_server_ is shutdown.
727 grpc_tcp_server_unref(listener->tcp_server_);
728 } else {
729 delete listener;
730 }
731 }
732 }
733 return error;
734 }
735
CreateWithAcceptor(Server * server,const char * name,const ChannelArgs & args,Chttp2ServerArgsModifier args_modifier)736 grpc_error_handle Chttp2ServerListener::CreateWithAcceptor(
737 Server* server, const char* name, const ChannelArgs& args,
738 Chttp2ServerArgsModifier args_modifier) {
739 Chttp2ServerListener* listener =
740 new Chttp2ServerListener(server, args, args_modifier);
741 grpc_error_handle error = grpc_tcp_server_create(
742 &listener->tcp_server_shutdown_complete_,
743 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
744 OnAccept, listener, &listener->tcp_server_);
745 if (!error.ok()) {
746 delete listener;
747 return error;
748 }
749 // TODO(yangg) channelz
750 TcpServerFdHandler** arg_val = args.GetPointer<TcpServerFdHandler*>(name);
751 *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
752 server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
753 return absl::OkStatus();
754 }
755
Chttp2ServerListener(Server * server,const ChannelArgs & args,Chttp2ServerArgsModifier args_modifier)756 Chttp2ServerListener::Chttp2ServerListener(
757 Server* server, const ChannelArgs& args,
758 Chttp2ServerArgsModifier args_modifier)
759 : server_(server),
760 args_modifier_(args_modifier),
761 args_(args),
762 memory_quota_(args.GetObject<ResourceQuota>()->memory_quota()) {
763 GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
764 this, grpc_schedule_on_exec_ctx);
765 }
766
~Chttp2ServerListener()767 Chttp2ServerListener::~Chttp2ServerListener() {
768 // Flush queued work before destroying handshaker factory, since that
769 // may do a synchronous unref.
770 ExecCtx::Get()->Flush();
771 if (on_destroy_done_ != nullptr) {
772 ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, absl::OkStatus());
773 ExecCtx::Get()->Flush();
774 }
775 }
776
777 // Server callback: start listening on our ports
Start(Server *,const std::vector<grpc_pollset * > *)778 void Chttp2ServerListener::Start(
779 Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
780 if (server_->config_fetcher() != nullptr) {
781 auto watcher = std::make_unique<ConfigFetcherWatcher>(Ref());
782 config_fetcher_watcher_ = watcher.get();
783 server_->config_fetcher()->StartWatch(
784 grpc_sockaddr_to_string(&resolved_address_, false).value(),
785 std::move(watcher));
786 } else {
787 {
788 MutexLock lock(&mu_);
789 started_ = true;
790 is_serving_ = true;
791 }
792 StartListening();
793 }
794 }
795
StartListening()796 void Chttp2ServerListener::StartListening() {
797 grpc_tcp_server_start(tcp_server_, &server_->pollsets());
798 }
799
SetOnDestroyDone(grpc_closure * on_destroy_done)800 void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
801 MutexLock lock(&mu_);
802 on_destroy_done_ = on_destroy_done;
803 }
804
OnAccept(void * arg,grpc_endpoint * tcp,grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor)805 void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
806 grpc_pollset* accepting_pollset,
807 grpc_tcp_server_acceptor* acceptor) {
808 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
809 ChannelArgs args = self->args_;
810 RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
811 connection_manager;
812 {
813 MutexLock lock(&self->mu_);
814 connection_manager = self->connection_manager_;
815 }
816 auto endpoint_cleanup = [&](grpc_error_handle error) {
817 grpc_endpoint_shutdown(tcp, error);
818 grpc_endpoint_destroy(tcp);
819 gpr_free(acceptor);
820 };
821 if (self->server_->config_fetcher() != nullptr) {
822 if (connection_manager == nullptr) {
823 grpc_error_handle error = GRPC_ERROR_CREATE(
824 "No ConnectionManager configured. Closing connection.");
825 endpoint_cleanup(error);
826 return;
827 }
828 absl::StatusOr<ChannelArgs> args_result =
829 connection_manager->UpdateChannelArgsForConnection(args, tcp);
830 if (!args_result.ok()) {
831 gpr_log(GPR_DEBUG, "Closing connection: %s",
832 args_result.status().ToString().c_str());
833 endpoint_cleanup(GRPC_ERROR_CREATE(args_result.status().ToString()));
834 return;
835 }
836 grpc_error_handle error;
837 args = self->args_modifier_(*args_result, &error);
838 if (!error.ok()) {
839 gpr_log(GPR_DEBUG, "Closing connection: %s",
840 StatusToString(error).c_str());
841 endpoint_cleanup(error);
842 return;
843 }
844 }
845 auto memory_owner = self->memory_quota_->CreateMemoryOwner(
846 absl::StrCat(grpc_endpoint_get_peer(tcp), ":server_channel"));
847 EventEngine* const event_engine = self->args_.GetObject<EventEngine>();
848 auto connection = memory_owner.MakeOrphanable<ActiveConnection>(
849 accepting_pollset, acceptor, event_engine, args, std::move(memory_owner));
850 // We no longer own acceptor
851 acceptor = nullptr;
852 // Hold a ref to connection to allow starting handshake outside the
853 // critical region
854 RefCountedPtr<ActiveConnection> connection_ref = connection->Ref();
855 RefCountedPtr<Chttp2ServerListener> listener_ref;
856 {
857 MutexLock lock(&self->mu_);
858 // Shutdown the the connection if listener's stopped serving or if the
859 // connection manager has changed.
860 if (!self->shutdown_ && self->is_serving_ &&
861 connection_manager == self->connection_manager_) {
862 // This ref needs to be taken in the critical region after having made
863 // sure that the listener has not been Orphaned, so as to avoid
864 // heap-use-after-free issues where `Ref()` is invoked when the ref of
865 // tcp_server_ has already reached 0. (Ref() implementation of
866 // Chttp2ServerListener is grpc_tcp_server_ref().)
867 listener_ref = self->Ref();
868 self->connections_.emplace(connection.get(), std::move(connection));
869 }
870 }
871 if (connection != nullptr) {
872 endpoint_cleanup(absl::OkStatus());
873 } else {
874 connection_ref->Start(std::move(listener_ref), tcp, args);
875 }
876 }
877
TcpServerShutdownComplete(void * arg,grpc_error_handle)878 void Chttp2ServerListener::TcpServerShutdownComplete(
879 void* arg, grpc_error_handle /*error*/) {
880 Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
881 self->channelz_listen_socket_.reset();
882 delete self;
883 }
884
885 // Server callback: destroy the tcp listener (so we don't generate further
886 // callbacks)
Orphan()887 void Chttp2ServerListener::Orphan() {
888 // Cancel the watch before shutting down so as to avoid holding a ref to the
889 // listener in the watcher.
890 if (config_fetcher_watcher_ != nullptr) {
891 server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
892 }
893 std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
894 grpc_tcp_server* tcp_server;
895 {
896 MutexLock lock(&mu_);
897 shutdown_ = true;
898 is_serving_ = false;
899 // Orphan the connections so that they can start cleaning up.
900 connections = std::move(connections_);
901 // If the listener is currently set to be serving but has not been started
902 // yet, it means that `grpc_tcp_server_start` is in progress. Wait for the
903 // operation to finish to avoid causing races.
904 while (is_serving_ && !started_) {
905 started_cv_.Wait(&mu_);
906 }
907 tcp_server = tcp_server_;
908 }
909 grpc_tcp_server_shutdown_listeners(tcp_server);
910 grpc_tcp_server_unref(tcp_server);
911 }
912
913 } // namespace
914
915 //
916 // Chttp2ServerAddPort()
917 //
918
Chttp2ServerAddPort(Server * server,const char * addr,const ChannelArgs & args,Chttp2ServerArgsModifier args_modifier,int * port_num)919 grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
920 const ChannelArgs& args,
921 Chttp2ServerArgsModifier args_modifier,
922 int* port_num) {
923 if (addr == nullptr) {
924 return GRPC_ERROR_CREATE("Invalid address: addr cannot be a nullptr.");
925 }
926 if (strncmp(addr, "external:", 9) == 0) {
927 return Chttp2ServerListener::CreateWithAcceptor(server, addr, args,
928 args_modifier);
929 }
930 *port_num = -1;
931 absl::StatusOr<std::vector<grpc_resolved_address>> resolved_or;
932 std::vector<grpc_error_handle> error_list;
933 std::string parsed_addr = URI::PercentDecode(addr);
934 absl::string_view parsed_addr_unprefixed{parsed_addr};
935 // Using lambda to avoid use of goto.
936 grpc_error_handle error = [&]() {
937 grpc_error_handle error;
938 if (absl::ConsumePrefix(&parsed_addr_unprefixed, kUnixUriPrefix)) {
939 resolved_or = grpc_resolve_unix_domain_address(parsed_addr_unprefixed);
940 } else if (absl::ConsumePrefix(&parsed_addr_unprefixed,
941 kUnixAbstractUriPrefix)) {
942 resolved_or =
943 grpc_resolve_unix_abstract_domain_address(parsed_addr_unprefixed);
944 } else {
945 resolved_or =
946 GetDNSResolver()->LookupHostnameBlocking(parsed_addr, "https");
947 }
948 if (!resolved_or.ok()) {
949 return absl_status_to_grpc_error(resolved_or.status());
950 }
951 // Create a listener for each resolved address.
952 for (auto& addr : *resolved_or) {
953 // If address has a wildcard port (0), use the same port as a previous
954 // listener.
955 if (*port_num != -1 && grpc_sockaddr_get_port(&addr) == 0) {
956 grpc_sockaddr_set_port(&addr, *port_num);
957 }
958 int port_temp = -1;
959 error = Chttp2ServerListener::Create(server, &addr, args, args_modifier,
960 &port_temp);
961 if (!error.ok()) {
962 error_list.push_back(error);
963 } else {
964 if (*port_num == -1) {
965 *port_num = port_temp;
966 } else {
967 GPR_ASSERT(*port_num == port_temp);
968 }
969 }
970 }
971 if (error_list.size() == resolved_or->size()) {
972 std::string msg = absl::StrFormat(
973 "No address added out of total %" PRIuPTR " resolved for '%s'",
974 resolved_or->size(), addr);
975 return GRPC_ERROR_CREATE_REFERENCING(msg.c_str(), error_list.data(),
976 error_list.size());
977 } else if (!error_list.empty()) {
978 std::string msg = absl::StrFormat(
979 "Only %" PRIuPTR " addresses added out of total %" PRIuPTR
980 " resolved",
981 resolved_or->size() - error_list.size(), resolved_or->size());
982 error = GRPC_ERROR_CREATE_REFERENCING(msg.c_str(), error_list.data(),
983 error_list.size());
984 gpr_log(GPR_INFO, "WARNING: %s", StatusToString(error).c_str());
985 // we managed to bind some addresses: continue without error
986 }
987 return absl::OkStatus();
988 }(); // lambda end
989 if (!error.ok()) *port_num = 0;
990 return error;
991 }
992
993 namespace {
994
// Per-connection args modifier: attaches a security connector, created from
// the server credentials found in `args`, to the returned channel args.
//
// If `args` carries no server credentials, or the credentials cannot produce
// a security connector, `*error` is set and `args` is returned unchanged.
ChannelArgs ModifyArgsForConnection(const ChannelArgs& args,
                                    grpc_error_handle* error) {
  auto* creds = args.GetObject<grpc_server_credentials>();
  if (creds == nullptr) {
    *error = GRPC_ERROR_CREATE("Could not find server credentials");
    return args;
  }
  auto connector = creds->create_security_connector(args);
  if (connector != nullptr) {
    return args.SetObject(connector);
  }
  *error = GRPC_ERROR_CREATE(
      absl::StrCat("Unable to create secure server with credentials of type ",
                   creds->type().name()));
  return args;
}
1011
1012 } // namespace
1013 } // namespace grpc_core
1014
grpc_server_add_http2_port(grpc_server * server,const char * addr,grpc_server_credentials * creds)1015 int grpc_server_add_http2_port(grpc_server* server, const char* addr,
1016 grpc_server_credentials* creds) {
1017 grpc_core::ExecCtx exec_ctx;
1018 grpc_error_handle err;
1019 grpc_core::RefCountedPtr<grpc_server_security_connector> sc;
1020 int port_num = 0;
1021 grpc_core::Server* core_server = grpc_core::Server::FromC(server);
1022 grpc_core::ChannelArgs args = core_server->channel_args();
1023 GRPC_API_TRACE("grpc_server_add_http2_port(server=%p, addr=%s, creds=%p)", 3,
1024 (server, addr, creds));
1025 // Create security context.
1026 if (creds == nullptr) {
1027 err = GRPC_ERROR_CREATE(
1028 "No credentials specified for secure server port (creds==NULL)");
1029 goto done;
1030 }
1031 // TODO(yashykt): Ideally, we would not want to have different behavior here
1032 // based on whether a config fetcher is configured or not. Currently, we have
1033 // a feature for SSL credentials reloading with an application callback that
1034 // assumes that there is a single security connector. If we delay the creation
1035 // of the security connector to after the creation of the listener(s), we
1036 // would have potentially multiple security connectors which breaks the
1037 // assumption for SSL creds reloading. When the API for SSL creds reloading is
1038 // rewritten, we would be able to make this workaround go away by removing
1039 // that assumption. As an immediate drawback of this workaround, config
1040 // fetchers need to be registered before adding ports to the server.
1041 if (core_server->config_fetcher() != nullptr) {
1042 // Create channel args.
1043 args = args.SetObject(creds->Ref());
1044 } else {
1045 sc = creds->create_security_connector(grpc_core::ChannelArgs());
1046 if (sc == nullptr) {
1047 err = GRPC_ERROR_CREATE(absl::StrCat(
1048 "Unable to create secure server with credentials of type ",
1049 creds->type().name()));
1050 goto done;
1051 }
1052 args = args.SetObject(creds->Ref()).SetObject(sc);
1053 }
1054 // Add server port.
1055 err = grpc_core::Chttp2ServerAddPort(
1056 core_server, addr, args, grpc_core::ModifyArgsForConnection, &port_num);
1057 done:
1058 sc.reset(DEBUG_LOCATION, "server");
1059 if (!err.ok()) {
1060 gpr_log(GPR_ERROR, "%s", grpc_core::StatusToString(err).c_str());
1061 }
1062 return port_num;
1063 }
1064
1065 #ifdef GPR_SUPPORT_CHANNELS_FROM_FD
grpc_server_add_channel_from_fd(grpc_server * server,int fd,grpc_server_credentials * creds)1066 void grpc_server_add_channel_from_fd(grpc_server* server, int fd,
1067 grpc_server_credentials* creds) {
1068 // For now, we only support insecure server credentials
1069 if (creds == nullptr ||
1070 creds->type() != grpc_core::InsecureServerCredentials::Type()) {
1071 gpr_log(GPR_ERROR, "Failed to create channel due to invalid creds");
1072 return;
1073 }
1074 grpc_core::ExecCtx exec_ctx;
1075 grpc_core::Server* core_server = grpc_core::Server::FromC(server);
1076
1077 grpc_core::ChannelArgs server_args = core_server->channel_args();
1078 std::string name = absl::StrCat("fd:", fd);
1079 auto memory_quota =
1080 server_args.GetObject<grpc_core::ResourceQuota>()->memory_quota();
1081 grpc_endpoint* server_endpoint = grpc_tcp_create_from_fd(
1082 grpc_fd_create(fd, name.c_str(), true),
1083 grpc_event_engine::experimental::ChannelArgsEndpointConfig(server_args),
1084 name);
1085 grpc_transport* transport = grpc_create_chttp2_transport(
1086 server_args, server_endpoint, false // is_client
1087 );
1088 grpc_error_handle error =
1089 core_server->SetupTransport(transport, nullptr, server_args, nullptr);
1090 if (error.ok()) {
1091 for (grpc_pollset* pollset : core_server->pollsets()) {
1092 grpc_endpoint_add_to_pollset(server_endpoint, pollset);
1093 }
1094 grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
1095 } else {
1096 gpr_log(GPR_ERROR, "Failed to create channel: %s",
1097 grpc_core::StatusToString(error).c_str());
1098 grpc_transport_destroy(transport);
1099 }
1100 }
1101
1102 #else // !GPR_SUPPORT_CHANNELS_FROM_FD
1103
grpc_server_add_channel_from_fd(grpc_server *,int,grpc_server_credentials *)1104 void grpc_server_add_channel_from_fd(grpc_server* /* server */, int /* fd */,
1105 grpc_server_credentials* /* creds */) {
1106 GPR_ASSERT(0);
1107 }
1108
1109 #endif // GPR_SUPPORT_CHANNELS_FROM_FD
1110