1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/exec_ctx.h"
22 #include "src/core/lib/iomgr/port.h"
23 
24 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
25 
26 #include <errno.h>
27 #include <netinet/in.h>
28 #include <string.h>
29 #include <unistd.h>
30 
31 #include "absl/container/flat_hash_map.h"
32 #include "absl/strings/str_cat.h"
33 
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/time.h>
37 
38 #include "src/core/lib/address_utils/sockaddr_utils.h"
39 #include "src/core/lib/event_engine/resolved_address_internal.h"
40 #include "src/core/lib/event_engine/shim.h"
41 #include "src/core/lib/gpr/string.h"
42 #include "src/core/lib/gprpp/crash.h"
43 #include "src/core/lib/iomgr/ev_posix.h"
44 #include "src/core/lib/iomgr/event_engine_shims/tcp_client.h"
45 #include "src/core/lib/iomgr/executor.h"
46 #include "src/core/lib/iomgr/iomgr_internal.h"
47 #include "src/core/lib/iomgr/sockaddr.h"
48 #include "src/core/lib/iomgr/socket_mutator.h"
49 #include "src/core/lib/iomgr/socket_utils_posix.h"
50 #include "src/core/lib/iomgr/tcp_client_posix.h"
51 #include "src/core/lib/iomgr/tcp_posix.h"
52 #include "src/core/lib/iomgr/timer.h"
53 #include "src/core/lib/iomgr/unix_sockets_posix.h"
54 #include "src/core/lib/slice/slice_internal.h"
55 
56 extern grpc_core::TraceFlag grpc_tcp_trace;
57 
58 using ::grpc_event_engine::experimental::EndpointConfig;
59 
60 struct async_connect {
61   gpr_mu mu;
62   grpc_fd* fd;
63   grpc_timer alarm;
64   grpc_closure on_alarm;
65   int refs;
66   grpc_closure write_closure;
67   grpc_pollset_set* interested_parties;
68   std::string addr_str;
69   grpc_endpoint** ep;
70   grpc_closure* closure;
71   int64_t connection_handle;
72   bool connect_cancelled;
73   grpc_core::PosixTcpOptions options;
74 };
75 
76 struct ConnectionShard {
77   grpc_core::Mutex mu;
78   absl::flat_hash_map<int64_t, async_connect*> pending_connections
79       ABSL_GUARDED_BY(&mu);
80 };
81 
82 namespace {
83 
84 gpr_once g_tcp_client_posix_init = GPR_ONCE_INIT;
85 std::vector<ConnectionShard>* g_connection_shards = nullptr;
86 std::atomic<int64_t> g_connection_id{1};
87 
do_tcp_client_global_init(void)88 void do_tcp_client_global_init(void) {
89   size_t num_shards = std::max(2 * gpr_cpu_num_cores(), 1u);
90   g_connection_shards = new std::vector<struct ConnectionShard>(num_shards);
91 }
92 
93 }  // namespace
94 
grpc_tcp_client_global_init()95 void grpc_tcp_client_global_init() {
96   gpr_once_init(&g_tcp_client_posix_init, do_tcp_client_global_init);
97 }
98 
prepare_socket(const grpc_resolved_address * addr,int fd,const grpc_core::PosixTcpOptions & options)99 static grpc_error_handle prepare_socket(
100     const grpc_resolved_address* addr, int fd,
101     const grpc_core::PosixTcpOptions& options) {
102   grpc_error_handle err;
103 
104   GPR_ASSERT(fd >= 0);
105 
106   err = grpc_set_socket_nonblocking(fd, 1);
107   if (!err.ok()) goto error;
108   err = grpc_set_socket_cloexec(fd, 1);
109   if (!err.ok()) goto error;
110   if (options.tcp_receive_buffer_size != options.kReadBufferSizeUnset) {
111     err = grpc_set_socket_rcvbuf(fd, options.tcp_receive_buffer_size);
112     if (!err.ok()) goto error;
113   }
114   if (!grpc_is_unix_socket(addr)) {
115     err = grpc_set_socket_low_latency(fd, 1);
116     if (!err.ok()) goto error;
117     err = grpc_set_socket_reuse_addr(fd, 1);
118     if (!err.ok()) goto error;
119     err = grpc_set_socket_tcp_user_timeout(fd, options, true /* is_client */);
120     if (!err.ok()) goto error;
121   }
122   err = grpc_set_socket_no_sigpipe_if_possible(fd);
123   if (!err.ok()) goto error;
124 
125   err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_CLIENT_CONNECTION_USAGE,
126                                           options);
127   if (!err.ok()) goto error;
128 
129   goto done;
130 
131 error:
132   if (fd >= 0) {
133     close(fd);
134   }
135 done:
136   return err;
137 }
138 
tc_on_alarm(void * acp,grpc_error_handle error)139 static void tc_on_alarm(void* acp, grpc_error_handle error) {
140   int done;
141   async_connect* ac = static_cast<async_connect*>(acp);
142   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
143     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_alarm: error=%s",
144             ac->addr_str.c_str(), grpc_core::StatusToString(error).c_str());
145   }
146   gpr_mu_lock(&ac->mu);
147   if (ac->fd != nullptr) {
148     grpc_fd_shutdown(ac->fd, GRPC_ERROR_CREATE("connect() timed out"));
149   }
150   done = (--ac->refs == 0);
151   gpr_mu_unlock(&ac->mu);
152   if (done) {
153     gpr_mu_destroy(&ac->mu);
154     delete ac;
155   }
156 }
157 
grpc_tcp_client_create_from_fd(grpc_fd * fd,const grpc_core::PosixTcpOptions & options,absl::string_view addr_str)158 static grpc_endpoint* grpc_tcp_client_create_from_fd(
159     grpc_fd* fd, const grpc_core::PosixTcpOptions& options,
160     absl::string_view addr_str) {
161   return grpc_tcp_create(fd, options, addr_str);
162 }
163 
grpc_tcp_create_from_fd(grpc_fd * fd,const grpc_event_engine::experimental::EndpointConfig & config,absl::string_view addr_str)164 grpc_endpoint* grpc_tcp_create_from_fd(
165     grpc_fd* fd, const grpc_event_engine::experimental::EndpointConfig& config,
166     absl::string_view addr_str) {
167   return grpc_tcp_create(fd, TcpOptionsFromEndpointConfig(config), addr_str);
168 }
169 
on_writable(void * acp,grpc_error_handle error)170 static void on_writable(void* acp, grpc_error_handle error) {
171   async_connect* ac = static_cast<async_connect*>(acp);
172   int so_error = 0;
173   socklen_t so_error_size;
174   int err;
175   int done;
176   grpc_endpoint** ep = ac->ep;
177   grpc_closure* closure = ac->closure;
178   std::string addr_str = ac->addr_str;
179   grpc_fd* fd;
180 
181   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
182     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: on_writable: error=%s",
183             ac->addr_str.c_str(), grpc_core::StatusToString(error).c_str());
184   }
185 
186   gpr_mu_lock(&ac->mu);
187   GPR_ASSERT(ac->fd);
188   fd = ac->fd;
189   ac->fd = nullptr;
190   bool connect_cancelled = ac->connect_cancelled;
191   gpr_mu_unlock(&ac->mu);
192 
193   grpc_timer_cancel(&ac->alarm);
194 
195   gpr_mu_lock(&ac->mu);
196   if (!error.ok()) {
197     error = grpc_error_set_str(error, grpc_core::StatusStrProperty::kOsError,
198                                "Timeout occurred");
199     goto finish;
200   }
201 
202   if (connect_cancelled) {
203     // The callback should not get scheduled in this case.
204     error = absl::OkStatus();
205     goto finish;
206   }
207 
208   do {
209     so_error_size = sizeof(so_error);
210     err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
211                      &so_error_size);
212   } while (err < 0 && errno == EINTR);
213   if (err < 0) {
214     error = GRPC_OS_ERROR(errno, "getsockopt");
215     goto finish;
216   }
217 
218   switch (so_error) {
219     case 0:
220       grpc_pollset_set_del_fd(ac->interested_parties, fd);
221       *ep = grpc_tcp_client_create_from_fd(fd, ac->options, ac->addr_str);
222       fd = nullptr;
223       break;
224     case ENOBUFS:
225       // We will get one of these errors if we have run out of
226       // memory in the kernel for the data structures allocated
227       // when you connect a socket.  If this happens it is very
228       // likely that if we wait a little bit then try again the
229       // connection will work (since other programs or this
230       // program will close their network connections and free up
231       // memory).  This does _not_ indicate that there is anything
232       // wrong with the server we are connecting to, this is a
233       // local problem.
234 
235       // If you are looking at this code, then chances are that
236       // your program or another program on the same computer
237       // opened too many network connections.  The "easy" fix:
238       // don't do that!
239       gpr_log(GPR_ERROR, "kernel out of buffers");
240       gpr_mu_unlock(&ac->mu);
241       grpc_fd_notify_on_write(fd, &ac->write_closure);
242       return;
243     case ECONNREFUSED:
244       // This error shouldn't happen for anything other than connect().
245       error = GRPC_OS_ERROR(so_error, "connect");
246       break;
247     default:
248       // We don't really know which syscall triggered the problem here,
249       // so punt by reporting getsockopt().
250       error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)");
251       break;
252   }
253 
254 finish:
255   if (!connect_cancelled) {
256     int shard_number = ac->connection_handle % (*g_connection_shards).size();
257     struct ConnectionShard* shard = &(*g_connection_shards)[shard_number];
258     {
259       grpc_core::MutexLock lock(&shard->mu);
260       shard->pending_connections.erase(ac->connection_handle);
261     }
262   }
263   if (fd != nullptr) {
264     grpc_pollset_set_del_fd(ac->interested_parties, fd);
265     grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan");
266     fd = nullptr;
267   }
268   done = (--ac->refs == 0);
269   gpr_mu_unlock(&ac->mu);
270   if (!error.ok()) {
271     std::string str;
272     bool ret = grpc_error_get_str(
273         error, grpc_core::StatusStrProperty::kDescription, &str);
274     GPR_ASSERT(ret);
275     std::string description =
276         absl::StrCat("Failed to connect to remote host: ", str);
277     error = grpc_error_set_str(
278         error, grpc_core::StatusStrProperty::kDescription, description);
279     error = grpc_error_set_str(
280         error, grpc_core::StatusStrProperty::kTargetAddress, addr_str);
281   }
282   if (done) {
283     // This is safe even outside the lock, because "done", the sentinel, is
284     // populated *inside* the lock.
285     gpr_mu_destroy(&ac->mu);
286     delete ac;
287   }
288   // Push async connect closure to the executor since this may actually be
289   // called during the shutdown process, in which case a deadlock could form
290   // between the core shutdown mu and the connector mu (b/188239051)
291   if (!connect_cancelled) {
292     grpc_core::Executor::Run(closure, error);
293   }
294 }
295 
grpc_tcp_client_prepare_fd(const grpc_core::PosixTcpOptions & options,const grpc_resolved_address * addr,grpc_resolved_address * mapped_addr,int * fd)296 grpc_error_handle grpc_tcp_client_prepare_fd(
297     const grpc_core::PosixTcpOptions& options,
298     const grpc_resolved_address* addr, grpc_resolved_address* mapped_addr,
299     int* fd) {
300   grpc_dualstack_mode dsmode;
301   grpc_error_handle error;
302   *fd = -1;
303   // Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
304   // v6.
305   if (!grpc_sockaddr_to_v4mapped(addr, mapped_addr)) {
306     // addr is v4 mapped to v6 or v6.
307     memcpy(mapped_addr, addr, sizeof(*mapped_addr));
308   }
309   error =
310       grpc_create_dualstack_socket(mapped_addr, SOCK_STREAM, 0, &dsmode, fd);
311   if (!error.ok()) {
312     return error;
313   }
314   if (dsmode == GRPC_DSMODE_IPV4) {
315     // Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4.
316     if (!grpc_sockaddr_is_v4mapped(addr, mapped_addr)) {
317       memcpy(mapped_addr, addr, sizeof(*mapped_addr));
318     }
319   }
320   if ((error = prepare_socket(mapped_addr, *fd, options)) != absl::OkStatus()) {
321     return error;
322   }
323   return absl::OkStatus();
324 }
325 
grpc_tcp_client_create_from_prepared_fd(grpc_pollset_set * interested_parties,grpc_closure * closure,const int fd,const grpc_core::PosixTcpOptions & options,const grpc_resolved_address * addr,grpc_core::Timestamp deadline,grpc_endpoint ** ep)326 int64_t grpc_tcp_client_create_from_prepared_fd(
327     grpc_pollset_set* interested_parties, grpc_closure* closure, const int fd,
328     const grpc_core::PosixTcpOptions& options,
329     const grpc_resolved_address* addr, grpc_core::Timestamp deadline,
330     grpc_endpoint** ep) {
331   int err;
332   do {
333     err = connect(fd, reinterpret_cast<const grpc_sockaddr*>(addr->addr),
334                   addr->len);
335   } while (err < 0 && errno == EINTR);
336 
337   auto addr_uri = grpc_sockaddr_to_uri(addr);
338   if (!addr_uri.ok()) {
339     grpc_error_handle error = GRPC_ERROR_CREATE(addr_uri.status().ToString());
340     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
341     return 0;
342   }
343 
344   std::string name = absl::StrCat("tcp-client:", addr_uri.value());
345   grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);
346   int64_t connection_id = 0;
347   if (errno == EWOULDBLOCK || errno == EINPROGRESS) {
348     // Connection is still in progress.
349     connection_id = g_connection_id.fetch_add(1, std::memory_order_acq_rel);
350   }
351 
352   if (err >= 0) {
353     // Connection already succeded. Return 0 to discourage any cancellation
354     // attempts.
355     *ep = grpc_tcp_client_create_from_fd(fdobj, options, addr_uri.value());
356     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
357     return 0;
358   }
359   if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
360     // Connection already failed. Return 0 to discourage any cancellation
361     // attempts.
362     grpc_error_handle error = GRPC_OS_ERROR(errno, "connect");
363     error = grpc_error_set_str(
364         error, grpc_core::StatusStrProperty::kTargetAddress, addr_uri.value());
365     grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
366     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
367     return 0;
368   }
369 
370   grpc_pollset_set_add_fd(interested_parties, fdobj);
371 
372   async_connect* ac = new async_connect();
373   ac->closure = closure;
374   ac->ep = ep;
375   ac->fd = fdobj;
376   ac->interested_parties = interested_parties;
377   ac->addr_str = addr_uri.value();
378   ac->connection_handle = connection_id;
379   ac->connect_cancelled = false;
380   gpr_mu_init(&ac->mu);
381   ac->refs = 2;
382   GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
383                     grpc_schedule_on_exec_ctx);
384   ac->options = options;
385 
386   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
387     gpr_log(GPR_INFO, "CLIENT_CONNECT: %s: asynchronously connecting fd %p",
388             ac->addr_str.c_str(), fdobj);
389   }
390 
391   int shard_number = connection_id % (*g_connection_shards).size();
392   struct ConnectionShard* shard = &(*g_connection_shards)[shard_number];
393   {
394     grpc_core::MutexLock lock(&shard->mu);
395     shard->pending_connections.insert_or_assign(connection_id, ac);
396   }
397 
398   gpr_mu_lock(&ac->mu);
399   GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
400   grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
401   grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
402   gpr_mu_unlock(&ac->mu);
403   return connection_id;
404 }
405 
tcp_connect(grpc_closure * closure,grpc_endpoint ** ep,grpc_pollset_set * interested_parties,const EndpointConfig & config,const grpc_resolved_address * addr,grpc_core::Timestamp deadline)406 static int64_t tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
407                            grpc_pollset_set* interested_parties,
408                            const EndpointConfig& config,
409                            const grpc_resolved_address* addr,
410                            grpc_core::Timestamp deadline) {
411   if (grpc_event_engine::experimental::UseEventEngineClient()) {
412     return grpc_event_engine::experimental::event_engine_tcp_client_connect(
413         closure, ep, config, addr, deadline);
414   }
415   grpc_resolved_address mapped_addr;
416   grpc_core::PosixTcpOptions options(TcpOptionsFromEndpointConfig(config));
417   int fd = -1;
418   grpc_error_handle error;
419   *ep = nullptr;
420   if ((error = grpc_tcp_client_prepare_fd(options, addr, &mapped_addr, &fd)) !=
421       absl::OkStatus()) {
422     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
423     return 0;
424   }
425   return grpc_tcp_client_create_from_prepared_fd(
426       interested_parties, closure, fd, options, &mapped_addr, deadline, ep);
427 }
428 
tcp_cancel_connect(int64_t connection_handle)429 static bool tcp_cancel_connect(int64_t connection_handle) {
430   if (grpc_event_engine::experimental::UseEventEngineClient()) {
431     return grpc_event_engine::experimental::
432         event_engine_tcp_client_cancel_connect(connection_handle);
433   }
434   if (connection_handle <= 0) {
435     return false;
436   }
437   int shard_number = connection_handle % (*g_connection_shards).size();
438   struct ConnectionShard* shard = &(*g_connection_shards)[shard_number];
439   async_connect* ac = nullptr;
440   {
441     grpc_core::MutexLock lock(&shard->mu);
442     auto it = shard->pending_connections.find(connection_handle);
443     if (it != shard->pending_connections.end()) {
444       ac = it->second;
445       GPR_ASSERT(ac != nullptr);
446       // Trying to acquire ac->mu here would could cause a deadlock because
447       // the on_writable method tries to acquire the two mutexes used
448       // here in the reverse order. But we dont need to acquire ac->mu before
449       // incrementing ac->refs here. This is because the on_writable
450       // method decrements ac->refs only after deleting the connection handle
451       // from the corresponding hashmap. If the code enters here, it means that
452       // deletion hasn't happened yet. The deletion can only happen after the
453       // corresponding g_shard_mu is unlocked.
454       ++ac->refs;
455       // Remove connection from list of active connections.
456       shard->pending_connections.erase(it);
457     }
458   }
459   if (ac == nullptr) {
460     return false;
461   }
462   gpr_mu_lock(&ac->mu);
463   bool connection_cancel_success = (ac->fd != nullptr);
464   if (connection_cancel_success) {
465     // Connection is still pending. The on_writable callback hasn't executed
466     // yet because ac->fd != nullptr.
467     ac->connect_cancelled = true;
468     // Shutdown the fd. This would cause on_writable to run as soon as possible.
469     // We dont need to pass a custom error here because it wont be used since
470     // the on_connect_closure is not run if connect cancellation is successfull.
471     grpc_fd_shutdown(ac->fd, absl::OkStatus());
472   }
473   bool done = (--ac->refs == 0);
474   gpr_mu_unlock(&ac->mu);
475   if (done) {
476     // This is safe even outside the lock, because "done", the sentinel, is
477     // populated *inside* the lock.
478     gpr_mu_destroy(&ac->mu);
479     delete ac;
480   }
481   return connection_cancel_success;
482 }
483 
484 grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect,
485                                                        tcp_cancel_connect};
486 #endif
487