1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/gprpp/memory.h"
22 #include "src/core/lib/iomgr/port.h"
23 
24 #ifdef GRPC_CFSTREAM
25 #import <CoreFoundation/CoreFoundation.h>
26 
27 #include <grpc/grpc.h>
28 #include <grpc/support/atm.h>
29 #include <grpc/support/sync.h>
30 
31 #include "src/core/lib/debug/trace.h"
32 #import "src/core/lib/iomgr/cfstream_handle.h"
33 #include "src/core/lib/iomgr/closure.h"
34 #include "src/core/lib/iomgr/error_cfstream.h"
35 #include "src/core/lib/iomgr/ev_apple.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 
38 extern grpc_core::TraceFlag grpc_tcp_trace;
39 
GrpcLibraryInitHolder()40 GrpcLibraryInitHolder::GrpcLibraryInitHolder() { grpc_init(); }
41 
~GrpcLibraryInitHolder()42 GrpcLibraryInitHolder::~GrpcLibraryInitHolder() { grpc_shutdown(); }
43 
Retain(void * info)44 void* CFStreamHandle::Retain(void* info) {
45   CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
46   CFSTREAM_HANDLE_REF(handle, "retain");
47   return info;
48 }
49 
Release(void * info)50 void CFStreamHandle::Release(void* info) {
51   CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
52   CFSTREAM_HANDLE_UNREF(handle, "release");
53 }
54 
CreateStreamHandle(CFReadStreamRef read_stream,CFWriteStreamRef write_stream)55 CFStreamHandle* CFStreamHandle::CreateStreamHandle(
56     CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
57   return new CFStreamHandle(read_stream, write_stream);
58 }
59 
ReadCallback(CFReadStreamRef stream,CFStreamEventType type,void * client_callback_info)60 void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
61                                   CFStreamEventType type,
62                                   void* client_callback_info) {
63   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
64   grpc_core::ExecCtx exec_ctx;
65   grpc_error_handle error;
66   CFErrorRef stream_error;
67   CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
68   if (grpc_tcp_trace.enabled()) {
69     gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
70             stream, type, client_callback_info);
71   }
72   switch (type) {
73     case kCFStreamEventOpenCompleted:
74       handle->open_event_.SetReady();
75       break;
76     case kCFStreamEventHasBytesAvailable:
77     case kCFStreamEventEndEncountered:
78       handle->read_event_.SetReady();
79       break;
80     case kCFStreamEventErrorOccurred:
81       stream_error = CFReadStreamCopyError(stream);
82       error = grpc_error_set_int(
83           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "read error"),
84           grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE);
85       CFRelease(stream_error);
86       handle->open_event_.SetShutdown(error);
87       handle->write_event_.SetShutdown(error);
88       handle->read_event_.SetShutdown(error);
89       break;
90     default:
91       GPR_UNREACHABLE_CODE(return);
92   }
93 }
WriteCallback(CFWriteStreamRef stream,CFStreamEventType type,void * clientCallBackInfo)94 void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
95                                    CFStreamEventType type,
96                                    void* clientCallBackInfo) {
97   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
98   grpc_core::ExecCtx exec_ctx;
99   grpc_error_handle error;
100   CFErrorRef stream_error;
101   CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
102   if (grpc_tcp_trace.enabled()) {
103     gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
104             stream, type, clientCallBackInfo);
105   }
106   switch (type) {
107     case kCFStreamEventOpenCompleted:
108       handle->open_event_.SetReady();
109       break;
110     case kCFStreamEventCanAcceptBytes:
111     case kCFStreamEventEndEncountered:
112       handle->write_event_.SetReady();
113       break;
114     case kCFStreamEventErrorOccurred:
115       stream_error = CFWriteStreamCopyError(stream);
116       error = grpc_error_set_int(
117           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write error"),
118           grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE);
119       CFRelease(stream_error);
120       handle->open_event_.SetShutdown(error);
121       handle->write_event_.SetShutdown(error);
122       handle->read_event_.SetShutdown(error);
123       break;
124     default:
125       GPR_UNREACHABLE_CODE(return);
126   }
127 }
128 
CFStreamHandle(CFReadStreamRef read_stream,CFWriteStreamRef write_stream)129 CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
130                                CFWriteStreamRef write_stream) {
131   gpr_ref_init(&refcount_, 1);
132   open_event_.InitEvent();
133   read_event_.InitEvent();
134   write_event_.InitEvent();
135   dispatch_queue_ = dispatch_queue_create(nullptr, DISPATCH_QUEUE_SERIAL);
136   CFStreamClientContext ctx = {0, static_cast<void*>(this),
137                                CFStreamHandle::Retain, CFStreamHandle::Release,
138                                nil};
139   CFReadStreamSetClient(
140       read_stream,
141       kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
142           kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
143       CFStreamHandle::ReadCallback, &ctx);
144   CFWriteStreamSetClient(
145       write_stream,
146       kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
147           kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
148       CFStreamHandle::WriteCallback, &ctx);
149   grpc_apple_register_read_stream(read_stream, dispatch_queue_);
150   grpc_apple_register_write_stream(write_stream, dispatch_queue_);
151 }
152 
~CFStreamHandle()153 CFStreamHandle::~CFStreamHandle() {
154   open_event_.DestroyEvent();
155   read_event_.DestroyEvent();
156   write_event_.DestroyEvent();
157   dispatch_release(dispatch_queue_);
158 }
159 
NotifyOnOpen(grpc_closure * closure)160 void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) {
161   open_event_.NotifyOn(closure);
162 }
163 
NotifyOnRead(grpc_closure * closure)164 void CFStreamHandle::NotifyOnRead(grpc_closure* closure) {
165   read_event_.NotifyOn(closure);
166 }
167 
NotifyOnWrite(grpc_closure * closure)168 void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) {
169   write_event_.NotifyOn(closure);
170 }
171 
Shutdown(grpc_error_handle error)172 void CFStreamHandle::Shutdown(grpc_error_handle error) {
173   open_event_.SetShutdown(error);
174   read_event_.SetShutdown(error);
175   write_event_.SetShutdown(error);
176 }
177 
Ref(const char * file,int line,const char * reason)178 void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
179   if (grpc_tcp_trace.enabled()) {
180     gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
181     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
182             "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
183             reason, val, val + 1);
184   }
185   gpr_ref(&refcount_);
186 }
187 
Unref(const char * file,int line,const char * reason)188 void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
189   if (grpc_tcp_trace.enabled()) {
190     gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
191     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
192             "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
193             reason, val, val - 1);
194   }
195   if (gpr_unref(&refcount_)) {
196     delete this;
197   }
198 }
199 
200 #else
201 
// Placeholder used when GRPC_CFSTREAM is not defined: it gives this
// translation unit at least one symbol so that the grpc_cfstream library is
// not empty. It intentionally does nothing.
void CFStreamPhony() {
}
206 
207 #endif
208