1
//
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
19
20 #include <grpc/support/port_platform.h>
21
22 #include "src/core/lib/iomgr/port.h"
23
24 #ifdef GRPC_CFSTREAM_CLIENT
25
26 #include <CoreFoundation/CoreFoundation.h>
27 #include <netinet/in.h>
28 #include <string.h>
29
30 #include <grpc/event_engine/endpoint_config.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/sync.h>
34
35 #include "src/core/lib/address_utils/sockaddr_utils.h"
36 #include "src/core/lib/event_engine/shim.h"
37 #include "src/core/lib/gprpp/crash.h"
38 #include "src/core/lib/gprpp/host_port.h"
39 #include "src/core/lib/iomgr/cfstream_handle.h"
40 #include "src/core/lib/iomgr/closure.h"
41 #include "src/core/lib/iomgr/endpoint_cfstream.h"
42 #include "src/core/lib/iomgr/error.h"
43 #include "src/core/lib/iomgr/error_cfstream.h"
44 #include "src/core/lib/iomgr/event_engine_shims/tcp_client.h"
45 #include "src/core/lib/iomgr/tcp_client.h"
46 #include "src/core/lib/iomgr/timer.h"
47
48 extern grpc_core::TraceFlag grpc_tcp_trace;
49
50 struct CFStreamConnect {
51 gpr_mu mu;
52 gpr_refcount refcount;
53
54 CFReadStreamRef read_stream;
55 CFWriteStreamRef write_stream;
56 CFStreamHandle* stream_handle;
57
58 grpc_timer alarm;
59 grpc_closure on_alarm;
60 grpc_closure on_open;
61
62 bool read_stream_open;
63 bool write_stream_open;
64 bool failed;
65
66 grpc_closure* closure;
67 grpc_endpoint** endpoint;
68 int refs;
69 std::string addr_name;
70 };
71
CFStreamConnectCleanup(CFStreamConnect * connect)72 static void CFStreamConnectCleanup(CFStreamConnect* connect) {
73 CFSTREAM_HANDLE_UNREF(connect->stream_handle, "async connect clean up");
74 CFRelease(connect->read_stream);
75 CFRelease(connect->write_stream);
76 gpr_mu_destroy(&connect->mu);
77 delete connect;
78 }
79
OnAlarm(void * arg,grpc_error_handle error)80 static void OnAlarm(void* arg, grpc_error_handle error) {
81 CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
82 if (grpc_tcp_trace.enabled()) {
83 gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnAlarm, error:%s", connect,
84 grpc_core::StatusToString(error).c_str());
85 }
86 gpr_mu_lock(&connect->mu);
87 grpc_closure* closure = connect->closure;
88 connect->closure = nil;
89 const bool done = (--connect->refs == 0);
90 gpr_mu_unlock(&connect->mu);
91 // Only schedule a callback once, by either OnAlarm or OnOpen. The
92 // first one issues callback while the second one does cleanup.
93 if (done) {
94 CFStreamConnectCleanup(connect);
95 } else {
96 grpc_error_handle error = GRPC_ERROR_CREATE("connect() timed out");
97 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
98 }
99 }
100
OnOpen(void * arg,grpc_error_handle error)101 static void OnOpen(void* arg, grpc_error_handle error) {
102 CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
103 if (grpc_tcp_trace.enabled()) {
104 gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnOpen, error:%s", connect,
105 grpc_core::StatusToString(error).c_str());
106 }
107 gpr_mu_lock(&connect->mu);
108 grpc_timer_cancel(&connect->alarm);
109 grpc_closure* closure = connect->closure;
110 connect->closure = nil;
111
112 bool done = (--connect->refs == 0);
113 grpc_endpoint** endpoint = connect->endpoint;
114
115 // Only schedule a callback once, by either OnAlarm or OnOpen. The
116 // first one issues callback while the second one does cleanup.
117 if (done) {
118 gpr_mu_unlock(&connect->mu);
119 CFStreamConnectCleanup(connect);
120 } else {
121 if (error.ok()) {
122 CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream);
123 if (stream_error == NULL) {
124 stream_error = CFWriteStreamCopyError(connect->write_stream);
125 }
126 if (stream_error) {
127 error = GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "connect() error");
128 CFRelease(stream_error);
129 }
130 if (error.ok()) {
131 *endpoint = grpc_cfstream_endpoint_create(
132 connect->read_stream, connect->write_stream,
133 connect->addr_name.c_str(), connect->stream_handle);
134 }
135 }
136 gpr_mu_unlock(&connect->mu);
137 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
138 }
139 }
140
ParseResolvedAddress(const grpc_resolved_address * addr,CFStringRef * host,int * port)141 static void ParseResolvedAddress(const grpc_resolved_address* addr,
142 CFStringRef* host, int* port) {
143 std::string host_port = grpc_sockaddr_to_string(addr, true).value();
144 std::string host_string;
145 std::string port_string;
146 grpc_core::SplitHostPort(host_port, &host_string, &port_string);
147 *host = CFStringCreateWithCString(NULL, host_string.c_str(),
148 kCFStringEncodingUTF8);
149 *port = grpc_sockaddr_get_port(addr);
150 }
151
CFStreamClientConnect(grpc_closure * closure,grpc_endpoint ** ep,grpc_pollset_set *,const grpc_event_engine::experimental::EndpointConfig & config,const grpc_resolved_address * resolved_addr,grpc_core::Timestamp deadline)152 static int64_t CFStreamClientConnect(
153 grpc_closure* closure, grpc_endpoint** ep,
154 grpc_pollset_set* /*interested_parties*/,
155 const grpc_event_engine::experimental::EndpointConfig& config,
156 const grpc_resolved_address* resolved_addr, grpc_core::Timestamp deadline) {
157 if (grpc_event_engine::experimental::UseEventEngineClient()) {
158 return grpc_event_engine::experimental::event_engine_tcp_client_connect(
159 closure, ep, config, resolved_addr, deadline);
160 }
161
162 auto addr_uri = grpc_sockaddr_to_uri(resolved_addr);
163 if (!addr_uri.ok()) {
164 grpc_error_handle error = GRPC_ERROR_CREATE(addr_uri.status().ToString());
165 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
166 return 0;
167 }
168
169 CFStreamConnect* connect = new CFStreamConnect();
170 connect->closure = closure;
171 connect->endpoint = ep;
172 connect->addr_name = addr_uri.value();
173 connect->refs = 2; // One for the connect operation, one for the timer.
174 gpr_ref_init(&connect->refcount, 1);
175 gpr_mu_init(&connect->mu);
176
177 if (grpc_tcp_trace.enabled()) {
178 gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %p, %s: asynchronously connecting",
179 connect, connect->addr_name.c_str());
180 }
181
182 CFReadStreamRef read_stream;
183 CFWriteStreamRef write_stream;
184
185 CFStringRef host;
186 int port;
187 ParseResolvedAddress(resolved_addr, &host, &port);
188 CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream,
189 &write_stream);
190 CFRelease(host);
191 connect->read_stream = read_stream;
192 connect->write_stream = write_stream;
193 connect->stream_handle =
194 CFStreamHandle::CreateStreamHandle(read_stream, write_stream);
195 GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect),
196 grpc_schedule_on_exec_ctx);
197 connect->stream_handle->NotifyOnOpen(&connect->on_open);
198 GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect,
199 grpc_schedule_on_exec_ctx);
200 gpr_mu_lock(&connect->mu);
201 CFReadStreamOpen(read_stream);
202 CFWriteStreamOpen(write_stream);
203 grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
204 gpr_mu_unlock(&connect->mu);
205 return 0;
206 }
207
CFStreamClientCancelConnect(int64_t connection_handle)208 static bool CFStreamClientCancelConnect(int64_t connection_handle) {
209 if (grpc_event_engine::experimental::UseEventEngineClient()) {
210 return grpc_event_engine::experimental::
211 event_engine_tcp_client_cancel_connect(connection_handle);
212 }
213 return false;
214 }
215
216 grpc_tcp_client_vtable grpc_cfstream_client_vtable = {
217 CFStreamClientConnect, CFStreamClientCancelConnect};
218
219 #endif // GRPC_CFSTREAM_CLIENT
220