1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/gprpp/memory.h"
22 #include "src/core/lib/iomgr/port.h"
23
24 #ifdef GRPC_CFSTREAM
25 #import <CoreFoundation/CoreFoundation.h>
26
27 #include <grpc/grpc.h>
28 #include <grpc/support/atm.h>
29 #include <grpc/support/sync.h>
30
31 #include "src/core/lib/debug/trace.h"
32 #import "src/core/lib/iomgr/cfstream_handle.h"
33 #include "src/core/lib/iomgr/closure.h"
34 #include "src/core/lib/iomgr/error_cfstream.h"
35 #include "src/core/lib/iomgr/ev_apple.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37
38 extern grpc_core::TraceFlag grpc_tcp_trace;
39
GrpcLibraryInitHolder()40 GrpcLibraryInitHolder::GrpcLibraryInitHolder() { grpc_init(); }
41
~GrpcLibraryInitHolder()42 GrpcLibraryInitHolder::~GrpcLibraryInitHolder() { grpc_shutdown(); }
43
Retain(void * info)44 void* CFStreamHandle::Retain(void* info) {
45 CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
46 CFSTREAM_HANDLE_REF(handle, "retain");
47 return info;
48 }
49
Release(void * info)50 void CFStreamHandle::Release(void* info) {
51 CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
52 CFSTREAM_HANDLE_UNREF(handle, "release");
53 }
54
CreateStreamHandle(CFReadStreamRef read_stream,CFWriteStreamRef write_stream)55 CFStreamHandle* CFStreamHandle::CreateStreamHandle(
56 CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
57 return new CFStreamHandle(read_stream, write_stream);
58 }
59
ReadCallback(CFReadStreamRef stream,CFStreamEventType type,void * client_callback_info)60 void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
61 CFStreamEventType type,
62 void* client_callback_info) {
63 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
64 grpc_core::ExecCtx exec_ctx;
65 grpc_error_handle error;
66 CFErrorRef stream_error;
67 CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
68 if (grpc_tcp_trace.enabled()) {
69 gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
70 stream, type, client_callback_info);
71 }
72 switch (type) {
73 case kCFStreamEventOpenCompleted:
74 handle->open_event_.SetReady();
75 break;
76 case kCFStreamEventHasBytesAvailable:
77 case kCFStreamEventEndEncountered:
78 handle->read_event_.SetReady();
79 break;
80 case kCFStreamEventErrorOccurred:
81 stream_error = CFReadStreamCopyError(stream);
82 error = grpc_error_set_int(
83 GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "read error"),
84 grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE);
85 CFRelease(stream_error);
86 handle->open_event_.SetShutdown(error);
87 handle->write_event_.SetShutdown(error);
88 handle->read_event_.SetShutdown(error);
89 break;
90 default:
91 GPR_UNREACHABLE_CODE(return);
92 }
93 }
WriteCallback(CFWriteStreamRef stream,CFStreamEventType type,void * clientCallBackInfo)94 void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
95 CFStreamEventType type,
96 void* clientCallBackInfo) {
97 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
98 grpc_core::ExecCtx exec_ctx;
99 grpc_error_handle error;
100 CFErrorRef stream_error;
101 CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
102 if (grpc_tcp_trace.enabled()) {
103 gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
104 stream, type, clientCallBackInfo);
105 }
106 switch (type) {
107 case kCFStreamEventOpenCompleted:
108 handle->open_event_.SetReady();
109 break;
110 case kCFStreamEventCanAcceptBytes:
111 case kCFStreamEventEndEncountered:
112 handle->write_event_.SetReady();
113 break;
114 case kCFStreamEventErrorOccurred:
115 stream_error = CFWriteStreamCopyError(stream);
116 error = grpc_error_set_int(
117 GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write error"),
118 grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE);
119 CFRelease(stream_error);
120 handle->open_event_.SetShutdown(error);
121 handle->write_event_.SetShutdown(error);
122 handle->read_event_.SetShutdown(error);
123 break;
124 default:
125 GPR_UNREACHABLE_CODE(return);
126 }
127 }
128
CFStreamHandle(CFReadStreamRef read_stream,CFWriteStreamRef write_stream)129 CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
130 CFWriteStreamRef write_stream) {
131 gpr_ref_init(&refcount_, 1);
132 open_event_.InitEvent();
133 read_event_.InitEvent();
134 write_event_.InitEvent();
135 dispatch_queue_ = dispatch_queue_create(nullptr, DISPATCH_QUEUE_SERIAL);
136 CFStreamClientContext ctx = {0, static_cast<void*>(this),
137 CFStreamHandle::Retain, CFStreamHandle::Release,
138 nil};
139 CFReadStreamSetClient(
140 read_stream,
141 kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
142 kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
143 CFStreamHandle::ReadCallback, &ctx);
144 CFWriteStreamSetClient(
145 write_stream,
146 kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
147 kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
148 CFStreamHandle::WriteCallback, &ctx);
149 grpc_apple_register_read_stream(read_stream, dispatch_queue_);
150 grpc_apple_register_write_stream(write_stream, dispatch_queue_);
151 }
152
~CFStreamHandle()153 CFStreamHandle::~CFStreamHandle() {
154 open_event_.DestroyEvent();
155 read_event_.DestroyEvent();
156 write_event_.DestroyEvent();
157 dispatch_release(dispatch_queue_);
158 }
159
NotifyOnOpen(grpc_closure * closure)160 void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) {
161 open_event_.NotifyOn(closure);
162 }
163
NotifyOnRead(grpc_closure * closure)164 void CFStreamHandle::NotifyOnRead(grpc_closure* closure) {
165 read_event_.NotifyOn(closure);
166 }
167
NotifyOnWrite(grpc_closure * closure)168 void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) {
169 write_event_.NotifyOn(closure);
170 }
171
Shutdown(grpc_error_handle error)172 void CFStreamHandle::Shutdown(grpc_error_handle error) {
173 open_event_.SetShutdown(error);
174 read_event_.SetShutdown(error);
175 write_event_.SetShutdown(error);
176 }
177
Ref(const char * file,int line,const char * reason)178 void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
179 if (grpc_tcp_trace.enabled()) {
180 gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
181 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
182 "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
183 reason, val, val + 1);
184 }
185 gpr_ref(&refcount_);
186 }
187
Unref(const char * file,int line,const char * reason)188 void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
189 if (grpc_tcp_trace.enabled()) {
190 gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
191 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
192 "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
193 reason, val, val - 1);
194 }
195 if (gpr_unref(&refcount_)) {
196 delete this;
197 }
198 }
199
200 #else
201
202 // Creating a phony function so that the grpc_cfstream library will be
203 // non-empty.
204 //
CFStreamPhony()205 void CFStreamPhony() {}
206
207 #endif
208