1*cc02d7e2SAndroid Build Coastguard Worker/* 2*cc02d7e2SAndroid Build Coastguard Worker * 3*cc02d7e2SAndroid Build Coastguard Worker * Copyright 2019 gRPC authors. 4*cc02d7e2SAndroid Build Coastguard Worker * 5*cc02d7e2SAndroid Build Coastguard Worker * Licensed under the Apache License, Version 2.0 (the "License"); 6*cc02d7e2SAndroid Build Coastguard Worker * you may not use this file except in compliance with the License. 7*cc02d7e2SAndroid Build Coastguard Worker * You may obtain a copy of the License at 8*cc02d7e2SAndroid Build Coastguard Worker * 9*cc02d7e2SAndroid Build Coastguard Worker * http://www.apache.org/licenses/LICENSE-2.0 10*cc02d7e2SAndroid Build Coastguard Worker * 11*cc02d7e2SAndroid Build Coastguard Worker * Unless required by applicable law or agreed to in writing, software 12*cc02d7e2SAndroid Build Coastguard Worker * distributed under the License is distributed on an "AS IS" BASIS, 13*cc02d7e2SAndroid Build Coastguard Worker * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14*cc02d7e2SAndroid Build Coastguard Worker * See the License for the specific language governing permissions and 15*cc02d7e2SAndroid Build Coastguard Worker * limitations under the License. 16*cc02d7e2SAndroid Build Coastguard Worker * 17*cc02d7e2SAndroid Build Coastguard Worker */ 18*cc02d7e2SAndroid Build Coastguard Worker 19*cc02d7e2SAndroid Build Coastguard Worker#import "GRPCCallLegacy.h" 20*cc02d7e2SAndroid Build Coastguard Worker 21*cc02d7e2SAndroid Build Coastguard Worker#import "GRPCCall+OAuth2.h" 22*cc02d7e2SAndroid Build Coastguard Worker#import "GRPCCallOptions.h" 23*cc02d7e2SAndroid Build Coastguard Worker#import "GRPCTypes.h" 24*cc02d7e2SAndroid Build Coastguard Worker 25*cc02d7e2SAndroid Build Coastguard Worker#import "private/GRPCCore/GRPCChannelPool.h" 26*cc02d7e2SAndroid Build Coastguard Worker#import "private/GRPCCore/GRPCCompletionQueue.h" 27*cc02d7e2SAndroid Build Coastguard Worker#import "private/GRPCCore/GRPCHost.h" 28*cc02d7e2SAndroid Build Coastguard Worker#import "private/GRPCCore/GRPCWrappedCall.h" 29*cc02d7e2SAndroid Build Coastguard Worker#import "private/GRPCCore/NSData+GRPC.h" 30*cc02d7e2SAndroid Build Coastguard Worker 31*cc02d7e2SAndroid Build Coastguard Worker#import <RxLibrary/GRXBufferedPipe.h> 32*cc02d7e2SAndroid Build Coastguard Worker#import <RxLibrary/GRXConcurrentWriteable.h> 33*cc02d7e2SAndroid Build Coastguard Worker#import <RxLibrary/GRXImmediateSingleWriter.h> 34*cc02d7e2SAndroid Build Coastguard Worker#import <RxLibrary/GRXWriter+Immediate.h> 35*cc02d7e2SAndroid Build Coastguard Worker 36*cc02d7e2SAndroid Build Coastguard Worker#include <grpc/grpc.h> 37*cc02d7e2SAndroid Build Coastguard Worker 38*cc02d7e2SAndroid Build Coastguard Workerconst char *kCFStreamVarName = "grpc_cfstream"; 39*cc02d7e2SAndroid Build Coastguard Workerstatic NSMutableDictionary *callFlags; 40*cc02d7e2SAndroid Build Coastguard Worker 41*cc02d7e2SAndroid Build Coastguard Worker// At most 6 ops can be in an op batch for a client: SEND_INITIAL_METADATA, 42*cc02d7e2SAndroid Build Coastguard Worker// SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE, 43*cc02d7e2SAndroid Build Coastguard Worker// and RECV_STATUS_ON_CLIENT. 44*cc02d7e2SAndroid Build Coastguard WorkerNSInteger kMaxClientBatch = 6; 45*cc02d7e2SAndroid Build Coastguard Worker 46*cc02d7e2SAndroid Build Coastguard Workerstatic NSString *const kAuthorizationHeader = @"authorization"; 47*cc02d7e2SAndroid Build Coastguard Workerstatic NSString *const kBearerPrefix = @"Bearer "; 48*cc02d7e2SAndroid Build Coastguard Worker 49*cc02d7e2SAndroid Build Coastguard Worker@interface GRPCCall () <GRXWriteable> 50*cc02d7e2SAndroid Build Coastguard Worker// Make them read-write. 51*cc02d7e2SAndroid Build Coastguard Worker@property(atomic, copy) NSDictionary *responseHeaders; 52*cc02d7e2SAndroid Build Coastguard Worker@property(atomic, copy) NSDictionary *responseTrailers; 53*cc02d7e2SAndroid Build Coastguard Worker 54*cc02d7e2SAndroid Build Coastguard Worker- (void)receiveNextMessages:(NSUInteger)numberOfMessages; 55*cc02d7e2SAndroid Build Coastguard Worker 56*cc02d7e2SAndroid Build Coastguard Worker@end 57*cc02d7e2SAndroid Build Coastguard Worker 58*cc02d7e2SAndroid Build Coastguard Worker// The following methods of a C gRPC call object aren't reentrant, and thus 59*cc02d7e2SAndroid Build Coastguard Worker// calls to them must be serialized: 60*cc02d7e2SAndroid Build Coastguard Worker// - start_batch 61*cc02d7e2SAndroid Build Coastguard Worker// - destroy 62*cc02d7e2SAndroid Build Coastguard Worker// 63*cc02d7e2SAndroid Build Coastguard Worker// start_batch with a SEND_MESSAGE argument can only be called after the 64*cc02d7e2SAndroid Build Coastguard Worker// OP_COMPLETE event for any previous write is received. This is achieved by 65*cc02d7e2SAndroid Build Coastguard Worker// pausing the requests writer immediately every time it writes a value, and 66*cc02d7e2SAndroid Build Coastguard Worker// resuming it again when OP_COMPLETE is received. 67*cc02d7e2SAndroid Build Coastguard Worker// 68*cc02d7e2SAndroid Build Coastguard Worker// Similarly, start_batch with a RECV_MESSAGE argument can only be called after 69*cc02d7e2SAndroid Build Coastguard Worker// the OP_COMPLETE event for any previous read is received.This is easier to 70*cc02d7e2SAndroid Build Coastguard Worker// enforce, as we're writing the received messages into the writeable: 71*cc02d7e2SAndroid Build Coastguard Worker// start_batch is enqueued once upon receiving the OP_COMPLETE event for the 72*cc02d7e2SAndroid Build Coastguard Worker// RECV_METADATA batch, and then once after receiving each OP_COMPLETE event for 73*cc02d7e2SAndroid Build Coastguard Worker// each RECV_MESSAGE batch. 74*cc02d7e2SAndroid Build Coastguard Worker@implementation GRPCCall { 75*cc02d7e2SAndroid Build Coastguard Worker dispatch_queue_t _callQueue; 76*cc02d7e2SAndroid Build Coastguard Worker 77*cc02d7e2SAndroid Build Coastguard Worker NSString *_host; 78*cc02d7e2SAndroid Build Coastguard Worker NSString *_path; 79*cc02d7e2SAndroid Build Coastguard Worker GRPCCallSafety _callSafety; 80*cc02d7e2SAndroid Build Coastguard Worker GRPCCallOptions *_callOptions; 81*cc02d7e2SAndroid Build Coastguard Worker GRPCWrappedCall *_wrappedCall; 82*cc02d7e2SAndroid Build Coastguard Worker 83*cc02d7e2SAndroid Build Coastguard Worker // The C gRPC library has less guarantees on the ordering of events than we 84*cc02d7e2SAndroid Build Coastguard Worker // do. Particularly, in the face of errors, there's no ordering guarantee at 85*cc02d7e2SAndroid Build Coastguard Worker // all. This wrapper over our actual writeable ensures thread-safety and 86*cc02d7e2SAndroid Build Coastguard Worker // correct ordering. 87*cc02d7e2SAndroid Build Coastguard Worker GRXConcurrentWriteable *_responseWriteable; 88*cc02d7e2SAndroid Build Coastguard Worker 89*cc02d7e2SAndroid Build Coastguard Worker // The network thread wants the requestWriter to resume (when the server is ready for more input), 90*cc02d7e2SAndroid Build Coastguard Worker // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop 91*cc02d7e2SAndroid Build Coastguard Worker // it. Because a writer isn't thread-safe, we'll synchronize those operations on it. 92*cc02d7e2SAndroid Build Coastguard Worker // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or 93*cc02d7e2SAndroid Build Coastguard Worker // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to 94*cc02d7e2SAndroid Build Coastguard Worker // pause the writer immediately on writeValue:, so we need our locking to be recursive. 95*cc02d7e2SAndroid Build Coastguard Worker GRXWriter *_requestWriter; 96*cc02d7e2SAndroid Build Coastguard Worker 97*cc02d7e2SAndroid Build Coastguard Worker // To create a retain cycle when a call is started, up until it finishes. See 98*cc02d7e2SAndroid Build Coastguard Worker // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a 99*cc02d7e2SAndroid Build Coastguard Worker // reference to the call object if all they're interested in is the handler being executed when 100*cc02d7e2SAndroid Build Coastguard Worker // the response arrives. 101*cc02d7e2SAndroid Build Coastguard Worker GRPCCall *_retainSelf; 102*cc02d7e2SAndroid Build Coastguard Worker 103*cc02d7e2SAndroid Build Coastguard Worker GRPCRequestHeaders *_requestHeaders; 104*cc02d7e2SAndroid Build Coastguard Worker 105*cc02d7e2SAndroid Build Coastguard Worker // In the case that the call is a unary call (i.e. the writer to GRPCCall is of type 106*cc02d7e2SAndroid Build Coastguard Worker // GRXImmediateSingleWriter), GRPCCall will delay sending ops (not send them to C core 107*cc02d7e2SAndroid Build Coastguard Worker // immediately) and buffer them into a batch _unaryOpBatch. The batch is sent to C core when 108*cc02d7e2SAndroid Build Coastguard Worker // the SendClose op is added. 109*cc02d7e2SAndroid Build Coastguard Worker BOOL _unaryCall; 110*cc02d7e2SAndroid Build Coastguard Worker NSMutableArray *_unaryOpBatch; 111*cc02d7e2SAndroid Build Coastguard Worker 112*cc02d7e2SAndroid Build Coastguard Worker // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch 113*cc02d7e2SAndroid Build Coastguard Worker // queue 114*cc02d7e2SAndroid Build Coastguard Worker dispatch_queue_t _responseQueue; 115*cc02d7e2SAndroid Build Coastguard Worker 116*cc02d7e2SAndroid Build Coastguard Worker // The OAuth2 token fetched from a token provider. 117*cc02d7e2SAndroid Build Coastguard Worker NSString *_fetchedOauth2AccessToken; 118*cc02d7e2SAndroid Build Coastguard Worker 119*cc02d7e2SAndroid Build Coastguard Worker // The callback to be called when a write message op is done. 120*cc02d7e2SAndroid Build Coastguard Worker void (^_writeDone)(void); 121*cc02d7e2SAndroid Build Coastguard Worker 122*cc02d7e2SAndroid Build Coastguard Worker // Indicate a read request to core is pending. 123*cc02d7e2SAndroid Build Coastguard Worker BOOL _pendingCoreRead; 124*cc02d7e2SAndroid Build Coastguard Worker 125*cc02d7e2SAndroid Build Coastguard Worker // Indicate pending read message request from user. 126*cc02d7e2SAndroid Build Coastguard Worker NSUInteger _pendingReceiveNextMessages; 127*cc02d7e2SAndroid Build Coastguard Worker} 128*cc02d7e2SAndroid Build Coastguard Worker 129*cc02d7e2SAndroid Build Coastguard Worker@synthesize state = _state; 130*cc02d7e2SAndroid Build Coastguard Worker 131*cc02d7e2SAndroid Build Coastguard Worker+ (void)initialize { 132*cc02d7e2SAndroid Build Coastguard Worker // Guarantees the code in {} block is invoked only once. See ref at: 133*cc02d7e2SAndroid Build Coastguard Worker // https://developer.apple.com/documentation/objectivec/nsobject/1418639-initialize?language=objc 134*cc02d7e2SAndroid Build Coastguard Worker if (self == [GRPCCall self]) { 135*cc02d7e2SAndroid Build Coastguard Worker grpc_init(); 136*cc02d7e2SAndroid Build Coastguard Worker callFlags = [NSMutableDictionary dictionary]; 137*cc02d7e2SAndroid Build Coastguard Worker } 138*cc02d7e2SAndroid Build Coastguard Worker} 139*cc02d7e2SAndroid Build Coastguard Worker 140*cc02d7e2SAndroid Build Coastguard Worker+ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path { 141*cc02d7e2SAndroid Build Coastguard Worker if (host.length == 0 || path.length == 0) { 142*cc02d7e2SAndroid Build Coastguard Worker return; 143*cc02d7e2SAndroid Build Coastguard Worker } 144*cc02d7e2SAndroid Build Coastguard Worker NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; 145*cc02d7e2SAndroid Build Coastguard Worker @synchronized(callFlags) { 146*cc02d7e2SAndroid Build Coastguard Worker switch (callSafety) { 147*cc02d7e2SAndroid Build Coastguard Worker case GRPCCallSafetyDefault: 148*cc02d7e2SAndroid Build Coastguard Worker callFlags[hostAndPath] = @0; 149*cc02d7e2SAndroid Build Coastguard Worker break; 150*cc02d7e2SAndroid Build Coastguard Worker default: 151*cc02d7e2SAndroid Build Coastguard Worker break; 152*cc02d7e2SAndroid Build Coastguard Worker } 153*cc02d7e2SAndroid Build Coastguard Worker } 154*cc02d7e2SAndroid Build Coastguard Worker} 155*cc02d7e2SAndroid Build Coastguard Worker 156*cc02d7e2SAndroid Build Coastguard Worker+ (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path { 157*cc02d7e2SAndroid Build Coastguard Worker NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; 158*cc02d7e2SAndroid Build Coastguard Worker @synchronized(callFlags) { 159*cc02d7e2SAndroid Build Coastguard Worker return [callFlags[hostAndPath] intValue]; 160*cc02d7e2SAndroid Build Coastguard Worker } 161*cc02d7e2SAndroid Build Coastguard Worker} 162*cc02d7e2SAndroid Build Coastguard Worker 163*cc02d7e2SAndroid Build Coastguard Worker- (instancetype)initWithHost:(NSString *)host 164*cc02d7e2SAndroid Build Coastguard Worker path:(NSString *)path 165*cc02d7e2SAndroid Build Coastguard Worker requestsWriter:(GRXWriter *)requestWriter { 166*cc02d7e2SAndroid Build Coastguard Worker return [self initWithHost:host 167*cc02d7e2SAndroid Build Coastguard Worker path:path 168*cc02d7e2SAndroid Build Coastguard Worker callSafety:GRPCCallSafetyDefault 169*cc02d7e2SAndroid Build Coastguard Worker requestsWriter:requestWriter 170*cc02d7e2SAndroid Build Coastguard Worker callOptions:nil 171*cc02d7e2SAndroid Build Coastguard Worker writeDone:nil]; 172*cc02d7e2SAndroid Build Coastguard Worker} 173*cc02d7e2SAndroid Build Coastguard Worker 174*cc02d7e2SAndroid Build Coastguard Worker- (instancetype)initWithHost:(NSString *)host 175*cc02d7e2SAndroid Build Coastguard Worker path:(NSString *)path 176*cc02d7e2SAndroid Build Coastguard Worker callSafety:(GRPCCallSafety)safety 177*cc02d7e2SAndroid Build Coastguard Worker requestsWriter:(GRXWriter *)requestsWriter 178*cc02d7e2SAndroid Build Coastguard Worker callOptions:(GRPCCallOptions *)callOptions 179*cc02d7e2SAndroid Build Coastguard Worker writeDone:(void (^)(void))writeDone { 180*cc02d7e2SAndroid Build Coastguard Worker // Purposely using pointer rather than length (host.length == 0) for backwards compatibility. 181*cc02d7e2SAndroid Build Coastguard Worker NSAssert(host != nil && path != nil, @"Neither host nor path can be nil."); 182*cc02d7e2SAndroid Build Coastguard Worker NSAssert(safety <= GRPCCallSafetyDefault, @"Invalid call safety value."); 183*cc02d7e2SAndroid Build Coastguard Worker NSAssert(requestsWriter.state == GRXWriterStateNotStarted, 184*cc02d7e2SAndroid Build Coastguard Worker @"The requests writer can't be already started."); 185*cc02d7e2SAndroid Build Coastguard Worker if (!host || !path) { 186*cc02d7e2SAndroid Build Coastguard Worker return nil; 187*cc02d7e2SAndroid Build Coastguard Worker } 188*cc02d7e2SAndroid Build Coastguard Worker if (requestsWriter.state != GRXWriterStateNotStarted) { 189*cc02d7e2SAndroid Build Coastguard Worker return nil; 190*cc02d7e2SAndroid Build Coastguard Worker } 191*cc02d7e2SAndroid Build Coastguard Worker 192*cc02d7e2SAndroid Build Coastguard Worker if ((self = [super init])) { 193*cc02d7e2SAndroid Build Coastguard Worker _host = [host copy]; 194*cc02d7e2SAndroid Build Coastguard Worker _path = [path copy]; 195*cc02d7e2SAndroid Build Coastguard Worker _callSafety = safety; 196*cc02d7e2SAndroid Build Coastguard Worker _callOptions = [callOptions copy]; 197*cc02d7e2SAndroid Build Coastguard Worker 198*cc02d7e2SAndroid Build Coastguard Worker // Serial queue to invoke the non-reentrant methods of the grpc_call object. 199*cc02d7e2SAndroid Build Coastguard Worker _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL); 200*cc02d7e2SAndroid Build Coastguard Worker 201*cc02d7e2SAndroid Build Coastguard Worker _requestWriter = requestsWriter; 202*cc02d7e2SAndroid Build Coastguard Worker _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self]; 203*cc02d7e2SAndroid Build Coastguard Worker _writeDone = writeDone; 204*cc02d7e2SAndroid Build Coastguard Worker 205*cc02d7e2SAndroid Build Coastguard Worker if ([requestsWriter isKindOfClass:[GRXImmediateSingleWriter class]]) { 206*cc02d7e2SAndroid Build Coastguard Worker _unaryCall = YES; 207*cc02d7e2SAndroid Build Coastguard Worker _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch]; 208*cc02d7e2SAndroid Build Coastguard Worker } 209*cc02d7e2SAndroid Build Coastguard Worker 210*cc02d7e2SAndroid Build Coastguard Worker _responseQueue = dispatch_get_main_queue(); 211*cc02d7e2SAndroid Build Coastguard Worker 212*cc02d7e2SAndroid Build Coastguard Worker // do not start a read until initial metadata is received 213*cc02d7e2SAndroid Build Coastguard Worker _pendingReceiveNextMessages = 0; 214*cc02d7e2SAndroid Build Coastguard Worker _pendingCoreRead = YES; 215*cc02d7e2SAndroid Build Coastguard Worker } 216*cc02d7e2SAndroid Build Coastguard Worker return self; 217*cc02d7e2SAndroid Build Coastguard Worker} 218*cc02d7e2SAndroid Build Coastguard Worker 219*cc02d7e2SAndroid Build Coastguard Worker- (void)setResponseDispatchQueue:(dispatch_queue_t)queue { 220*cc02d7e2SAndroid Build Coastguard Worker @synchronized(self) { 221*cc02d7e2SAndroid Build Coastguard Worker if (_state != GRXWriterStateNotStarted) { 222*cc02d7e2SAndroid Build Coastguard Worker return; 223*cc02d7e2SAndroid Build Coastguard Worker } 224*cc02d7e2SAndroid Build Coastguard Worker _responseQueue = queue; 225*cc02d7e2SAndroid Build Coastguard Worker } 226*cc02d7e2SAndroid Build Coastguard Worker} 227*cc02d7e2SAndroid Build Coastguard Worker 228*cc02d7e2SAndroid Build Coastguard Worker#pragma mark Finish 229*cc02d7e2SAndroid Build Coastguard Worker 230*cc02d7e2SAndroid Build Coastguard Worker// This function should support being called within a @synchronized(self) block in another function 231*cc02d7e2SAndroid Build Coastguard Worker// Should not manipulate _requestWriter for deadlock prevention. 232*cc02d7e2SAndroid Build Coastguard Worker- (void)finishWithError:(NSError *)errorOrNil { 233*cc02d7e2SAndroid Build Coastguard Worker @synchronized(self) { 234*cc02d7e2SAndroid Build Coastguard Worker if (_state == GRXWriterStateFinished) { 235*cc02d7e2SAndroid Build Coastguard Worker return; 236*cc02d7e2SAndroid Build Coastguard Worker } 237*cc02d7e2SAndroid Build Coastguard Worker _state = GRXWriterStateFinished; 238*cc02d7e2SAndroid Build Coastguard Worker 239*cc02d7e2SAndroid Build Coastguard Worker if (errorOrNil) { 240*cc02d7e2SAndroid Build Coastguard Worker [_responseWriteable cancelWithError:errorOrNil]; 241*cc02d7e2SAndroid Build Coastguard Worker } else { 242*cc02d7e2SAndroid Build Coastguard Worker [_responseWriteable enqueueSuccessfulCompletion]; 243*cc02d7e2SAndroid Build Coastguard Worker } 244*cc02d7e2SAndroid Build Coastguard Worker 245*cc02d7e2SAndroid Build Coastguard Worker // If the call isn't retained anywhere else, it can be deallocated now. 246*cc02d7e2SAndroid Build Coastguard Worker _retainSelf = nil; 247*cc02d7e2SAndroid Build Coastguard Worker } 248*cc02d7e2SAndroid Build Coastguard Worker} 249*cc02d7e2SAndroid Build Coastguard Worker 250*cc02d7e2SAndroid Build Coastguard Worker- (void)cancel { 251*cc02d7e2SAndroid Build Coastguard Worker @synchronized(self) { 252*cc02d7e2SAndroid Build Coastguard Worker if (_state == GRXWriterStateFinished) { 253*cc02d7e2SAndroid Build Coastguard Worker return; 254*cc02d7e2SAndroid Build Coastguard Worker } 255*cc02d7e2SAndroid Build Coastguard Worker [self finishWithError:[NSError 256*cc02d7e2SAndroid Build Coastguard Worker errorWithDomain:kGRPCErrorDomain 257*cc02d7e2SAndroid Build Coastguard Worker code:GRPCErrorCodeCancelled 258*cc02d7e2SAndroid Build Coastguard Worker userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; 259*cc02d7e2SAndroid Build Coastguard Worker [_wrappedCall cancel]; 260*cc02d7e2SAndroid Build Coastguard Worker } 261*cc02d7e2SAndroid Build Coastguard Worker _requestWriter.state = GRXWriterStateFinished; 262*cc02d7e2SAndroid Build Coastguard Worker} 263*cc02d7e2SAndroid Build Coastguard Worker 264*cc02d7e2SAndroid Build Coastguard Worker- (void)dealloc { 265*cc02d7e2SAndroid Build Coastguard Worker if (_callQueue) { 266*cc02d7e2SAndroid Build Coastguard Worker __block GRPCWrappedCall *wrappedCall = _wrappedCall; 267*cc02d7e2SAndroid Build Coastguard Worker dispatch_async(_callQueue, ^{ 268*cc02d7e2SAndroid Build Coastguard Worker wrappedCall = nil; 269*cc02d7e2SAndroid Build Coastguard Worker }); 270*cc02d7e2SAndroid Build Coastguard Worker } else { 271*cc02d7e2SAndroid Build Coastguard Worker _wrappedCall = nil; 272*cc02d7e2SAndroid Build Coastguard Worker } 273*cc02d7e2SAndroid Build Coastguard Worker} 274*cc02d7e2SAndroid Build Coastguard Worker 275*cc02d7e2SAndroid Build Coastguard Worker#pragma mark Read messages 276*cc02d7e2SAndroid Build Coastguard Worker 277*cc02d7e2SAndroid Build Coastguard Worker// Only called from the call queue. 278*cc02d7e2SAndroid Build Coastguard Worker// The handler will be called from the network queue. 279*cc02d7e2SAndroid Build Coastguard Worker- (void)startReadWithHandler:(void (^)(grpc_byte_buffer *))handler { 280*cc02d7e2SAndroid Build Coastguard Worker // TODO(jcanizales): Add error handlers for async failures 281*cc02d7e2SAndroid Build Coastguard Worker [_wrappedCall startBatchWithOperations:@[ [[GRPCOpRecvMessage alloc] initWithHandler:handler] ]]; 282*cc02d7e2SAndroid Build Coastguard Worker} 283*cc02d7e2SAndroid Build Coastguard Worker 284*cc02d7e2SAndroid Build Coastguard Worker// Called initially from the network queue once response headers are received, 285*cc02d7e2SAndroid Build Coastguard Worker// then "recursively" from the responseWriteable queue after each response from the 286*cc02d7e2SAndroid Build Coastguard Worker// server has been written. 287*cc02d7e2SAndroid Build Coastguard Worker// If the call is currently paused, this is a noop. Restarting the call will invoke this 288*cc02d7e2SAndroid Build Coastguard Worker// method. 289*cc02d7e2SAndroid Build Coastguard Worker// TODO(jcanizales): Rename to readResponseIfNotPaused. 290*cc02d7e2SAndroid Build Coastguard Worker- (void)maybeStartNextRead { 291*cc02d7e2SAndroid Build Coastguard Worker @synchronized(self) { 292*cc02d7e2SAndroid Build Coastguard Worker if (_state != GRXWriterStateStarted) { 293*cc02d7e2SAndroid Build Coastguard Worker return; 294*cc02d7e2SAndroid Build Coastguard Worker } 295*cc02d7e2SAndroid Build Coastguard Worker if (_callOptions.flowControlEnabled && (_pendingCoreRead || _pendingReceiveNextMessages == 0)) { 296*cc02d7e2SAndroid Build Coastguard Worker return; 297*cc02d7e2SAndroid Build Coastguard Worker } 298*cc02d7e2SAndroid Build Coastguard Worker _pendingCoreRead = YES; 299*cc02d7e2SAndroid Build Coastguard Worker _pendingReceiveNextMessages--; 300*cc02d7e2SAndroid Build Coastguard Worker } 301*cc02d7e2SAndroid Build Coastguard Worker 302*cc02d7e2SAndroid Build Coastguard Worker dispatch_async(_callQueue, ^{ 303*cc02d7e2SAndroid Build Coastguard Worker __weak GRPCCall *weakSelf = self; 304*cc02d7e2SAndroid Build Coastguard Worker [self startReadWithHandler:^(grpc_byte_buffer *message) { 305*cc02d7e2SAndroid Build Coastguard Worker if (message == NULL) { 306*cc02d7e2SAndroid Build Coastguard Worker // No more messages from the server 307*cc02d7e2SAndroid Build Coastguard Worker return; 308*cc02d7e2SAndroid Build Coastguard Worker } 309*cc02d7e2SAndroid Build Coastguard Worker __strong GRPCCall *strongSelf = weakSelf; 310*cc02d7e2SAndroid Build Coastguard Worker if (strongSelf == nil) { 311*cc02d7e2SAndroid Build Coastguard Worker grpc_byte_buffer_destroy(message); 312*cc02d7e2SAndroid Build Coastguard Worker return; 313*cc02d7e2SAndroid Build Coastguard Worker } 314*cc02d7e2SAndroid Build Coastguard Worker NSData *data = [NSData grpc_dataWithByteBuffer:message]; 315*cc02d7e2SAndroid Build Coastguard Worker grpc_byte_buffer_destroy(message); 316*cc02d7e2SAndroid Build Coastguard Worker if (!data) { 317*cc02d7e2SAndroid Build Coastguard Worker // The app doesn't have enough memory to hold the server response. We 318*cc02d7e2SAndroid Build Coastguard Worker // don't want to throw, because the app shouldn't crash for a behavior 319*cc02d7e2SAndroid Build Coastguard Worker // that's on the hands of any server to have. Instead we finish and ask 320*cc02d7e2SAndroid Build Coastguard Worker // the server to cancel. 321*cc02d7e2SAndroid Build Coastguard Worker @synchronized(strongSelf) { 322*cc02d7e2SAndroid Build Coastguard Worker strongSelf->_pendingCoreRead = NO; 323*cc02d7e2SAndroid Build Coastguard Worker [strongSelf 324*cc02d7e2SAndroid Build Coastguard Worker finishWithError:[NSError errorWithDomain:kGRPCErrorDomain 325*cc02d7e2SAndroid Build Coastguard Worker code:GRPCErrorCodeResourceExhausted 326*cc02d7e2SAndroid Build Coastguard Worker userInfo:@{ 327*cc02d7e2SAndroid Build Coastguard Worker NSLocalizedDescriptionKey : 328*cc02d7e2SAndroid Build Coastguard Worker @"Client does not have enough memory to " 329*cc02d7e2SAndroid Build Coastguard Worker @"hold the server response." 330*cc02d7e2SAndroid Build Coastguard Worker }]]; 331*cc02d7e2SAndroid Build Coastguard Worker [strongSelf->_wrappedCall cancel]; 332*cc02d7e2SAndroid Build Coastguard Worker } 333*cc02d7e2SAndroid Build Coastguard Worker strongSelf->_requestWriter.state = GRXWriterStateFinished; 334*cc02d7e2SAndroid Build Coastguard Worker } else { 335*cc02d7e2SAndroid Build Coastguard Worker @synchronized(strongSelf) { 336*cc02d7e2SAndroid Build Coastguard Worker [strongSelf->_responseWriteable enqueueValue:data 337*cc02d7e2SAndroid Build Coastguard Worker completionHandler:^{ 338*cc02d7e2SAndroid Build Coastguard Worker __strong GRPCCall *strongSelf = weakSelf; 339*cc02d7e2SAndroid Build Coastguard Worker if (strongSelf) { 340*cc02d7e2SAndroid Build Coastguard Worker @synchronized(strongSelf) { 341*cc02d7e2SAndroid Build Coastguard Worker strongSelf->_pendingCoreRead = NO; 342*cc02d7e2SAndroid Build Coastguard Worker [strongSelf maybeStartNextRead]; 343*cc02d7e2SAndroid Build Coastguard Worker } 344*cc02d7e2SAndroid Build Coastguard Worker } 345*cc02d7e2SAndroid Build Coastguard Worker }]; 346*cc02d7e2SAndroid Build Coastguard Worker } 347*cc02d7e2SAndroid Build Coastguard Worker } 348*cc02d7e2SAndroid Build Coastguard Worker }]; 349*cc02d7e2SAndroid Build Coastguard Worker }); 350*cc02d7e2SAndroid Build Coastguard Worker} 351*cc02d7e2SAndroid Build Coastguard Worker 352*cc02d7e2SAndroid Build Coastguard Worker#pragma mark Send headers 353*cc02d7e2SAndroid Build Coastguard Worker 354*cc02d7e2SAndroid Build Coastguard Worker- (void)sendHeaders { 355*cc02d7e2SAndroid Build Coastguard Worker // TODO (mxyan): Remove after deprecated methods are removed 356*cc02d7e2SAndroid Build Coastguard Worker uint32_t callSafetyFlags = 0; 357*cc02d7e2SAndroid Build Coastguard Worker 358*cc02d7e2SAndroid Build Coastguard Worker NSMutableDictionary *headers = [_requestHeaders mutableCopy]; 359*cc02d7e2SAndroid Build Coastguard Worker NSString *fetchedOauth2AccessToken; 360*cc02d7e2SAndroid Build Coastguard Worker @synchronized(self) { 361*cc02d7e2SAndroid Build Coastguard Worker fetchedOauth2AccessToken = _fetchedOauth2AccessToken; 362*cc02d7e2SAndroid Build Coastguard Worker } 363*cc02d7e2SAndroid Build Coastguard Worker if (fetchedOauth2AccessToken != nil) { 364*cc02d7e2SAndroid Build Coastguard Worker headers[@"authorization"] = [kBearerPrefix stringByAppendingString:fetchedOauth2AccessToken]; 365*cc02d7e2SAndroid Build Coastguard Worker } else if (_callOptions.oauth2AccessToken != nil) { 366*cc02d7e2SAndroid Build Coastguard Worker headers[@"authorization"] = 367*cc02d7e2SAndroid Build Coastguard Worker [kBearerPrefix stringByAppendingString:_callOptions.oauth2AccessToken]; 368*cc02d7e2SAndroid Build Coastguard Worker } 369*cc02d7e2SAndroid Build Coastguard Worker 370*cc02d7e2SAndroid Build Coastguard Worker // TODO(jcanizales): Add error handlers for async failures 371*cc02d7e2SAndroid Build Coastguard Worker GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] 372*cc02d7e2SAndroid Build Coastguard Worker initWithMetadata:headers 373*cc02d7e2SAndroid Build Coastguard Worker flags:callSafetyFlags 374*cc02d7e2SAndroid Build Coastguard Worker handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA 375*cc02d7e2SAndroid Build Coastguard Worker dispatch_async(_callQueue, ^{ 376*cc02d7e2SAndroid Build Coastguard Worker if (!self->_unaryCall) { 377*cc02d7e2SAndroid Build Coastguard Worker [self->_wrappedCall startBatchWithOperations:@[ op ]]; 378*cc02d7e2SAndroid Build Coastguard Worker } else { 379*cc02d7e2SAndroid Build Coastguard Worker [self->_unaryOpBatch addObject:op]; 380*cc02d7e2SAndroid Build Coastguard Worker } 381*cc02d7e2SAndroid Build Coastguard Worker }); 382*cc02d7e2SAndroid Build Coastguard Worker} 383*cc02d7e2SAndroid Build Coastguard Worker 384*cc02d7e2SAndroid Build Coastguard Worker- (void)receiveNextMessages:(NSUInteger)numberOfMessages { 385*cc02d7e2SAndroid Build Coastguard Worker if (numberOfMessages == 0) { 386*cc02d7e2SAndroid Build Coastguard Worker return; 387*cc02d7e2SAndroid Build Coastguard Worker } 388*cc02d7e2SAndroid Build Coastguard Worker @synchronized(self) { 389*cc02d7e2SAndroid Build Coastguard Worker _pendingReceiveNextMessages += numberOfMessages; 390*cc02d7e2SAndroid Build Coastguard Worker 391*cc02d7e2SAndroid Build Coastguard Worker if (_state != GRXWriterStateStarted || !_callOptions.flowControlEnabled) { 392*cc02d7e2SAndroid Build Coastguard Worker return; 393*cc02d7e2SAndroid Build Coastguard Worker } 394*cc02d7e2SAndroid Build Coastguard Worker [self maybeStartNextRead]; 395*cc02d7e2SAndroid Build Coastguard Worker } 396*cc02d7e2SAndroid Build Coastguard Worker} 397*cc02d7e2SAndroid Build Coastguard Worker 398*cc02d7e2SAndroid Build Coastguard Worker#pragma mark GRXWriteable implementation 399*cc02d7e2SAndroid Build Coastguard Worker 400*cc02d7e2SAndroid Build Coastguard Worker// Only called from the call queue. The error handler will be called from the 401*cc02d7e2SAndroid Build Coastguard Worker// network queue if the write didn't succeed. 402*cc02d7e2SAndroid Build Coastguard Worker// If the call is a unary call, parameter \a errorHandler will be ignored and 403*cc02d7e2SAndroid Build Coastguard Worker// the error handler of GRPCOpSendClose will be executed in case of error. 404*cc02d7e2SAndroid Build Coastguard Worker- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)(void))errorHandler { 405*cc02d7e2SAndroid Build Coastguard Worker __weak GRPCCall *weakSelf = self; 406*cc02d7e2SAndroid Build Coastguard Worker void (^resumingHandler)(void) = ^{ 407*cc02d7e2SAndroid Build Coastguard Worker // Resume the request writer. 408*cc02d7e2SAndroid Build Coastguard Worker GRPCCall *strongSelf = weakSelf; 409*cc02d7e2SAndroid Build Coastguard Worker if (strongSelf) { 410*cc02d7e2SAndroid Build Coastguard Worker strongSelf->_requestWriter.state = GRXWriterStateStarted; 411*cc02d7e2SAndroid Build Coastguard Worker if (strongSelf->_writeDone) { 412*cc02d7e2SAndroid Build Coastguard Worker strongSelf->_writeDone(); 413*cc02d7e2SAndroid Build Coastguard Worker } 414*cc02d7e2SAndroid Build Coastguard Worker } 415*cc02d7e2SAndroid Build Coastguard Worker }; 416*cc02d7e2SAndroid Build Coastguard Worker GRPCOpSendMessage *op = [[GRPCOpSendMessage alloc] initWithMessage:message 417*cc02d7e2SAndroid Build Coastguard Worker handler:resumingHandler]; 418*cc02d7e2SAndroid Build Coastguard Worker if (!_unaryCall) { 419*cc02d7e2SAndroid Build Coastguard Worker [_wrappedCall startBatchWithOperations:@[ op ] errorHandler:errorHandler]; 420*cc02d7e2SAndroid Build Coastguard Worker } else { 421*cc02d7e2SAndroid Build Coastguard Worker // Ignored errorHandler since it is the same as the one for GRPCOpSendClose. 422*cc02d7e2SAndroid Build Coastguard Worker // TODO (mxyan): unify the error handlers of all Ops into a single closure. 423*cc02d7e2SAndroid Build Coastguard Worker [_unaryOpBatch addObject:op]; 424*cc02d7e2SAndroid Build Coastguard Worker } 425*cc02d7e2SAndroid Build Coastguard Worker} 426*cc02d7e2SAndroid Build Coastguard Worker 427*cc02d7e2SAndroid Build Coastguard Worker- (void)writeValue:(id)value { 428*cc02d7e2SAndroid Build Coastguard Worker NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData"); 429*cc02d7e2SAndroid Build Coastguard Worker 430*cc02d7e2SAndroid Build Coastguard Worker @synchronized(self) { 431*cc02d7e2SAndroid Build Coastguard Worker if (_state == GRXWriterStateFinished) { 432*cc02d7e2SAndroid Build Coastguard Worker return; 433*cc02d7e2SAndroid Build Coastguard Worker } 434*cc02d7e2SAndroid Build Coastguard Worker } 435*cc02d7e2SAndroid Build Coastguard Worker 436*cc02d7e2SAndroid Build Coastguard Worker // Pause the input and only resume it when the C layer notifies us that writes 437*cc02d7e2SAndroid Build Coastguard Worker // can proceed. 438*cc02d7e2SAndroid Build Coastguard Worker _requestWriter.state = GRXWriterStatePaused; 439*cc02d7e2SAndroid Build Coastguard Worker 440*cc02d7e2SAndroid Build Coastguard Worker dispatch_async(_callQueue, ^{ 441*cc02d7e2SAndroid Build Coastguard Worker // Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT 442*cc02d7e2SAndroid Build Coastguard Worker [self writeMessage:value withErrorHandler:nil]; 443*cc02d7e2SAndroid Build Coastguard Worker }); 444*cc02d7e2SAndroid Build Coastguard Worker} 445*cc02d7e2SAndroid Build Coastguard Worker 446*cc02d7e2SAndroid Build Coastguard Worker// Only called from the call queue. The error handler will be called from the 447*cc02d7e2SAndroid Build Coastguard Worker// network queue if the requests stream couldn't be closed successfully. 448*cc02d7e2SAndroid Build Coastguard Worker- (void)finishRequestWithErrorHandler:(void (^)(void))errorHandler { 449*cc02d7e2SAndroid Build Coastguard Worker if (!_unaryCall) { 450*cc02d7e2SAndroid Build Coastguard Worker [_wrappedCall startBatchWithOperations:@[ [[GRPCOpSendClose alloc] init] ] 451*cc02d7e2SAndroid Build Coastguard Worker errorHandler:errorHandler]; 452*cc02d7e2SAndroid Build Coastguard Worker } else { 453*cc02d7e2SAndroid Build Coastguard Worker [_unaryOpBatch addObject:[[GRPCOpSendClose alloc] init]]; 454*cc02d7e2SAndroid Build Coastguard Worker [_wrappedCall startBatchWithOperations:_unaryOpBatch errorHandler:errorHandler]; 455*cc02d7e2SAndroid Build Coastguard Worker } 456*cc02d7e2SAndroid Build Coastguard Worker} 457*cc02d7e2SAndroid Build Coastguard Worker 458*cc02d7e2SAndroid Build Coastguard Worker- (void)writesFinishedWithError:(NSError *)errorOrNil { 459*cc02d7e2SAndroid Build Coastguard Worker if (errorOrNil) { 460*cc02d7e2SAndroid Build Coastguard Worker [self cancel]; 461*cc02d7e2SAndroid Build Coastguard Worker } else { 462*cc02d7e2SAndroid Build Coastguard Worker dispatch_async(_callQueue, ^{ 463*cc02d7e2SAndroid Build Coastguard Worker // EOS error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT 464*cc02d7e2SAndroid Build Coastguard Worker [self finishRequestWithErrorHandler:nil]; 465*cc02d7e2SAndroid Build Coastguard Worker }); 466*cc02d7e2SAndroid Build Coastguard Worker } 467*cc02d7e2SAndroid Build Coastguard Worker} 468*cc02d7e2SAndroid Build Coastguard Worker 469*cc02d7e2SAndroid Build Coastguard Worker#pragma mark Invoke 470*cc02d7e2SAndroid Build Coastguard Worker 471*cc02d7e2SAndroid Build Coastguard Worker// Both handlers will eventually be called, from the network queue. Writes can start immediately 472*cc02d7e2SAndroid Build Coastguard Worker// after this. 473*cc02d7e2SAndroid Build Coastguard Worker// The first one (headersHandler), when the response headers are received. 474*cc02d7e2SAndroid Build Coastguard Worker// The second one (completionHandler), whenever the RPC finishes for any reason. 475*cc02d7e2SAndroid Build Coastguard Worker- (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler 476*cc02d7e2SAndroid Build Coastguard Worker completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler { 477*cc02d7e2SAndroid Build Coastguard Worker dispatch_async(_callQueue, ^{ 478*cc02d7e2SAndroid Build Coastguard Worker // TODO(jcanizales): Add error handlers for async failures 479*cc02d7e2SAndroid Build Coastguard Worker [self->_wrappedCall 480*cc02d7e2SAndroid Build Coastguard Worker startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; 481*cc02d7e2SAndroid Build Coastguard Worker [self->_wrappedCall 482*cc02d7e2SAndroid Build Coastguard Worker startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; 483*cc02d7e2SAndroid Build Coastguard Worker }); 484*cc02d7e2SAndroid Build Coastguard Worker} 485*cc02d7e2SAndroid Build Coastguard Worker 486*cc02d7e2SAndroid Build Coastguard Worker- (void)invokeCall { 487*cc02d7e2SAndroid Build Coastguard Worker __weak GRPCCall *weakSelf = self; 488*cc02d7e2SAndroid Build Coastguard Worker [self 489*cc02d7e2SAndroid Build Coastguard Worker invokeCallWithHeadersHandler:^(NSDictionary *headers) { 490*cc02d7e2SAndroid Build Coastguard Worker // Response headers received. 491*cc02d7e2SAndroid Build Coastguard Worker __strong GRPCCall *strongSelf = weakSelf; 492*cc02d7e2SAndroid Build Coastguard Worker if (strongSelf) { 493*cc02d7e2SAndroid Build Coastguard Worker @synchronized(strongSelf) { 494*cc02d7e2SAndroid Build Coastguard Worker // it is ok to set nil because headers are only received once 495*cc02d7e2SAndroid Build Coastguard Worker strongSelf.responseHeaders = nil; 496*cc02d7e2SAndroid Build Coastguard Worker // copy the header so that the GRPCOpRecvMetadata object may be dealloc'ed 497*cc02d7e2SAndroid Build Coastguard Worker NSDictionary *copiedHeaders = [[NSDictionary alloc] initWithDictionary:headers 498*cc02d7e2SAndroid Build Coastguard Worker copyItems:YES]; 499*cc02d7e2SAndroid Build Coastguard Worker strongSelf.responseHeaders = copiedHeaders; 500*cc02d7e2SAndroid Build Coastguard Worker strongSelf->_pendingCoreRead = NO; 501*cc02d7e2SAndroid Build Coastguard Worker [strongSelf maybeStartNextRead]; 502*cc02d7e2SAndroid Build Coastguard Worker } 503*cc02d7e2SAndroid Build Coastguard Worker } 504*cc02d7e2SAndroid Build Coastguard Worker } 505*cc02d7e2SAndroid Build Coastguard Worker completionHandler:^(NSError *error, NSDictionary *trailers) { 506*cc02d7e2SAndroid Build Coastguard Worker __strong GRPCCall *strongSelf = weakSelf; 507*cc02d7e2SAndroid Build Coastguard Worker if (strongSelf) { 508*cc02d7e2SAndroid Build Coastguard Worker strongSelf.responseTrailers = trailers; 509*cc02d7e2SAndroid Build Coastguard Worker 510*cc02d7e2SAndroid Build Coastguard Worker if (error) { 511*cc02d7e2SAndroid Build Coastguard Worker NSMutableDictionary *userInfo = [NSMutableDictionary dictionary]; 512*cc02d7e2SAndroid Build Coastguard Worker if (error.userInfo) { 513*cc02d7e2SAndroid Build Coastguard Worker [userInfo addEntriesFromDictionary:error.userInfo]; 514*cc02d7e2SAndroid Build Coastguard Worker } 515*cc02d7e2SAndroid Build Coastguard Worker userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers; 516*cc02d7e2SAndroid Build Coastguard Worker // Since gRPC core does not guarantee the headers block being called before this block, 517*cc02d7e2SAndroid Build Coastguard Worker // responseHeaders might be nil. 518*cc02d7e2SAndroid Build Coastguard Worker userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders; 519*cc02d7e2SAndroid Build Coastguard Worker error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; 520*cc02d7e2SAndroid Build Coastguard Worker } 521*cc02d7e2SAndroid Build Coastguard Worker [strongSelf finishWithError:error]; 522*cc02d7e2SAndroid Build Coastguard Worker strongSelf->_requestWriter.state = GRXWriterStateFinished; 523*cc02d7e2SAndroid Build Coastguard Worker } 524*cc02d7e2SAndroid Build Coastguard Worker }]; 525*cc02d7e2SAndroid Build Coastguard Worker} 526*cc02d7e2SAndroid Build Coastguard Worker 527*cc02d7e2SAndroid Build Coastguard Worker#pragma mark GRXWriter implementation 528*cc02d7e2SAndroid Build Coastguard Worker 529*cc02d7e2SAndroid Build Coastguard Worker// Lock acquired inside startWithWriteable: 530*cc02d7e2SAndroid Build Coastguard Worker- (void)startCallWithWriteable:(id<GRXWriteable>)writeable { 531*cc02d7e2SAndroid Build Coastguard Worker @synchronized(self) { 532*cc02d7e2SAndroid Build Coastguard Worker if (_state == GRXWriterStateFinished) { 533*cc02d7e2SAndroid Build Coastguard Worker return; 534*cc02d7e2SAndroid Build Coastguard Worker } 535*cc02d7e2SAndroid Build Coastguard Worker 536*cc02d7e2SAndroid Build Coastguard Worker _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable 537*cc02d7e2SAndroid Build Coastguard Worker dispatchQueue:_responseQueue]; 538*cc02d7e2SAndroid Build Coastguard Worker 539*cc02d7e2SAndroid Build Coastguard Worker GRPCPooledChannel *channel = [[GRPCChannelPool sharedInstance] channelWithHost:_host 540*cc02d7e2SAndroid Build Coastguard Worker callOptions:_callOptions]; 541*cc02d7e2SAndroid Build Coastguard Worker _wrappedCall = [channel wrappedCallWithPath:_path 542*cc02d7e2SAndroid Build Coastguard Worker completionQueue:[GRPCCompletionQueue completionQueue] 543*cc02d7e2SAndroid Build Coastguard Worker callOptions:_callOptions]; 544*cc02d7e2SAndroid Build Coastguard Worker 545*cc02d7e2SAndroid Build Coastguard Worker if (_wrappedCall == nil) { 546*cc02d7e2SAndroid Build Coastguard Worker [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain 547*cc02d7e2SAndroid Build Coastguard Worker code:GRPCErrorCodeUnavailable 548*cc02d7e2SAndroid Build Coastguard Worker userInfo:@{ 549*cc02d7e2SAndroid Build Coastguard Worker NSLocalizedDescriptionKey : 550*cc02d7e2SAndroid Build Coastguard Worker @"Failed to create call or channel." 551*cc02d7e2SAndroid Build Coastguard Worker }]]; 552*cc02d7e2SAndroid Build Coastguard Worker return; 553*cc02d7e2SAndroid Build Coastguard Worker } 554*cc02d7e2SAndroid Build Coastguard Worker 555*cc02d7e2SAndroid Build Coastguard Worker [self sendHeaders]; 556*cc02d7e2SAndroid Build Coastguard Worker [self invokeCall]; 557*cc02d7e2SAndroid Build Coastguard Worker } 558*cc02d7e2SAndroid Build Coastguard Worker 559*cc02d7e2SAndroid Build Coastguard Worker // Now that the RPC has been initiated, request writes can start. 560*cc02d7e2SAndroid Build Coastguard Worker [_requestWriter startWithWriteable:self]; 561*cc02d7e2SAndroid Build Coastguard Worker} 562*cc02d7e2SAndroid Build Coastguard Worker 563*cc02d7e2SAndroid Build Coastguard Worker- (void)startWithWriteable:(id<GRXWriteable>)writeable { 564*cc02d7e2SAndroid Build Coastguard Worker id<GRPCAuthorizationProtocol> tokenProvider = nil; 565*cc02d7e2SAndroid Build Coastguard Worker @synchronized(self) { 566*cc02d7e2SAndroid Build Coastguard Worker _state = GRXWriterStateStarted; 567*cc02d7e2SAndroid Build Coastguard Worker 568*cc02d7e2SAndroid Build Coastguard Worker // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled). 569*cc02d7e2SAndroid Build Coastguard Worker // This makes RPCs in which the call isn't externally retained possible (as long as it is 570*cc02d7e2SAndroid Build Coastguard Worker // started before being autoreleased). Care is taken not to retain self strongly in any of the 571*cc02d7e2SAndroid Build Coastguard Worker // blocks used in this implementation, so that the life of the instance is determined by this 572*cc02d7e2SAndroid Build Coastguard Worker // retain cycle. 573*cc02d7e2SAndroid Build Coastguard Worker _retainSelf = self; 574*cc02d7e2SAndroid Build Coastguard Worker 575*cc02d7e2SAndroid Build Coastguard Worker // If _callOptions is nil, people must be using the deprecated v1 interface. In this case, 576*cc02d7e2SAndroid Build Coastguard Worker // generate the call options from the corresponding GRPCHost configs and apply other options 577*cc02d7e2SAndroid Build Coastguard Worker // that are not covered by GRPCHost. 578*cc02d7e2SAndroid Build Coastguard Worker if (_callOptions == nil) { 579*cc02d7e2SAndroid Build Coastguard Worker GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy]; 580*cc02d7e2SAndroid Build Coastguard Worker if (_serverName.length != 0) { 581*cc02d7e2SAndroid Build Coastguard Worker callOptions.serverAuthority = _serverName; 582*cc02d7e2SAndroid Build Coastguard Worker } 583*cc02d7e2SAndroid Build Coastguard Worker if (_timeout > 0) { 584*cc02d7e2SAndroid Build Coastguard Worker callOptions.timeout = _timeout; 585*cc02d7e2SAndroid Build Coastguard Worker } 586*cc02d7e2SAndroid Build Coastguard Worker 587*cc02d7e2SAndroid Build Coastguard Worker id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider; 588*cc02d7e2SAndroid Build Coastguard Worker if (tokenProvider != nil) { 589*cc02d7e2SAndroid Build Coastguard Worker callOptions.authTokenProvider = tokenProvider; 590*cc02d7e2SAndroid Build Coastguard Worker } 591*cc02d7e2SAndroid Build Coastguard Worker _callOptions = callOptions; 592*cc02d7e2SAndroid Build Coastguard Worker } 593*cc02d7e2SAndroid Build Coastguard Worker 594*cc02d7e2SAndroid Build Coastguard Worker NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil, 595*cc02d7e2SAndroid Build Coastguard Worker @"authTokenProvider and oauth2AccessToken cannot be set at the same time"); 596*cc02d7e2SAndroid Build Coastguard Worker 597*cc02d7e2SAndroid Build Coastguard Worker tokenProvider = _callOptions.authTokenProvider; 598*cc02d7e2SAndroid Build Coastguard Worker } 599*cc02d7e2SAndroid Build Coastguard Worker 600*cc02d7e2SAndroid Build Coastguard Worker if (tokenProvider != nil) { 601*cc02d7e2SAndroid Build Coastguard Worker __weak auto weakSelf = self; 602*cc02d7e2SAndroid Build Coastguard Worker [tokenProvider getTokenWithHandler:^(NSString *token) { 603*cc02d7e2SAndroid Build Coastguard Worker __strong auto strongSelf = weakSelf; 604*cc02d7e2SAndroid Build Coastguard Worker if (strongSelf) { 605*cc02d7e2SAndroid Build Coastguard Worker BOOL startCall = NO; 606*cc02d7e2SAndroid Build Coastguard Worker @synchronized(strongSelf) { 607*cc02d7e2SAndroid Build Coastguard Worker if (strongSelf->_state != GRXWriterStateFinished) { 608*cc02d7e2SAndroid Build Coastguard Worker startCall = YES; 609*cc02d7e2SAndroid Build Coastguard Worker if (token) { 610*cc02d7e2SAndroid Build Coastguard Worker strongSelf->_fetchedOauth2AccessToken = [token copy]; 611*cc02d7e2SAndroid Build Coastguard Worker } 612*cc02d7e2SAndroid Build Coastguard Worker } 613*cc02d7e2SAndroid Build Coastguard Worker } 614*cc02d7e2SAndroid Build Coastguard Worker if (startCall) { 615*cc02d7e2SAndroid Build Coastguard Worker [strongSelf startCallWithWriteable:writeable]; 616*cc02d7e2SAndroid Build Coastguard Worker } 617*cc02d7e2SAndroid Build Coastguard Worker } 618*cc02d7e2SAndroid Build Coastguard Worker }]; 619*cc02d7e2SAndroid Build Coastguard Worker } else { 620*cc02d7e2SAndroid Build Coastguard Worker [self startCallWithWriteable:writeable]; 621*cc02d7e2SAndroid Build Coastguard Worker } 622*cc02d7e2SAndroid Build Coastguard Worker} 623*cc02d7e2SAndroid Build Coastguard Worker 624*cc02d7e2SAndroid Build Coastguard Worker- (void)setState:(GRXWriterState)newState { 625*cc02d7e2SAndroid Build Coastguard Worker @synchronized(self) { 626*cc02d7e2SAndroid Build Coastguard Worker // Manual transitions are only allowed from the started or paused states. 627*cc02d7e2SAndroid Build Coastguard Worker if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { 628*cc02d7e2SAndroid Build Coastguard Worker return; 629*cc02d7e2SAndroid Build Coastguard Worker } 630*cc02d7e2SAndroid Build Coastguard Worker 631*cc02d7e2SAndroid Build Coastguard Worker switch (newState) { 632*cc02d7e2SAndroid Build Coastguard Worker case GRXWriterStateFinished: 633*cc02d7e2SAndroid Build Coastguard Worker _state = newState; 634*cc02d7e2SAndroid Build Coastguard Worker // Per GRXWriter's contract, setting the state to Finished manually 635*cc02d7e2SAndroid Build Coastguard Worker // means one doesn't wish the writeable to be messaged anymore. 636*cc02d7e2SAndroid Build Coastguard Worker [_responseWriteable cancelSilently]; 637*cc02d7e2SAndroid Build Coastguard Worker _responseWriteable = nil; 638*cc02d7e2SAndroid Build Coastguard Worker return; 639*cc02d7e2SAndroid Build Coastguard Worker case GRXWriterStatePaused: 640*cc02d7e2SAndroid Build Coastguard Worker _state = newState; 641*cc02d7e2SAndroid Build Coastguard Worker return; 642*cc02d7e2SAndroid Build Coastguard Worker case GRXWriterStateStarted: 643*cc02d7e2SAndroid Build Coastguard Worker if (_state == GRXWriterStatePaused) { 644*cc02d7e2SAndroid Build Coastguard Worker _state = newState; 645*cc02d7e2SAndroid Build Coastguard Worker [self maybeStartNextRead]; 646*cc02d7e2SAndroid Build Coastguard Worker } 647*cc02d7e2SAndroid Build Coastguard Worker return; 648*cc02d7e2SAndroid Build Coastguard Worker case GRXWriterStateNotStarted: 649*cc02d7e2SAndroid Build Coastguard Worker return; 650*cc02d7e2SAndroid Build Coastguard Worker } 651*cc02d7e2SAndroid Build Coastguard Worker } 652*cc02d7e2SAndroid Build Coastguard Worker} 653*cc02d7e2SAndroid Build Coastguard Worker 654*cc02d7e2SAndroid Build Coastguard Worker@end 655