1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
22
23 #include <stdint.h>
24
25 #include <initializer_list>
26 #include <string>
27 #include <type_traits>
28 #include <utility>
29
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_format.h"
33
34 #include <grpc/grpc.h>
35 #include <grpc/grpc_posix.h>
36 #include <grpc/grpc_security.h>
37 #include <grpc/slice_buffer.h>
38 #include <grpc/status.h>
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/log.h>
41 #include <grpc/support/sync.h>
42
43 #include "src/core/ext/filters/client_channel/client_channel.h"
44 #include "src/core/ext/filters/client_channel/client_channel_factory.h"
45 #include "src/core/ext/filters/client_channel/connector.h"
46 #include "src/core/ext/filters/client_channel/subchannel.h"
47 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
48 #include "src/core/lib/address_utils/sockaddr_utils.h"
49 #include "src/core/lib/channel/channel_args.h"
50 #include "src/core/lib/channel/channel_args_preconditioning.h"
51 #include "src/core/lib/channel/channelz.h"
52 #include "src/core/lib/config/core_configuration.h"
53 #include "src/core/lib/debug/trace.h"
54 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
55 #include "src/core/lib/gprpp/debug_location.h"
56 #include "src/core/lib/gprpp/orphanable.h"
57 #include "src/core/lib/gprpp/status_helper.h"
58 #include "src/core/lib/gprpp/time.h"
59 #include "src/core/lib/gprpp/unique_type_name.h"
60 #include "src/core/lib/iomgr/endpoint.h"
61 #include "src/core/lib/iomgr/exec_ctx.h"
62 #include "src/core/lib/iomgr/resolved_address.h"
63 #include "src/core/lib/resolver/resolver_registry.h"
64 #include "src/core/lib/security/credentials/credentials.h"
65 #include "src/core/lib/security/credentials/insecure/insecure_credentials.h"
66 #include "src/core/lib/security/security_connector/security_connector.h"
67 #include "src/core/lib/surface/api_trace.h"
68 #include "src/core/lib/surface/channel.h"
69 #include "src/core/lib/surface/channel_stack_type.h"
70 #include "src/core/lib/transport/error_utils.h"
71 #include "src/core/lib/transport/handshaker.h"
72 #include "src/core/lib/transport/handshaker_registry.h"
73 #include "src/core/lib/transport/tcp_connect_handshaker.h"
74 #include "src/core/lib/transport/transport.h"
75 #include "src/core/lib/transport/transport_fwd.h"
76
77 #ifdef GPR_SUPPORT_CHANNELS_FROM_FD
78
79 #include <fcntl.h>
80
81 #include "src/core/lib/iomgr/ev_posix.h"
82 #include "src/core/lib/iomgr/tcp_client_posix.h"
83
84 #endif // GPR_SUPPORT_CHANNELS_FROM_FD
85
86 namespace grpc_core {
87
88 using ::grpc_event_engine::experimental::EventEngine;
89
90 namespace {
NullThenSchedClosure(const DebugLocation & location,grpc_closure ** closure,grpc_error_handle error)91 void NullThenSchedClosure(const DebugLocation& location, grpc_closure** closure,
92 grpc_error_handle error) {
93 grpc_closure* c = *closure;
94 *closure = nullptr;
95 ExecCtx::Run(location, c, error);
96 }
97 } // namespace
98
~Chttp2Connector()99 Chttp2Connector::~Chttp2Connector() {
100 if (endpoint_ != nullptr) {
101 grpc_endpoint_destroy(endpoint_);
102 }
103 }
104
Connect(const Args & args,Result * result,grpc_closure * notify)105 void Chttp2Connector::Connect(const Args& args, Result* result,
106 grpc_closure* notify) {
107 {
108 MutexLock lock(&mu_);
109 GPR_ASSERT(notify_ == nullptr);
110 args_ = args;
111 result_ = result;
112 notify_ = notify;
113 GPR_ASSERT(endpoint_ == nullptr);
114 event_engine_ = args_.channel_args.GetObject<EventEngine>();
115 }
116 absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(args.address);
117 if (!address.ok()) {
118 grpc_error_handle error = GRPC_ERROR_CREATE(address.status().ToString());
119 NullThenSchedClosure(DEBUG_LOCATION, ¬ify_, error);
120 return;
121 }
122 ChannelArgs channel_args =
123 args_.channel_args
124 .Set(GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS, address.value())
125 .Set(GRPC_ARG_TCP_HANDSHAKER_BIND_ENDPOINT_TO_POLLSET, 1);
126 handshake_mgr_ = MakeRefCounted<HandshakeManager>();
127 CoreConfiguration::Get().handshaker_registry().AddHandshakers(
128 HANDSHAKER_CLIENT, channel_args, args_.interested_parties,
129 handshake_mgr_.get());
130 Ref().release(); // Ref held by OnHandshakeDone().
131 handshake_mgr_->DoHandshake(nullptr /* endpoint */, channel_args,
132 args.deadline, nullptr /* acceptor */,
133 OnHandshakeDone, this);
134 }
135
Shutdown(grpc_error_handle error)136 void Chttp2Connector::Shutdown(grpc_error_handle error) {
137 MutexLock lock(&mu_);
138 shutdown_ = true;
139 if (handshake_mgr_ != nullptr) {
140 // Handshaker will also shutdown the endpoint if it exists
141 handshake_mgr_->Shutdown(error);
142 }
143 }
144
OnHandshakeDone(void * arg,grpc_error_handle error)145 void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) {
146 auto* args = static_cast<HandshakerArgs*>(arg);
147 Chttp2Connector* self = static_cast<Chttp2Connector*>(args->user_data);
148 {
149 MutexLock lock(&self->mu_);
150 if (!error.ok() || self->shutdown_) {
151 if (error.ok()) {
152 error = GRPC_ERROR_CREATE("connector shutdown");
153 // We were shut down after handshaking completed successfully, so
154 // destroy the endpoint here.
155 if (args->endpoint != nullptr) {
156 // TODO(ctiller): It is currently necessary to shutdown endpoints
157 // before destroying them, even if we know that there are no
158 // pending read/write callbacks. This should be fixed, at which
159 // point this can be removed.
160 grpc_endpoint_shutdown(args->endpoint, error);
161 grpc_endpoint_destroy(args->endpoint);
162 grpc_slice_buffer_destroy(args->read_buffer);
163 gpr_free(args->read_buffer);
164 }
165 }
166 self->result_->Reset();
167 NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error);
168 } else if (args->endpoint != nullptr) {
169 self->result_->transport =
170 grpc_create_chttp2_transport(args->args, args->endpoint, true);
171 GPR_ASSERT(self->result_->transport != nullptr);
172 self->result_->socket_node =
173 grpc_chttp2_transport_get_socket_node(self->result_->transport);
174 self->result_->channel_args = args->args;
175 self->endpoint_ = args->endpoint;
176 self->Ref().release(); // Ref held by OnReceiveSettings()
177 GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
178 grpc_schedule_on_exec_ctx);
179 grpc_chttp2_transport_start_reading(self->result_->transport,
180 args->read_buffer,
181 &self->on_receive_settings_, nullptr);
182 RefCountedPtr<Chttp2Connector> cc = self->Ref();
183 self->timer_handle_ = self->event_engine_->RunAfter(
184 self->args_.deadline - Timestamp::Now(), [self = std::move(cc)] {
185 ApplicationCallbackExecCtx callback_exec_ctx;
186 ExecCtx exec_ctx;
187 self->OnTimeout();
188 });
189 } else {
190 // If the handshaking succeeded but there is no endpoint, then the
191 // handshaker may have handed off the connection to some external
192 // code. Just verify that exit_early flag is set.
193 GPR_DEBUG_ASSERT(args->exit_early);
194 NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error);
195 }
196 self->handshake_mgr_.reset();
197 }
198 self->Unref();
199 }
200
OnReceiveSettings(void * arg,grpc_error_handle error)201 void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error_handle error) {
202 Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
203 {
204 MutexLock lock(&self->mu_);
205 if (!self->notify_error_.has_value()) {
206 grpc_endpoint_delete_from_pollset_set(self->endpoint_,
207 self->args_.interested_parties);
208 if (!error.ok()) {
209 // Transport got an error while waiting on SETTINGS frame.
210 self->result_->Reset();
211 }
212 self->MaybeNotify(error);
213 if (self->timer_handle_.has_value()) {
214 if (self->event_engine_->Cancel(*self->timer_handle_)) {
215 // If we have cancelled the timer successfully, call Notify() again
216 // since the timer callback will not be called now.
217 self->MaybeNotify(absl::OkStatus());
218 }
219 self->timer_handle_.reset();
220 }
221 } else {
222 // OnTimeout() was already invoked. Call Notify() again so that notify_
223 // can be invoked.
224 self->MaybeNotify(absl::OkStatus());
225 }
226 }
227 self->Unref();
228 }
229
OnTimeout()230 void Chttp2Connector::OnTimeout() {
231 MutexLock lock(&mu_);
232 timer_handle_.reset();
233 if (!notify_error_.has_value()) {
234 // The transport did not receive the settings frame in time. Destroy the
235 // transport.
236 grpc_endpoint_delete_from_pollset_set(endpoint_, args_.interested_parties);
237 result_->Reset();
238 MaybeNotify(GRPC_ERROR_CREATE(
239 "connection attempt timed out before receiving SETTINGS frame"));
240 } else {
241 // OnReceiveSettings() was already invoked. Call Notify() again so that
242 // notify_ can be invoked.
243 MaybeNotify(absl::OkStatus());
244 }
245 }
246
MaybeNotify(grpc_error_handle error)247 void Chttp2Connector::MaybeNotify(grpc_error_handle error) {
248 if (notify_error_.has_value()) {
249 NullThenSchedClosure(DEBUG_LOCATION, ¬ify_, notify_error_.value());
250 // Clear state for a new Connect().
251 // Clear out the endpoint_, since it is the responsibility of
252 // the transport to shut it down.
253 endpoint_ = nullptr;
254 notify_error_.reset();
255 } else {
256 notify_error_ = error;
257 }
258 }
259
260 namespace {
261
262 class Chttp2SecureClientChannelFactory : public ClientChannelFactory {
263 public:
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & args)264 RefCountedPtr<Subchannel> CreateSubchannel(
265 const grpc_resolved_address& address, const ChannelArgs& args) override {
266 absl::StatusOr<ChannelArgs> new_args = GetSecureNamingChannelArgs(args);
267 if (!new_args.ok()) {
268 gpr_log(GPR_ERROR,
269 "Failed to create channel args during subchannel creation: %s; "
270 "Got args: %s",
271 new_args.status().ToString().c_str(), args.ToString().c_str());
272 return nullptr;
273 }
274 RefCountedPtr<Subchannel> s = Subchannel::Create(
275 MakeOrphanable<Chttp2Connector>(), address, *new_args);
276 return s;
277 }
278
279 private:
GetSecureNamingChannelArgs(ChannelArgs args)280 static absl::StatusOr<ChannelArgs> GetSecureNamingChannelArgs(
281 ChannelArgs args) {
282 auto* channel_credentials = args.GetObject<grpc_channel_credentials>();
283 if (channel_credentials == nullptr) {
284 return absl::InternalError(
285 "channel credentials missing for secure channel");
286 }
287 // Make sure security connector does not already exist in args.
288 if (args.Contains(GRPC_ARG_SECURITY_CONNECTOR)) {
289 return absl::InternalError(
290 "security connector already present in channel args.");
291 }
292 // Find the authority to use in the security connector.
293 absl::optional<std::string> authority =
294 args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY);
295 if (!authority.has_value()) {
296 return absl::InternalError("authority not present in channel args");
297 }
298 // Create the security connector using the credentials and target name.
299 RefCountedPtr<grpc_channel_security_connector>
300 subchannel_security_connector =
301 channel_credentials->create_security_connector(
302 /*call_creds=*/nullptr, authority->c_str(), &args);
303 if (subchannel_security_connector == nullptr) {
304 return absl::InternalError(absl::StrFormat(
305 "Failed to create secure subchannel for secure name '%s'",
306 *authority));
307 }
308 return args.SetObject(std::move(subchannel_security_connector));
309 }
310 };
311
CreateChannel(const char * target,const ChannelArgs & args)312 absl::StatusOr<RefCountedPtr<Channel>> CreateChannel(const char* target,
313 const ChannelArgs& args) {
314 if (target == nullptr) {
315 gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
316 return absl::InvalidArgumentError("channel target is NULL");
317 }
318 // Add channel arg containing the server URI.
319 std::string canonical_target =
320 CoreConfiguration::Get().resolver_registry().AddDefaultPrefixIfNeeded(
321 target);
322 return Channel::Create(target,
323 args.Set(GRPC_ARG_SERVER_URI, canonical_target),
324 GRPC_CLIENT_CHANNEL, nullptr);
325 }
326
327 } // namespace
328 } // namespace grpc_core
329
330 namespace {
331
332 grpc_core::Chttp2SecureClientChannelFactory* g_factory;
333 gpr_once g_factory_once = GPR_ONCE_INIT;
334
FactoryInit()335 void FactoryInit() {
336 g_factory = new grpc_core::Chttp2SecureClientChannelFactory();
337 }
338
339 } // namespace
340
341 // Create a secure client channel:
342 // Asynchronously: - resolve target
343 // - connect to it (trying alternatives as presented)
344 // - perform handshakes
grpc_channel_create(const char * target,grpc_channel_credentials * creds,const grpc_channel_args * c_args)345 grpc_channel* grpc_channel_create(const char* target,
346 grpc_channel_credentials* creds,
347 const grpc_channel_args* c_args) {
348 grpc_core::ExecCtx exec_ctx;
349 GRPC_API_TRACE("grpc_secure_channel_create(target=%s, creds=%p, args=%p)", 3,
350 (target, (void*)creds, (void*)c_args));
351 grpc_channel* channel = nullptr;
352 grpc_error_handle error;
353 if (creds != nullptr) {
354 // Add channel args containing the client channel factory and channel
355 // credentials.
356 gpr_once_init(&g_factory_once, FactoryInit);
357 grpc_core::ChannelArgs args =
358 creds->update_arguments(grpc_core::CoreConfiguration::Get()
359 .channel_args_preconditioning()
360 .PreconditionChannelArgs(c_args)
361 .SetObject(creds->Ref())
362 .SetObject(g_factory));
363 // Create channel.
364 auto r = grpc_core::CreateChannel(target, args);
365 if (r.ok()) {
366 channel = r->release()->c_ptr();
367 } else {
368 error = absl_status_to_grpc_error(r.status());
369 }
370 }
371 if (channel == nullptr) {
372 intptr_t integer;
373 grpc_status_code status = GRPC_STATUS_INTERNAL;
374 if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus,
375 &integer)) {
376 status = static_cast<grpc_status_code>(integer);
377 }
378 channel = grpc_lame_client_channel_create(
379 target, status, "Failed to create secure client channel");
380 }
381 return channel;
382 }
383
384 #ifdef GPR_SUPPORT_CHANNELS_FROM_FD
grpc_channel_create_from_fd(const char * target,int fd,grpc_channel_credentials * creds,const grpc_channel_args * args)385 grpc_channel* grpc_channel_create_from_fd(const char* target, int fd,
386 grpc_channel_credentials* creds,
387 const grpc_channel_args* args) {
388 grpc_core::ExecCtx exec_ctx;
389 GRPC_API_TRACE(
390 "grpc_channel_create_from_fd(target=%p, fd=%d, creds=%p, args=%p)", 4,
391 (target, fd, creds, args));
392 // For now, we only support insecure channel credentials.
393 if (creds == nullptr ||
394 creds->type() != grpc_core::InsecureCredentials::Type()) {
395 return grpc_lame_client_channel_create(
396 target, GRPC_STATUS_INTERNAL,
397 "Failed to create client channel due to invalid creds");
398 }
399 grpc_core::ChannelArgs final_args =
400 grpc_core::CoreConfiguration::Get()
401 .channel_args_preconditioning()
402 .PreconditionChannelArgs(args)
403 .SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority")
404 .SetObject(creds->Ref());
405
406 int flags = fcntl(fd, F_GETFL, 0);
407 GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
408 grpc_endpoint* client = grpc_tcp_create_from_fd(
409 grpc_fd_create(fd, "client", true),
410 grpc_event_engine::experimental::ChannelArgsEndpointConfig(final_args),
411 "fd-client");
412 grpc_transport* transport =
413 grpc_create_chttp2_transport(final_args, client, true);
414 GPR_ASSERT(transport);
415 auto channel = grpc_core::Channel::Create(
416 target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
417 if (channel.ok()) {
418 grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
419 grpc_core::ExecCtx::Get()->Flush();
420 return channel->release()->c_ptr();
421 } else {
422 grpc_transport_destroy(transport);
423 return grpc_lame_client_channel_create(
424 target, static_cast<grpc_status_code>(channel.status().code()),
425 "Failed to create client channel");
426 }
427 }
428
429 #else // !GPR_SUPPORT_CHANNELS_FROM_FD
430
grpc_channel_create_from_fd(const char *,int,grpc_channel_credentials *,const grpc_channel_args *)431 grpc_channel* grpc_channel_create_from_fd(const char* /* target */,
432 int /* fd */,
433 grpc_channel_credentials* /* creds*/,
434 const grpc_channel_args* /* args */) {
435 GPR_ASSERT(0);
436 return nullptr;
437 }
438
439 #endif // GPR_SUPPORT_CHANNELS_FROM_FD
440