1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
22 
23 #include <stdint.h>
24 
25 #include <initializer_list>
26 #include <string>
27 #include <type_traits>
28 #include <utility>
29 
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_format.h"
33 
34 #include <grpc/grpc.h>
35 #include <grpc/grpc_posix.h>
36 #include <grpc/grpc_security.h>
37 #include <grpc/slice_buffer.h>
38 #include <grpc/status.h>
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/log.h>
41 #include <grpc/support/sync.h>
42 
43 #include "src/core/ext/filters/client_channel/client_channel.h"
44 #include "src/core/ext/filters/client_channel/client_channel_factory.h"
45 #include "src/core/ext/filters/client_channel/connector.h"
46 #include "src/core/ext/filters/client_channel/subchannel.h"
47 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
48 #include "src/core/lib/address_utils/sockaddr_utils.h"
49 #include "src/core/lib/channel/channel_args.h"
50 #include "src/core/lib/channel/channel_args_preconditioning.h"
51 #include "src/core/lib/channel/channelz.h"
52 #include "src/core/lib/config/core_configuration.h"
53 #include "src/core/lib/debug/trace.h"
54 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
55 #include "src/core/lib/gprpp/debug_location.h"
56 #include "src/core/lib/gprpp/orphanable.h"
57 #include "src/core/lib/gprpp/status_helper.h"
58 #include "src/core/lib/gprpp/time.h"
59 #include "src/core/lib/gprpp/unique_type_name.h"
60 #include "src/core/lib/iomgr/endpoint.h"
61 #include "src/core/lib/iomgr/exec_ctx.h"
62 #include "src/core/lib/iomgr/resolved_address.h"
63 #include "src/core/lib/resolver/resolver_registry.h"
64 #include "src/core/lib/security/credentials/credentials.h"
65 #include "src/core/lib/security/credentials/insecure/insecure_credentials.h"
66 #include "src/core/lib/security/security_connector/security_connector.h"
67 #include "src/core/lib/surface/api_trace.h"
68 #include "src/core/lib/surface/channel.h"
69 #include "src/core/lib/surface/channel_stack_type.h"
70 #include "src/core/lib/transport/error_utils.h"
71 #include "src/core/lib/transport/handshaker.h"
72 #include "src/core/lib/transport/handshaker_registry.h"
73 #include "src/core/lib/transport/tcp_connect_handshaker.h"
74 #include "src/core/lib/transport/transport.h"
75 #include "src/core/lib/transport/transport_fwd.h"
76 
77 #ifdef GPR_SUPPORT_CHANNELS_FROM_FD
78 
79 #include <fcntl.h>
80 
81 #include "src/core/lib/iomgr/ev_posix.h"
82 #include "src/core/lib/iomgr/tcp_client_posix.h"
83 
84 #endif  // GPR_SUPPORT_CHANNELS_FROM_FD
85 
86 namespace grpc_core {
87 
88 using ::grpc_event_engine::experimental::EventEngine;
89 
90 namespace {
NullThenSchedClosure(const DebugLocation & location,grpc_closure ** closure,grpc_error_handle error)91 void NullThenSchedClosure(const DebugLocation& location, grpc_closure** closure,
92                           grpc_error_handle error) {
93   grpc_closure* c = *closure;
94   *closure = nullptr;
95   ExecCtx::Run(location, c, error);
96 }
97 }  // namespace
98 
~Chttp2Connector()99 Chttp2Connector::~Chttp2Connector() {
100   if (endpoint_ != nullptr) {
101     grpc_endpoint_destroy(endpoint_);
102   }
103 }
104 
Connect(const Args & args,Result * result,grpc_closure * notify)105 void Chttp2Connector::Connect(const Args& args, Result* result,
106                               grpc_closure* notify) {
107   {
108     MutexLock lock(&mu_);
109     GPR_ASSERT(notify_ == nullptr);
110     args_ = args;
111     result_ = result;
112     notify_ = notify;
113     GPR_ASSERT(endpoint_ == nullptr);
114     event_engine_ = args_.channel_args.GetObject<EventEngine>();
115   }
116   absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(args.address);
117   if (!address.ok()) {
118     grpc_error_handle error = GRPC_ERROR_CREATE(address.status().ToString());
119     NullThenSchedClosure(DEBUG_LOCATION, &notify_, error);
120     return;
121   }
122   ChannelArgs channel_args =
123       args_.channel_args
124           .Set(GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS, address.value())
125           .Set(GRPC_ARG_TCP_HANDSHAKER_BIND_ENDPOINT_TO_POLLSET, 1);
126   handshake_mgr_ = MakeRefCounted<HandshakeManager>();
127   CoreConfiguration::Get().handshaker_registry().AddHandshakers(
128       HANDSHAKER_CLIENT, channel_args, args_.interested_parties,
129       handshake_mgr_.get());
130   Ref().release();  // Ref held by OnHandshakeDone().
131   handshake_mgr_->DoHandshake(nullptr /* endpoint */, channel_args,
132                               args.deadline, nullptr /* acceptor */,
133                               OnHandshakeDone, this);
134 }
135 
Shutdown(grpc_error_handle error)136 void Chttp2Connector::Shutdown(grpc_error_handle error) {
137   MutexLock lock(&mu_);
138   shutdown_ = true;
139   if (handshake_mgr_ != nullptr) {
140     // Handshaker will also shutdown the endpoint if it exists
141     handshake_mgr_->Shutdown(error);
142   }
143 }
144 
OnHandshakeDone(void * arg,grpc_error_handle error)145 void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) {
146   auto* args = static_cast<HandshakerArgs*>(arg);
147   Chttp2Connector* self = static_cast<Chttp2Connector*>(args->user_data);
148   {
149     MutexLock lock(&self->mu_);
150     if (!error.ok() || self->shutdown_) {
151       if (error.ok()) {
152         error = GRPC_ERROR_CREATE("connector shutdown");
153         // We were shut down after handshaking completed successfully, so
154         // destroy the endpoint here.
155         if (args->endpoint != nullptr) {
156           // TODO(ctiller): It is currently necessary to shutdown endpoints
157           // before destroying them, even if we know that there are no
158           // pending read/write callbacks.  This should be fixed, at which
159           // point this can be removed.
160           grpc_endpoint_shutdown(args->endpoint, error);
161           grpc_endpoint_destroy(args->endpoint);
162           grpc_slice_buffer_destroy(args->read_buffer);
163           gpr_free(args->read_buffer);
164         }
165       }
166       self->result_->Reset();
167       NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error);
168     } else if (args->endpoint != nullptr) {
169       self->result_->transport =
170           grpc_create_chttp2_transport(args->args, args->endpoint, true);
171       GPR_ASSERT(self->result_->transport != nullptr);
172       self->result_->socket_node =
173           grpc_chttp2_transport_get_socket_node(self->result_->transport);
174       self->result_->channel_args = args->args;
175       self->endpoint_ = args->endpoint;
176       self->Ref().release();  // Ref held by OnReceiveSettings()
177       GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
178                         grpc_schedule_on_exec_ctx);
179       grpc_chttp2_transport_start_reading(self->result_->transport,
180                                           args->read_buffer,
181                                           &self->on_receive_settings_, nullptr);
182       RefCountedPtr<Chttp2Connector> cc = self->Ref();
183       self->timer_handle_ = self->event_engine_->RunAfter(
184           self->args_.deadline - Timestamp::Now(), [self = std::move(cc)] {
185             ApplicationCallbackExecCtx callback_exec_ctx;
186             ExecCtx exec_ctx;
187             self->OnTimeout();
188           });
189     } else {
190       // If the handshaking succeeded but there is no endpoint, then the
191       // handshaker may have handed off the connection to some external
192       // code. Just verify that exit_early flag is set.
193       GPR_DEBUG_ASSERT(args->exit_early);
194       NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error);
195     }
196     self->handshake_mgr_.reset();
197   }
198   self->Unref();
199 }
200 
OnReceiveSettings(void * arg,grpc_error_handle error)201 void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error_handle error) {
202   Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
203   {
204     MutexLock lock(&self->mu_);
205     if (!self->notify_error_.has_value()) {
206       grpc_endpoint_delete_from_pollset_set(self->endpoint_,
207                                             self->args_.interested_parties);
208       if (!error.ok()) {
209         // Transport got an error while waiting on SETTINGS frame.
210         self->result_->Reset();
211       }
212       self->MaybeNotify(error);
213       if (self->timer_handle_.has_value()) {
214         if (self->event_engine_->Cancel(*self->timer_handle_)) {
215           // If we have cancelled the timer successfully, call Notify() again
216           // since the timer callback will not be called now.
217           self->MaybeNotify(absl::OkStatus());
218         }
219         self->timer_handle_.reset();
220       }
221     } else {
222       // OnTimeout() was already invoked. Call Notify() again so that notify_
223       // can be invoked.
224       self->MaybeNotify(absl::OkStatus());
225     }
226   }
227   self->Unref();
228 }
229 
OnTimeout()230 void Chttp2Connector::OnTimeout() {
231   MutexLock lock(&mu_);
232   timer_handle_.reset();
233   if (!notify_error_.has_value()) {
234     // The transport did not receive the settings frame in time. Destroy the
235     // transport.
236     grpc_endpoint_delete_from_pollset_set(endpoint_, args_.interested_parties);
237     result_->Reset();
238     MaybeNotify(GRPC_ERROR_CREATE(
239         "connection attempt timed out before receiving SETTINGS frame"));
240   } else {
241     // OnReceiveSettings() was already invoked. Call Notify() again so that
242     // notify_ can be invoked.
243     MaybeNotify(absl::OkStatus());
244   }
245 }
246 
MaybeNotify(grpc_error_handle error)247 void Chttp2Connector::MaybeNotify(grpc_error_handle error) {
248   if (notify_error_.has_value()) {
249     NullThenSchedClosure(DEBUG_LOCATION, &notify_, notify_error_.value());
250     // Clear state for a new Connect().
251     // Clear out the endpoint_, since it is the responsibility of
252     // the transport to shut it down.
253     endpoint_ = nullptr;
254     notify_error_.reset();
255   } else {
256     notify_error_ = error;
257   }
258 }
259 
260 namespace {
261 
262 class Chttp2SecureClientChannelFactory : public ClientChannelFactory {
263  public:
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & args)264   RefCountedPtr<Subchannel> CreateSubchannel(
265       const grpc_resolved_address& address, const ChannelArgs& args) override {
266     absl::StatusOr<ChannelArgs> new_args = GetSecureNamingChannelArgs(args);
267     if (!new_args.ok()) {
268       gpr_log(GPR_ERROR,
269               "Failed to create channel args during subchannel creation: %s; "
270               "Got args: %s",
271               new_args.status().ToString().c_str(), args.ToString().c_str());
272       return nullptr;
273     }
274     RefCountedPtr<Subchannel> s = Subchannel::Create(
275         MakeOrphanable<Chttp2Connector>(), address, *new_args);
276     return s;
277   }
278 
279  private:
GetSecureNamingChannelArgs(ChannelArgs args)280   static absl::StatusOr<ChannelArgs> GetSecureNamingChannelArgs(
281       ChannelArgs args) {
282     auto* channel_credentials = args.GetObject<grpc_channel_credentials>();
283     if (channel_credentials == nullptr) {
284       return absl::InternalError(
285           "channel credentials missing for secure channel");
286     }
287     // Make sure security connector does not already exist in args.
288     if (args.Contains(GRPC_ARG_SECURITY_CONNECTOR)) {
289       return absl::InternalError(
290           "security connector already present in channel args.");
291     }
292     // Find the authority to use in the security connector.
293     absl::optional<std::string> authority =
294         args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY);
295     if (!authority.has_value()) {
296       return absl::InternalError("authority not present in channel args");
297     }
298     // Create the security connector using the credentials and target name.
299     RefCountedPtr<grpc_channel_security_connector>
300         subchannel_security_connector =
301             channel_credentials->create_security_connector(
302                 /*call_creds=*/nullptr, authority->c_str(), &args);
303     if (subchannel_security_connector == nullptr) {
304       return absl::InternalError(absl::StrFormat(
305           "Failed to create secure subchannel for secure name '%s'",
306           *authority));
307     }
308     return args.SetObject(std::move(subchannel_security_connector));
309   }
310 };
311 
CreateChannel(const char * target,const ChannelArgs & args)312 absl::StatusOr<RefCountedPtr<Channel>> CreateChannel(const char* target,
313                                                      const ChannelArgs& args) {
314   if (target == nullptr) {
315     gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
316     return absl::InvalidArgumentError("channel target is NULL");
317   }
318   // Add channel arg containing the server URI.
319   std::string canonical_target =
320       CoreConfiguration::Get().resolver_registry().AddDefaultPrefixIfNeeded(
321           target);
322   return Channel::Create(target,
323                          args.Set(GRPC_ARG_SERVER_URI, canonical_target),
324                          GRPC_CLIENT_CHANNEL, nullptr);
325 }
326 
327 }  // namespace
328 }  // namespace grpc_core
329 
330 namespace {
331 
332 grpc_core::Chttp2SecureClientChannelFactory* g_factory;
333 gpr_once g_factory_once = GPR_ONCE_INIT;
334 
FactoryInit()335 void FactoryInit() {
336   g_factory = new grpc_core::Chttp2SecureClientChannelFactory();
337 }
338 
339 }  // namespace
340 
341 // Create a secure client channel:
342 //   Asynchronously: - resolve target
343 //                   - connect to it (trying alternatives as presented)
344 //                   - perform handshakes
grpc_channel_create(const char * target,grpc_channel_credentials * creds,const grpc_channel_args * c_args)345 grpc_channel* grpc_channel_create(const char* target,
346                                   grpc_channel_credentials* creds,
347                                   const grpc_channel_args* c_args) {
348   grpc_core::ExecCtx exec_ctx;
349   GRPC_API_TRACE("grpc_secure_channel_create(target=%s, creds=%p, args=%p)", 3,
350                  (target, (void*)creds, (void*)c_args));
351   grpc_channel* channel = nullptr;
352   grpc_error_handle error;
353   if (creds != nullptr) {
354     // Add channel args containing the client channel factory and channel
355     // credentials.
356     gpr_once_init(&g_factory_once, FactoryInit);
357     grpc_core::ChannelArgs args =
358         creds->update_arguments(grpc_core::CoreConfiguration::Get()
359                                     .channel_args_preconditioning()
360                                     .PreconditionChannelArgs(c_args)
361                                     .SetObject(creds->Ref())
362                                     .SetObject(g_factory));
363     // Create channel.
364     auto r = grpc_core::CreateChannel(target, args);
365     if (r.ok()) {
366       channel = r->release()->c_ptr();
367     } else {
368       error = absl_status_to_grpc_error(r.status());
369     }
370   }
371   if (channel == nullptr) {
372     intptr_t integer;
373     grpc_status_code status = GRPC_STATUS_INTERNAL;
374     if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus,
375                            &integer)) {
376       status = static_cast<grpc_status_code>(integer);
377     }
378     channel = grpc_lame_client_channel_create(
379         target, status, "Failed to create secure client channel");
380   }
381   return channel;
382 }
383 
384 #ifdef GPR_SUPPORT_CHANNELS_FROM_FD
grpc_channel_create_from_fd(const char * target,int fd,grpc_channel_credentials * creds,const grpc_channel_args * args)385 grpc_channel* grpc_channel_create_from_fd(const char* target, int fd,
386                                           grpc_channel_credentials* creds,
387                                           const grpc_channel_args* args) {
388   grpc_core::ExecCtx exec_ctx;
389   GRPC_API_TRACE(
390       "grpc_channel_create_from_fd(target=%p, fd=%d, creds=%p, args=%p)", 4,
391       (target, fd, creds, args));
392   // For now, we only support insecure channel credentials.
393   if (creds == nullptr ||
394       creds->type() != grpc_core::InsecureCredentials::Type()) {
395     return grpc_lame_client_channel_create(
396         target, GRPC_STATUS_INTERNAL,
397         "Failed to create client channel due to invalid creds");
398   }
399   grpc_core::ChannelArgs final_args =
400       grpc_core::CoreConfiguration::Get()
401           .channel_args_preconditioning()
402           .PreconditionChannelArgs(args)
403           .SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority")
404           .SetObject(creds->Ref());
405 
406   int flags = fcntl(fd, F_GETFL, 0);
407   GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
408   grpc_endpoint* client = grpc_tcp_create_from_fd(
409       grpc_fd_create(fd, "client", true),
410       grpc_event_engine::experimental::ChannelArgsEndpointConfig(final_args),
411       "fd-client");
412   grpc_transport* transport =
413       grpc_create_chttp2_transport(final_args, client, true);
414   GPR_ASSERT(transport);
415   auto channel = grpc_core::Channel::Create(
416       target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
417   if (channel.ok()) {
418     grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
419     grpc_core::ExecCtx::Get()->Flush();
420     return channel->release()->c_ptr();
421   } else {
422     grpc_transport_destroy(transport);
423     return grpc_lame_client_channel_create(
424         target, static_cast<grpc_status_code>(channel.status().code()),
425         "Failed to create client channel");
426   }
427 }
428 
429 #else  // !GPR_SUPPORT_CHANNELS_FROM_FD
430 
grpc_channel_create_from_fd(const char *,int,grpc_channel_credentials *,const grpc_channel_args *)431 grpc_channel* grpc_channel_create_from_fd(const char* /* target */,
432                                           int /* fd */,
433                                           grpc_channel_credentials* /* creds*/,
434                                           const grpc_channel_args* /* args */) {
435   GPR_ASSERT(0);
436   return nullptr;
437 }
438 
439 #endif  // GPR_SUPPORT_CHANNELS_FROM_FD
440