1 
2 //
3 //
4 // Copyright 2018 gRPC authors.
5 //
6 // Licensed under the Apache License, Version 2.0 (the "License");
7 // you may not use this file except in compliance with the License.
8 // You may obtain a copy of the License at
9 //
10 //     http://www.apache.org/licenses/LICENSE-2.0
11 //
12 // Unless required by applicable law or agreed to in writing, software
13 // distributed under the License is distributed on an "AS IS" BASIS,
14 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 // See the License for the specific language governing permissions and
16 // limitations under the License.
17 //
18 //
19 
20 #include <grpc/support/port_platform.h>
21 
22 #include "src/core/lib/iomgr/port.h"
23 
24 #ifdef GRPC_CFSTREAM_CLIENT
25 
26 #include <CoreFoundation/CoreFoundation.h>
27 #include <netinet/in.h>
28 #include <string.h>
29 
30 #include <grpc/event_engine/endpoint_config.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/sync.h>
34 
35 #include "src/core/lib/address_utils/sockaddr_utils.h"
36 #include "src/core/lib/event_engine/shim.h"
37 #include "src/core/lib/gprpp/crash.h"
38 #include "src/core/lib/gprpp/host_port.h"
39 #include "src/core/lib/iomgr/cfstream_handle.h"
40 #include "src/core/lib/iomgr/closure.h"
41 #include "src/core/lib/iomgr/endpoint_cfstream.h"
42 #include "src/core/lib/iomgr/error.h"
43 #include "src/core/lib/iomgr/error_cfstream.h"
44 #include "src/core/lib/iomgr/event_engine_shims/tcp_client.h"
45 #include "src/core/lib/iomgr/tcp_client.h"
46 #include "src/core/lib/iomgr/timer.h"
47 
48 extern grpc_core::TraceFlag grpc_tcp_trace;
49 
50 struct CFStreamConnect {
51   gpr_mu mu;
52   gpr_refcount refcount;
53 
54   CFReadStreamRef read_stream;
55   CFWriteStreamRef write_stream;
56   CFStreamHandle* stream_handle;
57 
58   grpc_timer alarm;
59   grpc_closure on_alarm;
60   grpc_closure on_open;
61 
62   bool read_stream_open;
63   bool write_stream_open;
64   bool failed;
65 
66   grpc_closure* closure;
67   grpc_endpoint** endpoint;
68   int refs;
69   std::string addr_name;
70 };
71 
CFStreamConnectCleanup(CFStreamConnect * connect)72 static void CFStreamConnectCleanup(CFStreamConnect* connect) {
73   CFSTREAM_HANDLE_UNREF(connect->stream_handle, "async connect clean up");
74   CFRelease(connect->read_stream);
75   CFRelease(connect->write_stream);
76   gpr_mu_destroy(&connect->mu);
77   delete connect;
78 }
79 
OnAlarm(void * arg,grpc_error_handle error)80 static void OnAlarm(void* arg, grpc_error_handle error) {
81   CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
82   if (grpc_tcp_trace.enabled()) {
83     gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnAlarm, error:%s", connect,
84             grpc_core::StatusToString(error).c_str());
85   }
86   gpr_mu_lock(&connect->mu);
87   grpc_closure* closure = connect->closure;
88   connect->closure = nil;
89   const bool done = (--connect->refs == 0);
90   gpr_mu_unlock(&connect->mu);
91   // Only schedule a callback once, by either OnAlarm or OnOpen. The
92   // first one issues callback while the second one does cleanup.
93   if (done) {
94     CFStreamConnectCleanup(connect);
95   } else {
96     grpc_error_handle error = GRPC_ERROR_CREATE("connect() timed out");
97     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
98   }
99 }
100 
OnOpen(void * arg,grpc_error_handle error)101 static void OnOpen(void* arg, grpc_error_handle error) {
102   CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
103   if (grpc_tcp_trace.enabled()) {
104     gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnOpen, error:%s", connect,
105             grpc_core::StatusToString(error).c_str());
106   }
107   gpr_mu_lock(&connect->mu);
108   grpc_timer_cancel(&connect->alarm);
109   grpc_closure* closure = connect->closure;
110   connect->closure = nil;
111 
112   bool done = (--connect->refs == 0);
113   grpc_endpoint** endpoint = connect->endpoint;
114 
115   // Only schedule a callback once, by either OnAlarm or OnOpen. The
116   // first one issues callback while the second one does cleanup.
117   if (done) {
118     gpr_mu_unlock(&connect->mu);
119     CFStreamConnectCleanup(connect);
120   } else {
121     if (error.ok()) {
122       CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream);
123       if (stream_error == NULL) {
124         stream_error = CFWriteStreamCopyError(connect->write_stream);
125       }
126       if (stream_error) {
127         error = GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "connect() error");
128         CFRelease(stream_error);
129       }
130       if (error.ok()) {
131         *endpoint = grpc_cfstream_endpoint_create(
132             connect->read_stream, connect->write_stream,
133             connect->addr_name.c_str(), connect->stream_handle);
134       }
135     }
136     gpr_mu_unlock(&connect->mu);
137     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
138   }
139 }
140 
ParseResolvedAddress(const grpc_resolved_address * addr,CFStringRef * host,int * port)141 static void ParseResolvedAddress(const grpc_resolved_address* addr,
142                                  CFStringRef* host, int* port) {
143   std::string host_port = grpc_sockaddr_to_string(addr, true).value();
144   std::string host_string;
145   std::string port_string;
146   grpc_core::SplitHostPort(host_port, &host_string, &port_string);
147   *host = CFStringCreateWithCString(NULL, host_string.c_str(),
148                                     kCFStringEncodingUTF8);
149   *port = grpc_sockaddr_get_port(addr);
150 }
151 
CFStreamClientConnect(grpc_closure * closure,grpc_endpoint ** ep,grpc_pollset_set *,const grpc_event_engine::experimental::EndpointConfig & config,const grpc_resolved_address * resolved_addr,grpc_core::Timestamp deadline)152 static int64_t CFStreamClientConnect(
153     grpc_closure* closure, grpc_endpoint** ep,
154     grpc_pollset_set* /*interested_parties*/,
155     const grpc_event_engine::experimental::EndpointConfig& config,
156     const grpc_resolved_address* resolved_addr, grpc_core::Timestamp deadline) {
157   if (grpc_event_engine::experimental::UseEventEngineClient()) {
158     return grpc_event_engine::experimental::event_engine_tcp_client_connect(
159         closure, ep, config, resolved_addr, deadline);
160   }
161 
162   auto addr_uri = grpc_sockaddr_to_uri(resolved_addr);
163   if (!addr_uri.ok()) {
164     grpc_error_handle error = GRPC_ERROR_CREATE(addr_uri.status().ToString());
165     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
166     return 0;
167   }
168 
169   CFStreamConnect* connect = new CFStreamConnect();
170   connect->closure = closure;
171   connect->endpoint = ep;
172   connect->addr_name = addr_uri.value();
173   connect->refs = 2;  // One for the connect operation, one for the timer.
174   gpr_ref_init(&connect->refcount, 1);
175   gpr_mu_init(&connect->mu);
176 
177   if (grpc_tcp_trace.enabled()) {
178     gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %p, %s: asynchronously connecting",
179             connect, connect->addr_name.c_str());
180   }
181 
182   CFReadStreamRef read_stream;
183   CFWriteStreamRef write_stream;
184 
185   CFStringRef host;
186   int port;
187   ParseResolvedAddress(resolved_addr, &host, &port);
188   CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream,
189                                      &write_stream);
190   CFRelease(host);
191   connect->read_stream = read_stream;
192   connect->write_stream = write_stream;
193   connect->stream_handle =
194       CFStreamHandle::CreateStreamHandle(read_stream, write_stream);
195   GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect),
196                     grpc_schedule_on_exec_ctx);
197   connect->stream_handle->NotifyOnOpen(&connect->on_open);
198   GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect,
199                     grpc_schedule_on_exec_ctx);
200   gpr_mu_lock(&connect->mu);
201   CFReadStreamOpen(read_stream);
202   CFWriteStreamOpen(write_stream);
203   grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
204   gpr_mu_unlock(&connect->mu);
205   return 0;
206 }
207 
CFStreamClientCancelConnect(int64_t connection_handle)208 static bool CFStreamClientCancelConnect(int64_t connection_handle) {
209   if (grpc_event_engine::experimental::UseEventEngineClient()) {
210     return grpc_event_engine::experimental::
211         event_engine_tcp_client_cancel_connect(connection_handle);
212   }
213   return false;
214 }
215 
216 grpc_tcp_client_vtable grpc_cfstream_client_vtable = {
217     CFStreamClientConnect, CFStreamClientCancelConnect};
218 
219 #endif  // GRPC_CFSTREAM_CLIENT
220