1/* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19#import "ProtoRPC.h" 20 21#if GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS 22#import <Protobuf/GPBProtocolBuffers.h> 23#else 24#import <GPBProtocolBuffers.h> 25#endif 26#import <GRPCClient/GRPCCall.h> 27#import <RxLibrary/GRXWriteable.h> 28#import <RxLibrary/GRXWriter+Transformations.h> 29 30@implementation GRPCUnaryResponseHandler { 31 void (^_responseHandler)(id, NSError *); 32 dispatch_queue_t _responseDispatchQueue; 33 34 GPBMessage *_message; 35} 36 37- (nullable instancetype)initWithResponseHandler:(void (^)(id, NSError *))handler 38 responseDispatchQueue:(dispatch_queue_t)dispatchQueue { 39 if ((self = [super init])) { 40 _responseHandler = handler; 41 if (dispatchQueue == nil) { 42 _responseDispatchQueue = dispatch_get_main_queue(); 43 } else { 44 _responseDispatchQueue = dispatchQueue; 45 } 46 } 47 return self; 48} 49 50// Implements GRPCProtoResponseHandler 51- (dispatch_queue_t)dispatchQueue { 52 return _responseDispatchQueue; 53} 54 55- (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata { 56 _responseHeaders = [initialMetadata copy]; 57} 58 59- (void)didReceiveProtoMessage:(GPBMessage *)message { 60 _message = message; 61} 62 63- (void)didCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error { 64 _responseTrailers = [trailingMetadata copy]; 65 GPBMessage *message = _message; 66 _message = nil; 67 _responseHandler(message, error); 68} 69 70// Intentional no-op since flow control is N/A in a unary call 71- (void)didWriteMessage { 72} 73 74@end 75 76@implementation GRPCUnaryProtoCall { 77 GRPCStreamingProtoCall *_call; 78 GPBMessage *_message; 79} 80 81- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions 82 message:(GPBMessage *)message 83 responseHandler:(id<GRPCProtoResponseHandler>)handler 84 callOptions:(GRPCCallOptions *)callOptions 85 responseClass:(Class)responseClass { 86 NSAssert(message != nil, @"message cannot be empty."); 87 NSAssert(responseClass != nil, @"responseClass cannot be empty."); 88 if (message == nil || responseClass == nil) { 89 return nil; 90 } 91 if ((self = [super init])) { 92 _call = [[GRPCStreamingProtoCall alloc] initWithRequestOptions:requestOptions 93 responseHandler:handler 94 callOptions:callOptions 95 responseClass:responseClass]; 96 _message = [message copy]; 97 } 98 return self; 99} 100 101- (void)start { 102 [_call start]; 103 [_call receiveNextMessage]; 104 [_call writeMessage:_message]; 105 [_call finish]; 106} 107 108- (void)cancel { 109 [_call cancel]; 110} 111 112@end 113 114@interface GRPCStreamingProtoCall () <GRPCResponseHandler> 115 116@end 117 118@implementation GRPCStreamingProtoCall { 119 GRPCRequestOptions *_requestOptions; 120 id<GRPCProtoResponseHandler> _handler; 121 GRPCCallOptions *_callOptions; 122 Class _responseClass; 123 124 GRPCCall2 *_call; 125 dispatch_queue_t _dispatchQueue; 126} 127 128- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions 129 responseHandler:(id<GRPCProtoResponseHandler>)handler 130 callOptions:(GRPCCallOptions *)callOptions 131 responseClass:(Class)responseClass { 132 NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0, 133 @"Invalid callOptions."); 134 NSAssert(handler != nil, @"handler cannot be empty."); 135 if (requestOptions.host.length == 0 || requestOptions.path.length == 0) { 136 return nil; 137 } 138 if (handler == nil) { 139 return nil; 140 } 141 142 if ((self = [super init])) { 143 _requestOptions = [requestOptions copy]; 144 _handler = handler; 145 _callOptions = [callOptions copy]; 146 _responseClass = responseClass; 147 148 // Set queue QoS only when iOS version is 8.0 or above and Xcode version is 9.0 or above 149#if __IPHONE_OS_VERSION_MAX_ALLOWED < 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED < 101300 150 if (@available(iOS 8.0, macOS 10.10, *)) { 151 _dispatchQueue = dispatch_queue_create( 152 NULL, 153 dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0)); 154 } else { 155#else 156 { 157#endif 158 _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); 159 } 160 dispatch_set_target_queue(_dispatchQueue, handler.dispatchQueue); 161 162 _call = [[GRPCCall2 alloc] initWithRequestOptions:_requestOptions 163 responseHandler:self 164 callOptions:_callOptions]; 165 } 166 return self; 167} 168 169- (void)start { 170 GRPCCall2 *copiedCall; 171 @synchronized(self) { 172 copiedCall = _call; 173 } 174 [copiedCall start]; 175} 176 177- (void)cancel { 178 GRPCCall2 *copiedCall; 179 @synchronized(self) { 180 copiedCall = _call; 181 _call = nil; 182 if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) { 183 dispatch_async(_dispatchQueue, ^{ 184 id<GRPCProtoResponseHandler> copiedHandler = nil; 185 @synchronized(self) { 186 copiedHandler = self->_handler; 187 self->_handler = nil; 188 } 189 [copiedHandler didCloseWithTrailingMetadata:nil 190 error:[NSError errorWithDomain:kGRPCErrorDomain 191 code:GRPCErrorCodeCancelled 192 userInfo:@{ 193 NSLocalizedDescriptionKey : 194 @"Canceled by app" 195 }]]; 196 }); 197 } else { 198 _handler = nil; 199 } 200 } 201 [copiedCall cancel]; 202} 203 204- (void)writeMessage:(GPBMessage *)message { 205 NSAssert([message isKindOfClass:[GPBMessage class]], @"Parameter message must be a GPBMessage"); 206 if (![message isKindOfClass:[GPBMessage class]]) { 207 NSLog(@"Failed to send a message that is non-proto."); 208 return; 209 } 210 211 GRPCCall2 *copiedCall; 212 @synchronized(self) { 213 copiedCall = _call; 214 } 215 [copiedCall writeData:[message data]]; 216} 217 218- (void)finish { 219 GRPCCall2 *copiedCall; 220 @synchronized(self) { 221 copiedCall = _call; 222 _call = nil; 223 } 224 [copiedCall finish]; 225} 226 227- (void)receiveNextMessage { 228 [self receiveNextMessages:1]; 229} 230- (void)receiveNextMessages:(NSUInteger)numberOfMessages { 231 GRPCCall2 *copiedCall; 232 @synchronized(self) { 233 copiedCall = _call; 234 } 235 [copiedCall receiveNextMessages:numberOfMessages]; 236} 237 238- (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata { 239 @synchronized(self) { 240 if (initialMetadata != nil && 241 [_handler respondsToSelector:@selector(didReceiveInitialMetadata:)]) { 242 dispatch_async(_dispatchQueue, ^{ 243 id<GRPCProtoResponseHandler> copiedHandler = nil; 244 @synchronized(self) { 245 copiedHandler = self->_handler; 246 } 247 [copiedHandler didReceiveInitialMetadata:initialMetadata]; 248 }); 249 } 250 } 251} 252 253- (void)didReceiveData:(id)data { 254 if (data == nil) return; 255 256 NSError *error = nil; 257 GPBMessage *parsed = [_responseClass parseFromData:data error:&error]; 258 @synchronized(self) { 259 if (parsed && [_handler respondsToSelector:@selector(didReceiveProtoMessage:)]) { 260 dispatch_async(_dispatchQueue, ^{ 261 id<GRPCProtoResponseHandler> copiedHandler = nil; 262 @synchronized(self) { 263 copiedHandler = self->_handler; 264 } 265 [copiedHandler didReceiveProtoMessage:parsed]; 266 }); 267 } else if (!parsed && [_handler respondsToSelector:@selector(didCloseWithTrailingMetadata: 268 error:)]) { 269 dispatch_async(_dispatchQueue, ^{ 270 id<GRPCProtoResponseHandler> copiedHandler = nil; 271 @synchronized(self) { 272 copiedHandler = self->_handler; 273 self->_handler = nil; 274 } 275 [copiedHandler 276 didCloseWithTrailingMetadata:nil 277 error:ErrorForBadProto(data, self->_responseClass, error)]; 278 }); 279 [_call cancel]; 280 _call = nil; 281 } 282 } 283} 284 285- (void)didCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error { 286 @synchronized(self) { 287 if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) { 288 dispatch_async(_dispatchQueue, ^{ 289 id<GRPCProtoResponseHandler> copiedHandler = nil; 290 @synchronized(self) { 291 copiedHandler = self->_handler; 292 self->_handler = nil; 293 } 294 [copiedHandler didCloseWithTrailingMetadata:trailingMetadata error:error]; 295 }); 296 } 297 _call = nil; 298 } 299} 300 301- (void)didWriteData { 302 @synchronized(self) { 303 if ([_handler respondsToSelector:@selector(didWriteMessage)]) { 304 dispatch_async(_dispatchQueue, ^{ 305 id<GRPCProtoResponseHandler> copiedHandler = nil; 306 @synchronized(self) { 307 copiedHandler = self->_handler; 308 } 309 [copiedHandler didWriteMessage]; 310 }); 311 } 312 } 313} 314 315- (dispatch_queue_t)dispatchQueue { 316 return _dispatchQueue; 317} 318 319@end 320 321/** 322 * Generate an NSError object that represents a failure in parsing a proto class. 323 */ 324NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsingError) { 325 NSDictionary *info = @{ 326 NSLocalizedDescriptionKey : @"Unable to parse response from the server", 327 NSLocalizedRecoverySuggestionErrorKey : 328 @"If this RPC is idempotent, retry " 329 @"with exponential backoff. Otherwise, query the server status before " 330 @"retrying.", 331 NSUnderlyingErrorKey : parsingError, 332 @"Expected class" : expectedClass, 333 @"Received value" : proto, 334 }; 335 // TODO(jcanizales): Use kGRPCErrorDomain and GRPCErrorCodeInternal when they're public. 336 return [NSError errorWithDomain:@"io.grpc" code:13 userInfo:info]; 337} 338