xref: /aosp_15_r20/external/grpc-grpc/src/objective-c/GRPCClient/GRPCCallLegacy.mm (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1*cc02d7e2SAndroid Build Coastguard Worker/*
2*cc02d7e2SAndroid Build Coastguard Worker *
3*cc02d7e2SAndroid Build Coastguard Worker * Copyright 2019 gRPC authors.
4*cc02d7e2SAndroid Build Coastguard Worker *
5*cc02d7e2SAndroid Build Coastguard Worker * Licensed under the Apache License, Version 2.0 (the "License");
6*cc02d7e2SAndroid Build Coastguard Worker * you may not use this file except in compliance with the License.
7*cc02d7e2SAndroid Build Coastguard Worker * You may obtain a copy of the License at
8*cc02d7e2SAndroid Build Coastguard Worker *
9*cc02d7e2SAndroid Build Coastguard Worker *     http://www.apache.org/licenses/LICENSE-2.0
10*cc02d7e2SAndroid Build Coastguard Worker *
11*cc02d7e2SAndroid Build Coastguard Worker * Unless required by applicable law or agreed to in writing, software
12*cc02d7e2SAndroid Build Coastguard Worker * distributed under the License is distributed on an "AS IS" BASIS,
13*cc02d7e2SAndroid Build Coastguard Worker * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*cc02d7e2SAndroid Build Coastguard Worker * See the License for the specific language governing permissions and
15*cc02d7e2SAndroid Build Coastguard Worker * limitations under the License.
16*cc02d7e2SAndroid Build Coastguard Worker *
17*cc02d7e2SAndroid Build Coastguard Worker */
18*cc02d7e2SAndroid Build Coastguard Worker
19*cc02d7e2SAndroid Build Coastguard Worker#import "GRPCCallLegacy.h"
20*cc02d7e2SAndroid Build Coastguard Worker
21*cc02d7e2SAndroid Build Coastguard Worker#import "GRPCCall+OAuth2.h"
22*cc02d7e2SAndroid Build Coastguard Worker#import "GRPCCallOptions.h"
23*cc02d7e2SAndroid Build Coastguard Worker#import "GRPCTypes.h"
24*cc02d7e2SAndroid Build Coastguard Worker
25*cc02d7e2SAndroid Build Coastguard Worker#import "private/GRPCCore/GRPCChannelPool.h"
26*cc02d7e2SAndroid Build Coastguard Worker#import "private/GRPCCore/GRPCCompletionQueue.h"
27*cc02d7e2SAndroid Build Coastguard Worker#import "private/GRPCCore/GRPCHost.h"
28*cc02d7e2SAndroid Build Coastguard Worker#import "private/GRPCCore/GRPCWrappedCall.h"
29*cc02d7e2SAndroid Build Coastguard Worker#import "private/GRPCCore/NSData+GRPC.h"
30*cc02d7e2SAndroid Build Coastguard Worker
31*cc02d7e2SAndroid Build Coastguard Worker#import <RxLibrary/GRXBufferedPipe.h>
32*cc02d7e2SAndroid Build Coastguard Worker#import <RxLibrary/GRXConcurrentWriteable.h>
33*cc02d7e2SAndroid Build Coastguard Worker#import <RxLibrary/GRXImmediateSingleWriter.h>
34*cc02d7e2SAndroid Build Coastguard Worker#import <RxLibrary/GRXWriter+Immediate.h>
35*cc02d7e2SAndroid Build Coastguard Worker
36*cc02d7e2SAndroid Build Coastguard Worker#include <grpc/grpc.h>
37*cc02d7e2SAndroid Build Coastguard Worker
38*cc02d7e2SAndroid Build Coastguard Workerconst char *kCFStreamVarName = "grpc_cfstream";
39*cc02d7e2SAndroid Build Coastguard Workerstatic NSMutableDictionary *callFlags;
40*cc02d7e2SAndroid Build Coastguard Worker
41*cc02d7e2SAndroid Build Coastguard Worker// At most 6 ops can be in an op batch for a client: SEND_INITIAL_METADATA,
42*cc02d7e2SAndroid Build Coastguard Worker// SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE,
43*cc02d7e2SAndroid Build Coastguard Worker// and RECV_STATUS_ON_CLIENT.
44*cc02d7e2SAndroid Build Coastguard WorkerNSInteger kMaxClientBatch = 6;
45*cc02d7e2SAndroid Build Coastguard Worker
46*cc02d7e2SAndroid Build Coastguard Workerstatic NSString *const kAuthorizationHeader = @"authorization";
47*cc02d7e2SAndroid Build Coastguard Workerstatic NSString *const kBearerPrefix = @"Bearer ";
48*cc02d7e2SAndroid Build Coastguard Worker
49*cc02d7e2SAndroid Build Coastguard Worker@interface GRPCCall () <GRXWriteable>
50*cc02d7e2SAndroid Build Coastguard Worker// Make them read-write.
51*cc02d7e2SAndroid Build Coastguard Worker@property(atomic, copy) NSDictionary *responseHeaders;
52*cc02d7e2SAndroid Build Coastguard Worker@property(atomic, copy) NSDictionary *responseTrailers;
53*cc02d7e2SAndroid Build Coastguard Worker
54*cc02d7e2SAndroid Build Coastguard Worker- (void)receiveNextMessages:(NSUInteger)numberOfMessages;
55*cc02d7e2SAndroid Build Coastguard Worker
56*cc02d7e2SAndroid Build Coastguard Worker@end
57*cc02d7e2SAndroid Build Coastguard Worker
58*cc02d7e2SAndroid Build Coastguard Worker// The following methods of a C gRPC call object aren't reentrant, and thus
59*cc02d7e2SAndroid Build Coastguard Worker// calls to them must be serialized:
60*cc02d7e2SAndroid Build Coastguard Worker// - start_batch
61*cc02d7e2SAndroid Build Coastguard Worker// - destroy
62*cc02d7e2SAndroid Build Coastguard Worker//
63*cc02d7e2SAndroid Build Coastguard Worker// start_batch with a SEND_MESSAGE argument can only be called after the
64*cc02d7e2SAndroid Build Coastguard Worker// OP_COMPLETE event for any previous write is received. This is achieved by
65*cc02d7e2SAndroid Build Coastguard Worker// pausing the requests writer immediately every time it writes a value, and
66*cc02d7e2SAndroid Build Coastguard Worker// resuming it again when OP_COMPLETE is received.
67*cc02d7e2SAndroid Build Coastguard Worker//
68*cc02d7e2SAndroid Build Coastguard Worker// Similarly, start_batch with a RECV_MESSAGE argument can only be called after
69*cc02d7e2SAndroid Build Coastguard Worker// the OP_COMPLETE event for any previous read is received.This is easier to
70*cc02d7e2SAndroid Build Coastguard Worker// enforce, as we're writing the received messages into the writeable:
71*cc02d7e2SAndroid Build Coastguard Worker// start_batch is enqueued once upon receiving the OP_COMPLETE event for the
72*cc02d7e2SAndroid Build Coastguard Worker// RECV_METADATA batch, and then once after receiving each OP_COMPLETE event for
73*cc02d7e2SAndroid Build Coastguard Worker// each RECV_MESSAGE batch.
74*cc02d7e2SAndroid Build Coastguard Worker@implementation GRPCCall {
75*cc02d7e2SAndroid Build Coastguard Worker  dispatch_queue_t _callQueue;
76*cc02d7e2SAndroid Build Coastguard Worker
77*cc02d7e2SAndroid Build Coastguard Worker  NSString *_host;
78*cc02d7e2SAndroid Build Coastguard Worker  NSString *_path;
79*cc02d7e2SAndroid Build Coastguard Worker  GRPCCallSafety _callSafety;
80*cc02d7e2SAndroid Build Coastguard Worker  GRPCCallOptions *_callOptions;
81*cc02d7e2SAndroid Build Coastguard Worker  GRPCWrappedCall *_wrappedCall;
82*cc02d7e2SAndroid Build Coastguard Worker
83*cc02d7e2SAndroid Build Coastguard Worker  // The C gRPC library has less guarantees on the ordering of events than we
84*cc02d7e2SAndroid Build Coastguard Worker  // do. Particularly, in the face of errors, there's no ordering guarantee at
85*cc02d7e2SAndroid Build Coastguard Worker  // all. This wrapper over our actual writeable ensures thread-safety and
86*cc02d7e2SAndroid Build Coastguard Worker  // correct ordering.
87*cc02d7e2SAndroid Build Coastguard Worker  GRXConcurrentWriteable *_responseWriteable;
88*cc02d7e2SAndroid Build Coastguard Worker
89*cc02d7e2SAndroid Build Coastguard Worker  // The network thread wants the requestWriter to resume (when the server is ready for more input),
90*cc02d7e2SAndroid Build Coastguard Worker  // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
91*cc02d7e2SAndroid Build Coastguard Worker  // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
92*cc02d7e2SAndroid Build Coastguard Worker  // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
93*cc02d7e2SAndroid Build Coastguard Worker  // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
94*cc02d7e2SAndroid Build Coastguard Worker  // pause the writer immediately on writeValue:, so we need our locking to be recursive.
95*cc02d7e2SAndroid Build Coastguard Worker  GRXWriter *_requestWriter;
96*cc02d7e2SAndroid Build Coastguard Worker
97*cc02d7e2SAndroid Build Coastguard Worker  // To create a retain cycle when a call is started, up until it finishes. See
98*cc02d7e2SAndroid Build Coastguard Worker  // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
99*cc02d7e2SAndroid Build Coastguard Worker  // reference to the call object if all they're interested in is the handler being executed when
100*cc02d7e2SAndroid Build Coastguard Worker  // the response arrives.
101*cc02d7e2SAndroid Build Coastguard Worker  GRPCCall *_retainSelf;
102*cc02d7e2SAndroid Build Coastguard Worker
103*cc02d7e2SAndroid Build Coastguard Worker  GRPCRequestHeaders *_requestHeaders;
104*cc02d7e2SAndroid Build Coastguard Worker
105*cc02d7e2SAndroid Build Coastguard Worker  // In the case that the call is a unary call (i.e. the writer to GRPCCall is of type
106*cc02d7e2SAndroid Build Coastguard Worker  // GRXImmediateSingleWriter), GRPCCall will delay sending ops (not send them to C core
107*cc02d7e2SAndroid Build Coastguard Worker  // immediately) and buffer them into a batch _unaryOpBatch. The batch is sent to C core when
108*cc02d7e2SAndroid Build Coastguard Worker  // the SendClose op is added.
109*cc02d7e2SAndroid Build Coastguard Worker  BOOL _unaryCall;
110*cc02d7e2SAndroid Build Coastguard Worker  NSMutableArray *_unaryOpBatch;
111*cc02d7e2SAndroid Build Coastguard Worker
112*cc02d7e2SAndroid Build Coastguard Worker  // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch
113*cc02d7e2SAndroid Build Coastguard Worker  // queue
114*cc02d7e2SAndroid Build Coastguard Worker  dispatch_queue_t _responseQueue;
115*cc02d7e2SAndroid Build Coastguard Worker
116*cc02d7e2SAndroid Build Coastguard Worker  // The OAuth2 token fetched from a token provider.
117*cc02d7e2SAndroid Build Coastguard Worker  NSString *_fetchedOauth2AccessToken;
118*cc02d7e2SAndroid Build Coastguard Worker
119*cc02d7e2SAndroid Build Coastguard Worker  // The callback to be called when a write message op is done.
120*cc02d7e2SAndroid Build Coastguard Worker  void (^_writeDone)(void);
121*cc02d7e2SAndroid Build Coastguard Worker
122*cc02d7e2SAndroid Build Coastguard Worker  // Indicate a read request to core is pending.
123*cc02d7e2SAndroid Build Coastguard Worker  BOOL _pendingCoreRead;
124*cc02d7e2SAndroid Build Coastguard Worker
125*cc02d7e2SAndroid Build Coastguard Worker  // Indicate pending read message request from user.
126*cc02d7e2SAndroid Build Coastguard Worker  NSUInteger _pendingReceiveNextMessages;
127*cc02d7e2SAndroid Build Coastguard Worker}
128*cc02d7e2SAndroid Build Coastguard Worker
129*cc02d7e2SAndroid Build Coastguard Worker@synthesize state = _state;
130*cc02d7e2SAndroid Build Coastguard Worker
131*cc02d7e2SAndroid Build Coastguard Worker+ (void)initialize {
132*cc02d7e2SAndroid Build Coastguard Worker  // Guarantees the code in {} block is invoked only once. See ref at:
133*cc02d7e2SAndroid Build Coastguard Worker  // https://developer.apple.com/documentation/objectivec/nsobject/1418639-initialize?language=objc
134*cc02d7e2SAndroid Build Coastguard Worker  if (self == [GRPCCall self]) {
135*cc02d7e2SAndroid Build Coastguard Worker    grpc_init();
136*cc02d7e2SAndroid Build Coastguard Worker    callFlags = [NSMutableDictionary dictionary];
137*cc02d7e2SAndroid Build Coastguard Worker  }
138*cc02d7e2SAndroid Build Coastguard Worker}
139*cc02d7e2SAndroid Build Coastguard Worker
140*cc02d7e2SAndroid Build Coastguard Worker+ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path {
141*cc02d7e2SAndroid Build Coastguard Worker  if (host.length == 0 || path.length == 0) {
142*cc02d7e2SAndroid Build Coastguard Worker    return;
143*cc02d7e2SAndroid Build Coastguard Worker  }
144*cc02d7e2SAndroid Build Coastguard Worker  NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
145*cc02d7e2SAndroid Build Coastguard Worker  @synchronized(callFlags) {
146*cc02d7e2SAndroid Build Coastguard Worker    switch (callSafety) {
147*cc02d7e2SAndroid Build Coastguard Worker      case GRPCCallSafetyDefault:
148*cc02d7e2SAndroid Build Coastguard Worker        callFlags[hostAndPath] = @0;
149*cc02d7e2SAndroid Build Coastguard Worker        break;
150*cc02d7e2SAndroid Build Coastguard Worker      default:
151*cc02d7e2SAndroid Build Coastguard Worker        break;
152*cc02d7e2SAndroid Build Coastguard Worker    }
153*cc02d7e2SAndroid Build Coastguard Worker  }
154*cc02d7e2SAndroid Build Coastguard Worker}
155*cc02d7e2SAndroid Build Coastguard Worker
156*cc02d7e2SAndroid Build Coastguard Worker+ (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
157*cc02d7e2SAndroid Build Coastguard Worker  NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
158*cc02d7e2SAndroid Build Coastguard Worker  @synchronized(callFlags) {
159*cc02d7e2SAndroid Build Coastguard Worker    return [callFlags[hostAndPath] intValue];
160*cc02d7e2SAndroid Build Coastguard Worker  }
161*cc02d7e2SAndroid Build Coastguard Worker}
162*cc02d7e2SAndroid Build Coastguard Worker
163*cc02d7e2SAndroid Build Coastguard Worker- (instancetype)initWithHost:(NSString *)host
164*cc02d7e2SAndroid Build Coastguard Worker                        path:(NSString *)path
165*cc02d7e2SAndroid Build Coastguard Worker              requestsWriter:(GRXWriter *)requestWriter {
166*cc02d7e2SAndroid Build Coastguard Worker  return [self initWithHost:host
167*cc02d7e2SAndroid Build Coastguard Worker                       path:path
168*cc02d7e2SAndroid Build Coastguard Worker                 callSafety:GRPCCallSafetyDefault
169*cc02d7e2SAndroid Build Coastguard Worker             requestsWriter:requestWriter
170*cc02d7e2SAndroid Build Coastguard Worker                callOptions:nil
171*cc02d7e2SAndroid Build Coastguard Worker                  writeDone:nil];
172*cc02d7e2SAndroid Build Coastguard Worker}
173*cc02d7e2SAndroid Build Coastguard Worker
174*cc02d7e2SAndroid Build Coastguard Worker- (instancetype)initWithHost:(NSString *)host
175*cc02d7e2SAndroid Build Coastguard Worker                        path:(NSString *)path
176*cc02d7e2SAndroid Build Coastguard Worker                  callSafety:(GRPCCallSafety)safety
177*cc02d7e2SAndroid Build Coastguard Worker              requestsWriter:(GRXWriter *)requestsWriter
178*cc02d7e2SAndroid Build Coastguard Worker                 callOptions:(GRPCCallOptions *)callOptions
179*cc02d7e2SAndroid Build Coastguard Worker                   writeDone:(void (^)(void))writeDone {
180*cc02d7e2SAndroid Build Coastguard Worker  // Purposely using pointer rather than length (host.length == 0) for backwards compatibility.
181*cc02d7e2SAndroid Build Coastguard Worker  NSAssert(host != nil && path != nil, @"Neither host nor path can be nil.");
182*cc02d7e2SAndroid Build Coastguard Worker  NSAssert(safety <= GRPCCallSafetyDefault, @"Invalid call safety value.");
183*cc02d7e2SAndroid Build Coastguard Worker  NSAssert(requestsWriter.state == GRXWriterStateNotStarted,
184*cc02d7e2SAndroid Build Coastguard Worker           @"The requests writer can't be already started.");
185*cc02d7e2SAndroid Build Coastguard Worker  if (!host || !path) {
186*cc02d7e2SAndroid Build Coastguard Worker    return nil;
187*cc02d7e2SAndroid Build Coastguard Worker  }
188*cc02d7e2SAndroid Build Coastguard Worker  if (requestsWriter.state != GRXWriterStateNotStarted) {
189*cc02d7e2SAndroid Build Coastguard Worker    return nil;
190*cc02d7e2SAndroid Build Coastguard Worker  }
191*cc02d7e2SAndroid Build Coastguard Worker
192*cc02d7e2SAndroid Build Coastguard Worker  if ((self = [super init])) {
193*cc02d7e2SAndroid Build Coastguard Worker    _host = [host copy];
194*cc02d7e2SAndroid Build Coastguard Worker    _path = [path copy];
195*cc02d7e2SAndroid Build Coastguard Worker    _callSafety = safety;
196*cc02d7e2SAndroid Build Coastguard Worker    _callOptions = [callOptions copy];
197*cc02d7e2SAndroid Build Coastguard Worker
198*cc02d7e2SAndroid Build Coastguard Worker    // Serial queue to invoke the non-reentrant methods of the grpc_call object.
199*cc02d7e2SAndroid Build Coastguard Worker    _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL);
200*cc02d7e2SAndroid Build Coastguard Worker
201*cc02d7e2SAndroid Build Coastguard Worker    _requestWriter = requestsWriter;
202*cc02d7e2SAndroid Build Coastguard Worker    _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];
203*cc02d7e2SAndroid Build Coastguard Worker    _writeDone = writeDone;
204*cc02d7e2SAndroid Build Coastguard Worker
205*cc02d7e2SAndroid Build Coastguard Worker    if ([requestsWriter isKindOfClass:[GRXImmediateSingleWriter class]]) {
206*cc02d7e2SAndroid Build Coastguard Worker      _unaryCall = YES;
207*cc02d7e2SAndroid Build Coastguard Worker      _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch];
208*cc02d7e2SAndroid Build Coastguard Worker    }
209*cc02d7e2SAndroid Build Coastguard Worker
210*cc02d7e2SAndroid Build Coastguard Worker    _responseQueue = dispatch_get_main_queue();
211*cc02d7e2SAndroid Build Coastguard Worker
212*cc02d7e2SAndroid Build Coastguard Worker    // do not start a read until initial metadata is received
213*cc02d7e2SAndroid Build Coastguard Worker    _pendingReceiveNextMessages = 0;
214*cc02d7e2SAndroid Build Coastguard Worker    _pendingCoreRead = YES;
215*cc02d7e2SAndroid Build Coastguard Worker  }
216*cc02d7e2SAndroid Build Coastguard Worker  return self;
217*cc02d7e2SAndroid Build Coastguard Worker}
218*cc02d7e2SAndroid Build Coastguard Worker
219*cc02d7e2SAndroid Build Coastguard Worker- (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
220*cc02d7e2SAndroid Build Coastguard Worker  @synchronized(self) {
221*cc02d7e2SAndroid Build Coastguard Worker    if (_state != GRXWriterStateNotStarted) {
222*cc02d7e2SAndroid Build Coastguard Worker      return;
223*cc02d7e2SAndroid Build Coastguard Worker    }
224*cc02d7e2SAndroid Build Coastguard Worker    _responseQueue = queue;
225*cc02d7e2SAndroid Build Coastguard Worker  }
226*cc02d7e2SAndroid Build Coastguard Worker}
227*cc02d7e2SAndroid Build Coastguard Worker
228*cc02d7e2SAndroid Build Coastguard Worker#pragma mark Finish
229*cc02d7e2SAndroid Build Coastguard Worker
230*cc02d7e2SAndroid Build Coastguard Worker// This function should support being called within a @synchronized(self) block in another function
231*cc02d7e2SAndroid Build Coastguard Worker// Should not manipulate _requestWriter for deadlock prevention.
232*cc02d7e2SAndroid Build Coastguard Worker- (void)finishWithError:(NSError *)errorOrNil {
233*cc02d7e2SAndroid Build Coastguard Worker  @synchronized(self) {
234*cc02d7e2SAndroid Build Coastguard Worker    if (_state == GRXWriterStateFinished) {
235*cc02d7e2SAndroid Build Coastguard Worker      return;
236*cc02d7e2SAndroid Build Coastguard Worker    }
237*cc02d7e2SAndroid Build Coastguard Worker    _state = GRXWriterStateFinished;
238*cc02d7e2SAndroid Build Coastguard Worker
239*cc02d7e2SAndroid Build Coastguard Worker    if (errorOrNil) {
240*cc02d7e2SAndroid Build Coastguard Worker      [_responseWriteable cancelWithError:errorOrNil];
241*cc02d7e2SAndroid Build Coastguard Worker    } else {
242*cc02d7e2SAndroid Build Coastguard Worker      [_responseWriteable enqueueSuccessfulCompletion];
243*cc02d7e2SAndroid Build Coastguard Worker    }
244*cc02d7e2SAndroid Build Coastguard Worker
245*cc02d7e2SAndroid Build Coastguard Worker    // If the call isn't retained anywhere else, it can be deallocated now.
246*cc02d7e2SAndroid Build Coastguard Worker    _retainSelf = nil;
247*cc02d7e2SAndroid Build Coastguard Worker  }
248*cc02d7e2SAndroid Build Coastguard Worker}
249*cc02d7e2SAndroid Build Coastguard Worker
250*cc02d7e2SAndroid Build Coastguard Worker- (void)cancel {
251*cc02d7e2SAndroid Build Coastguard Worker  @synchronized(self) {
252*cc02d7e2SAndroid Build Coastguard Worker    if (_state == GRXWriterStateFinished) {
253*cc02d7e2SAndroid Build Coastguard Worker      return;
254*cc02d7e2SAndroid Build Coastguard Worker    }
255*cc02d7e2SAndroid Build Coastguard Worker    [self finishWithError:[NSError
256*cc02d7e2SAndroid Build Coastguard Worker                              errorWithDomain:kGRPCErrorDomain
257*cc02d7e2SAndroid Build Coastguard Worker                                         code:GRPCErrorCodeCancelled
258*cc02d7e2SAndroid Build Coastguard Worker                                     userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
259*cc02d7e2SAndroid Build Coastguard Worker    [_wrappedCall cancel];
260*cc02d7e2SAndroid Build Coastguard Worker  }
261*cc02d7e2SAndroid Build Coastguard Worker  _requestWriter.state = GRXWriterStateFinished;
262*cc02d7e2SAndroid Build Coastguard Worker}
263*cc02d7e2SAndroid Build Coastguard Worker
264*cc02d7e2SAndroid Build Coastguard Worker- (void)dealloc {
265*cc02d7e2SAndroid Build Coastguard Worker  if (_callQueue) {
266*cc02d7e2SAndroid Build Coastguard Worker    __block GRPCWrappedCall *wrappedCall = _wrappedCall;
267*cc02d7e2SAndroid Build Coastguard Worker    dispatch_async(_callQueue, ^{
268*cc02d7e2SAndroid Build Coastguard Worker      wrappedCall = nil;
269*cc02d7e2SAndroid Build Coastguard Worker    });
270*cc02d7e2SAndroid Build Coastguard Worker  } else {
271*cc02d7e2SAndroid Build Coastguard Worker    _wrappedCall = nil;
272*cc02d7e2SAndroid Build Coastguard Worker  }
273*cc02d7e2SAndroid Build Coastguard Worker}
274*cc02d7e2SAndroid Build Coastguard Worker
275*cc02d7e2SAndroid Build Coastguard Worker#pragma mark Read messages
276*cc02d7e2SAndroid Build Coastguard Worker
277*cc02d7e2SAndroid Build Coastguard Worker// Only called from the call queue.
278*cc02d7e2SAndroid Build Coastguard Worker// The handler will be called from the network queue.
279*cc02d7e2SAndroid Build Coastguard Worker- (void)startReadWithHandler:(void (^)(grpc_byte_buffer *))handler {
280*cc02d7e2SAndroid Build Coastguard Worker  // TODO(jcanizales): Add error handlers for async failures
281*cc02d7e2SAndroid Build Coastguard Worker  [_wrappedCall startBatchWithOperations:@[ [[GRPCOpRecvMessage alloc] initWithHandler:handler] ]];
282*cc02d7e2SAndroid Build Coastguard Worker}
283*cc02d7e2SAndroid Build Coastguard Worker
284*cc02d7e2SAndroid Build Coastguard Worker// Called initially from the network queue once response headers are received,
285*cc02d7e2SAndroid Build Coastguard Worker// then "recursively" from the responseWriteable queue after each response from the
286*cc02d7e2SAndroid Build Coastguard Worker// server has been written.
287*cc02d7e2SAndroid Build Coastguard Worker// If the call is currently paused, this is a noop. Restarting the call will invoke this
288*cc02d7e2SAndroid Build Coastguard Worker// method.
289*cc02d7e2SAndroid Build Coastguard Worker// TODO(jcanizales): Rename to readResponseIfNotPaused.
290*cc02d7e2SAndroid Build Coastguard Worker- (void)maybeStartNextRead {
291*cc02d7e2SAndroid Build Coastguard Worker  @synchronized(self) {
292*cc02d7e2SAndroid Build Coastguard Worker    if (_state != GRXWriterStateStarted) {
293*cc02d7e2SAndroid Build Coastguard Worker      return;
294*cc02d7e2SAndroid Build Coastguard Worker    }
295*cc02d7e2SAndroid Build Coastguard Worker    if (_callOptions.flowControlEnabled && (_pendingCoreRead || _pendingReceiveNextMessages == 0)) {
296*cc02d7e2SAndroid Build Coastguard Worker      return;
297*cc02d7e2SAndroid Build Coastguard Worker    }
298*cc02d7e2SAndroid Build Coastguard Worker    _pendingCoreRead = YES;
299*cc02d7e2SAndroid Build Coastguard Worker    _pendingReceiveNextMessages--;
300*cc02d7e2SAndroid Build Coastguard Worker  }
301*cc02d7e2SAndroid Build Coastguard Worker
302*cc02d7e2SAndroid Build Coastguard Worker  dispatch_async(_callQueue, ^{
303*cc02d7e2SAndroid Build Coastguard Worker    __weak GRPCCall *weakSelf = self;
304*cc02d7e2SAndroid Build Coastguard Worker    [self startReadWithHandler:^(grpc_byte_buffer *message) {
305*cc02d7e2SAndroid Build Coastguard Worker      if (message == NULL) {
306*cc02d7e2SAndroid Build Coastguard Worker        // No more messages from the server
307*cc02d7e2SAndroid Build Coastguard Worker        return;
308*cc02d7e2SAndroid Build Coastguard Worker      }
309*cc02d7e2SAndroid Build Coastguard Worker      __strong GRPCCall *strongSelf = weakSelf;
310*cc02d7e2SAndroid Build Coastguard Worker      if (strongSelf == nil) {
311*cc02d7e2SAndroid Build Coastguard Worker        grpc_byte_buffer_destroy(message);
312*cc02d7e2SAndroid Build Coastguard Worker        return;
313*cc02d7e2SAndroid Build Coastguard Worker      }
314*cc02d7e2SAndroid Build Coastguard Worker      NSData *data = [NSData grpc_dataWithByteBuffer:message];
315*cc02d7e2SAndroid Build Coastguard Worker      grpc_byte_buffer_destroy(message);
316*cc02d7e2SAndroid Build Coastguard Worker      if (!data) {
317*cc02d7e2SAndroid Build Coastguard Worker        // The app doesn't have enough memory to hold the server response. We
318*cc02d7e2SAndroid Build Coastguard Worker        // don't want to throw, because the app shouldn't crash for a behavior
319*cc02d7e2SAndroid Build Coastguard Worker        // that's on the hands of any server to have. Instead we finish and ask
320*cc02d7e2SAndroid Build Coastguard Worker        // the server to cancel.
321*cc02d7e2SAndroid Build Coastguard Worker        @synchronized(strongSelf) {
322*cc02d7e2SAndroid Build Coastguard Worker          strongSelf->_pendingCoreRead = NO;
323*cc02d7e2SAndroid Build Coastguard Worker          [strongSelf
324*cc02d7e2SAndroid Build Coastguard Worker              finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
325*cc02d7e2SAndroid Build Coastguard Worker                                                  code:GRPCErrorCodeResourceExhausted
326*cc02d7e2SAndroid Build Coastguard Worker                                              userInfo:@{
327*cc02d7e2SAndroid Build Coastguard Worker                                                NSLocalizedDescriptionKey :
328*cc02d7e2SAndroid Build Coastguard Worker                                                    @"Client does not have enough memory to "
329*cc02d7e2SAndroid Build Coastguard Worker                                                    @"hold the server response."
330*cc02d7e2SAndroid Build Coastguard Worker                                              }]];
331*cc02d7e2SAndroid Build Coastguard Worker          [strongSelf->_wrappedCall cancel];
332*cc02d7e2SAndroid Build Coastguard Worker        }
333*cc02d7e2SAndroid Build Coastguard Worker        strongSelf->_requestWriter.state = GRXWriterStateFinished;
334*cc02d7e2SAndroid Build Coastguard Worker      } else {
335*cc02d7e2SAndroid Build Coastguard Worker        @synchronized(strongSelf) {
336*cc02d7e2SAndroid Build Coastguard Worker          [strongSelf->_responseWriteable enqueueValue:data
337*cc02d7e2SAndroid Build Coastguard Worker                                     completionHandler:^{
338*cc02d7e2SAndroid Build Coastguard Worker                                       __strong GRPCCall *strongSelf = weakSelf;
339*cc02d7e2SAndroid Build Coastguard Worker                                       if (strongSelf) {
340*cc02d7e2SAndroid Build Coastguard Worker                                         @synchronized(strongSelf) {
341*cc02d7e2SAndroid Build Coastguard Worker                                           strongSelf->_pendingCoreRead = NO;
342*cc02d7e2SAndroid Build Coastguard Worker                                           [strongSelf maybeStartNextRead];
343*cc02d7e2SAndroid Build Coastguard Worker                                         }
344*cc02d7e2SAndroid Build Coastguard Worker                                       }
345*cc02d7e2SAndroid Build Coastguard Worker                                     }];
346*cc02d7e2SAndroid Build Coastguard Worker        }
347*cc02d7e2SAndroid Build Coastguard Worker      }
348*cc02d7e2SAndroid Build Coastguard Worker    }];
349*cc02d7e2SAndroid Build Coastguard Worker  });
350*cc02d7e2SAndroid Build Coastguard Worker}
351*cc02d7e2SAndroid Build Coastguard Worker
352*cc02d7e2SAndroid Build Coastguard Worker#pragma mark Send headers
353*cc02d7e2SAndroid Build Coastguard Worker
354*cc02d7e2SAndroid Build Coastguard Worker- (void)sendHeaders {
355*cc02d7e2SAndroid Build Coastguard Worker  // TODO (mxyan): Remove after deprecated methods are removed
356*cc02d7e2SAndroid Build Coastguard Worker  uint32_t callSafetyFlags = 0;
357*cc02d7e2SAndroid Build Coastguard Worker
358*cc02d7e2SAndroid Build Coastguard Worker  NSMutableDictionary *headers = [_requestHeaders mutableCopy];
359*cc02d7e2SAndroid Build Coastguard Worker  NSString *fetchedOauth2AccessToken;
360*cc02d7e2SAndroid Build Coastguard Worker  @synchronized(self) {
361*cc02d7e2SAndroid Build Coastguard Worker    fetchedOauth2AccessToken = _fetchedOauth2AccessToken;
362*cc02d7e2SAndroid Build Coastguard Worker  }
363*cc02d7e2SAndroid Build Coastguard Worker  if (fetchedOauth2AccessToken != nil) {
364*cc02d7e2SAndroid Build Coastguard Worker    headers[@"authorization"] = [kBearerPrefix stringByAppendingString:fetchedOauth2AccessToken];
365*cc02d7e2SAndroid Build Coastguard Worker  } else if (_callOptions.oauth2AccessToken != nil) {
366*cc02d7e2SAndroid Build Coastguard Worker    headers[@"authorization"] =
367*cc02d7e2SAndroid Build Coastguard Worker        [kBearerPrefix stringByAppendingString:_callOptions.oauth2AccessToken];
368*cc02d7e2SAndroid Build Coastguard Worker  }
369*cc02d7e2SAndroid Build Coastguard Worker
370*cc02d7e2SAndroid Build Coastguard Worker  // TODO(jcanizales): Add error handlers for async failures
371*cc02d7e2SAndroid Build Coastguard Worker  GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc]
372*cc02d7e2SAndroid Build Coastguard Worker      initWithMetadata:headers
373*cc02d7e2SAndroid Build Coastguard Worker                 flags:callSafetyFlags
374*cc02d7e2SAndroid Build Coastguard Worker               handler:nil];  // No clean-up needed after SEND_INITIAL_METADATA
375*cc02d7e2SAndroid Build Coastguard Worker  dispatch_async(_callQueue, ^{
376*cc02d7e2SAndroid Build Coastguard Worker    if (!self->_unaryCall) {
377*cc02d7e2SAndroid Build Coastguard Worker      [self->_wrappedCall startBatchWithOperations:@[ op ]];
378*cc02d7e2SAndroid Build Coastguard Worker    } else {
379*cc02d7e2SAndroid Build Coastguard Worker      [self->_unaryOpBatch addObject:op];
380*cc02d7e2SAndroid Build Coastguard Worker    }
381*cc02d7e2SAndroid Build Coastguard Worker  });
382*cc02d7e2SAndroid Build Coastguard Worker}
383*cc02d7e2SAndroid Build Coastguard Worker
384*cc02d7e2SAndroid Build Coastguard Worker- (void)receiveNextMessages:(NSUInteger)numberOfMessages {
385*cc02d7e2SAndroid Build Coastguard Worker  if (numberOfMessages == 0) {
386*cc02d7e2SAndroid Build Coastguard Worker    return;
387*cc02d7e2SAndroid Build Coastguard Worker  }
388*cc02d7e2SAndroid Build Coastguard Worker  @synchronized(self) {
389*cc02d7e2SAndroid Build Coastguard Worker    _pendingReceiveNextMessages += numberOfMessages;
390*cc02d7e2SAndroid Build Coastguard Worker
391*cc02d7e2SAndroid Build Coastguard Worker    if (_state != GRXWriterStateStarted || !_callOptions.flowControlEnabled) {
392*cc02d7e2SAndroid Build Coastguard Worker      return;
393*cc02d7e2SAndroid Build Coastguard Worker    }
394*cc02d7e2SAndroid Build Coastguard Worker    [self maybeStartNextRead];
395*cc02d7e2SAndroid Build Coastguard Worker  }
396*cc02d7e2SAndroid Build Coastguard Worker}
397*cc02d7e2SAndroid Build Coastguard Worker
398*cc02d7e2SAndroid Build Coastguard Worker#pragma mark GRXWriteable implementation
399*cc02d7e2SAndroid Build Coastguard Worker
400*cc02d7e2SAndroid Build Coastguard Worker// Only called from the call queue. The error handler will be called from the
401*cc02d7e2SAndroid Build Coastguard Worker// network queue if the write didn't succeed.
402*cc02d7e2SAndroid Build Coastguard Worker// If the call is a unary call, parameter \a errorHandler will be ignored and
403*cc02d7e2SAndroid Build Coastguard Worker// the error handler of GRPCOpSendClose will be executed in case of error.
404*cc02d7e2SAndroid Build Coastguard Worker- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)(void))errorHandler {
405*cc02d7e2SAndroid Build Coastguard Worker  __weak GRPCCall *weakSelf = self;
406*cc02d7e2SAndroid Build Coastguard Worker  void (^resumingHandler)(void) = ^{
407*cc02d7e2SAndroid Build Coastguard Worker    // Resume the request writer.
408*cc02d7e2SAndroid Build Coastguard Worker    GRPCCall *strongSelf = weakSelf;
409*cc02d7e2SAndroid Build Coastguard Worker    if (strongSelf) {
410*cc02d7e2SAndroid Build Coastguard Worker      strongSelf->_requestWriter.state = GRXWriterStateStarted;
411*cc02d7e2SAndroid Build Coastguard Worker      if (strongSelf->_writeDone) {
412*cc02d7e2SAndroid Build Coastguard Worker        strongSelf->_writeDone();
413*cc02d7e2SAndroid Build Coastguard Worker      }
414*cc02d7e2SAndroid Build Coastguard Worker    }
415*cc02d7e2SAndroid Build Coastguard Worker  };
416*cc02d7e2SAndroid Build Coastguard Worker  GRPCOpSendMessage *op = [[GRPCOpSendMessage alloc] initWithMessage:message
417*cc02d7e2SAndroid Build Coastguard Worker                                                             handler:resumingHandler];
418*cc02d7e2SAndroid Build Coastguard Worker  if (!_unaryCall) {
419*cc02d7e2SAndroid Build Coastguard Worker    [_wrappedCall startBatchWithOperations:@[ op ] errorHandler:errorHandler];
420*cc02d7e2SAndroid Build Coastguard Worker  } else {
421*cc02d7e2SAndroid Build Coastguard Worker    // Ignored errorHandler since it is the same as the one for GRPCOpSendClose.
422*cc02d7e2SAndroid Build Coastguard Worker    // TODO (mxyan): unify the error handlers of all Ops into a single closure.
423*cc02d7e2SAndroid Build Coastguard Worker    [_unaryOpBatch addObject:op];
424*cc02d7e2SAndroid Build Coastguard Worker  }
425*cc02d7e2SAndroid Build Coastguard Worker}
426*cc02d7e2SAndroid Build Coastguard Worker
427*cc02d7e2SAndroid Build Coastguard Worker- (void)writeValue:(id)value {
428*cc02d7e2SAndroid Build Coastguard Worker  NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData");
429*cc02d7e2SAndroid Build Coastguard Worker
430*cc02d7e2SAndroid Build Coastguard Worker  @synchronized(self) {
431*cc02d7e2SAndroid Build Coastguard Worker    if (_state == GRXWriterStateFinished) {
432*cc02d7e2SAndroid Build Coastguard Worker      return;
433*cc02d7e2SAndroid Build Coastguard Worker    }
434*cc02d7e2SAndroid Build Coastguard Worker  }
435*cc02d7e2SAndroid Build Coastguard Worker
436*cc02d7e2SAndroid Build Coastguard Worker  // Pause the input and only resume it when the C layer notifies us that writes
437*cc02d7e2SAndroid Build Coastguard Worker  // can proceed.
438*cc02d7e2SAndroid Build Coastguard Worker  _requestWriter.state = GRXWriterStatePaused;
439*cc02d7e2SAndroid Build Coastguard Worker
440*cc02d7e2SAndroid Build Coastguard Worker  dispatch_async(_callQueue, ^{
441*cc02d7e2SAndroid Build Coastguard Worker    // Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
442*cc02d7e2SAndroid Build Coastguard Worker    [self writeMessage:value withErrorHandler:nil];
443*cc02d7e2SAndroid Build Coastguard Worker  });
444*cc02d7e2SAndroid Build Coastguard Worker}
445*cc02d7e2SAndroid Build Coastguard Worker
446*cc02d7e2SAndroid Build Coastguard Worker// Only called from the call queue. The error handler will be called from the
447*cc02d7e2SAndroid Build Coastguard Worker// network queue if the requests stream couldn't be closed successfully.
448*cc02d7e2SAndroid Build Coastguard Worker- (void)finishRequestWithErrorHandler:(void (^)(void))errorHandler {
449*cc02d7e2SAndroid Build Coastguard Worker  if (!_unaryCall) {
450*cc02d7e2SAndroid Build Coastguard Worker    [_wrappedCall startBatchWithOperations:@[ [[GRPCOpSendClose alloc] init] ]
451*cc02d7e2SAndroid Build Coastguard Worker                              errorHandler:errorHandler];
452*cc02d7e2SAndroid Build Coastguard Worker  } else {
453*cc02d7e2SAndroid Build Coastguard Worker    [_unaryOpBatch addObject:[[GRPCOpSendClose alloc] init]];
454*cc02d7e2SAndroid Build Coastguard Worker    [_wrappedCall startBatchWithOperations:_unaryOpBatch errorHandler:errorHandler];
455*cc02d7e2SAndroid Build Coastguard Worker  }
456*cc02d7e2SAndroid Build Coastguard Worker}
457*cc02d7e2SAndroid Build Coastguard Worker
458*cc02d7e2SAndroid Build Coastguard Worker- (void)writesFinishedWithError:(NSError *)errorOrNil {
459*cc02d7e2SAndroid Build Coastguard Worker  if (errorOrNil) {
460*cc02d7e2SAndroid Build Coastguard Worker    [self cancel];
461*cc02d7e2SAndroid Build Coastguard Worker  } else {
462*cc02d7e2SAndroid Build Coastguard Worker    dispatch_async(_callQueue, ^{
463*cc02d7e2SAndroid Build Coastguard Worker      // EOS error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
464*cc02d7e2SAndroid Build Coastguard Worker      [self finishRequestWithErrorHandler:nil];
465*cc02d7e2SAndroid Build Coastguard Worker    });
466*cc02d7e2SAndroid Build Coastguard Worker  }
467*cc02d7e2SAndroid Build Coastguard Worker}
468*cc02d7e2SAndroid Build Coastguard Worker
469*cc02d7e2SAndroid Build Coastguard Worker#pragma mark Invoke
470*cc02d7e2SAndroid Build Coastguard Worker
471*cc02d7e2SAndroid Build Coastguard Worker// Both handlers will eventually be called, from the network queue. Writes can start immediately
472*cc02d7e2SAndroid Build Coastguard Worker// after this.
473*cc02d7e2SAndroid Build Coastguard Worker// The first one (headersHandler), when the response headers are received.
474*cc02d7e2SAndroid Build Coastguard Worker// The second one (completionHandler), whenever the RPC finishes for any reason.
475*cc02d7e2SAndroid Build Coastguard Worker- (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
476*cc02d7e2SAndroid Build Coastguard Worker                   completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler {
477*cc02d7e2SAndroid Build Coastguard Worker  dispatch_async(_callQueue, ^{
478*cc02d7e2SAndroid Build Coastguard Worker    // TODO(jcanizales): Add error handlers for async failures
479*cc02d7e2SAndroid Build Coastguard Worker    [self->_wrappedCall
480*cc02d7e2SAndroid Build Coastguard Worker        startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
481*cc02d7e2SAndroid Build Coastguard Worker    [self->_wrappedCall
482*cc02d7e2SAndroid Build Coastguard Worker        startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
483*cc02d7e2SAndroid Build Coastguard Worker  });
484*cc02d7e2SAndroid Build Coastguard Worker}
485*cc02d7e2SAndroid Build Coastguard Worker
486*cc02d7e2SAndroid Build Coastguard Worker- (void)invokeCall {
487*cc02d7e2SAndroid Build Coastguard Worker  __weak GRPCCall *weakSelf = self;
488*cc02d7e2SAndroid Build Coastguard Worker  [self
489*cc02d7e2SAndroid Build Coastguard Worker      invokeCallWithHeadersHandler:^(NSDictionary *headers) {
490*cc02d7e2SAndroid Build Coastguard Worker        // Response headers received.
491*cc02d7e2SAndroid Build Coastguard Worker        __strong GRPCCall *strongSelf = weakSelf;
492*cc02d7e2SAndroid Build Coastguard Worker        if (strongSelf) {
493*cc02d7e2SAndroid Build Coastguard Worker          @synchronized(strongSelf) {
494*cc02d7e2SAndroid Build Coastguard Worker            // it is ok to set nil because headers are only received once
495*cc02d7e2SAndroid Build Coastguard Worker            strongSelf.responseHeaders = nil;
496*cc02d7e2SAndroid Build Coastguard Worker            // copy the header so that the GRPCOpRecvMetadata object may be dealloc'ed
497*cc02d7e2SAndroid Build Coastguard Worker            NSDictionary *copiedHeaders = [[NSDictionary alloc] initWithDictionary:headers
498*cc02d7e2SAndroid Build Coastguard Worker                                                                         copyItems:YES];
499*cc02d7e2SAndroid Build Coastguard Worker            strongSelf.responseHeaders = copiedHeaders;
500*cc02d7e2SAndroid Build Coastguard Worker            strongSelf->_pendingCoreRead = NO;
501*cc02d7e2SAndroid Build Coastguard Worker            [strongSelf maybeStartNextRead];
502*cc02d7e2SAndroid Build Coastguard Worker          }
503*cc02d7e2SAndroid Build Coastguard Worker        }
504*cc02d7e2SAndroid Build Coastguard Worker      }
505*cc02d7e2SAndroid Build Coastguard Worker      completionHandler:^(NSError *error, NSDictionary *trailers) {
506*cc02d7e2SAndroid Build Coastguard Worker        __strong GRPCCall *strongSelf = weakSelf;
507*cc02d7e2SAndroid Build Coastguard Worker        if (strongSelf) {
508*cc02d7e2SAndroid Build Coastguard Worker          strongSelf.responseTrailers = trailers;
509*cc02d7e2SAndroid Build Coastguard Worker
510*cc02d7e2SAndroid Build Coastguard Worker          if (error) {
511*cc02d7e2SAndroid Build Coastguard Worker            NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
512*cc02d7e2SAndroid Build Coastguard Worker            if (error.userInfo) {
513*cc02d7e2SAndroid Build Coastguard Worker              [userInfo addEntriesFromDictionary:error.userInfo];
514*cc02d7e2SAndroid Build Coastguard Worker            }
515*cc02d7e2SAndroid Build Coastguard Worker            userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
516*cc02d7e2SAndroid Build Coastguard Worker            // Since gRPC core does not guarantee the headers block being called before this block,
517*cc02d7e2SAndroid Build Coastguard Worker            // responseHeaders might be nil.
518*cc02d7e2SAndroid Build Coastguard Worker            userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
519*cc02d7e2SAndroid Build Coastguard Worker            error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
520*cc02d7e2SAndroid Build Coastguard Worker          }
521*cc02d7e2SAndroid Build Coastguard Worker          [strongSelf finishWithError:error];
522*cc02d7e2SAndroid Build Coastguard Worker          strongSelf->_requestWriter.state = GRXWriterStateFinished;
523*cc02d7e2SAndroid Build Coastguard Worker        }
524*cc02d7e2SAndroid Build Coastguard Worker      }];
525*cc02d7e2SAndroid Build Coastguard Worker}
526*cc02d7e2SAndroid Build Coastguard Worker
527*cc02d7e2SAndroid Build Coastguard Worker#pragma mark GRXWriter implementation
528*cc02d7e2SAndroid Build Coastguard Worker
529*cc02d7e2SAndroid Build Coastguard Worker// Lock acquired inside startWithWriteable:
530*cc02d7e2SAndroid Build Coastguard Worker- (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
531*cc02d7e2SAndroid Build Coastguard Worker  @synchronized(self) {
532*cc02d7e2SAndroid Build Coastguard Worker    if (_state == GRXWriterStateFinished) {
533*cc02d7e2SAndroid Build Coastguard Worker      return;
534*cc02d7e2SAndroid Build Coastguard Worker    }
535*cc02d7e2SAndroid Build Coastguard Worker
536*cc02d7e2SAndroid Build Coastguard Worker    _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable
537*cc02d7e2SAndroid Build Coastguard Worker                                                             dispatchQueue:_responseQueue];
538*cc02d7e2SAndroid Build Coastguard Worker
539*cc02d7e2SAndroid Build Coastguard Worker    GRPCPooledChannel *channel = [[GRPCChannelPool sharedInstance] channelWithHost:_host
540*cc02d7e2SAndroid Build Coastguard Worker                                                                       callOptions:_callOptions];
541*cc02d7e2SAndroid Build Coastguard Worker    _wrappedCall = [channel wrappedCallWithPath:_path
542*cc02d7e2SAndroid Build Coastguard Worker                                completionQueue:[GRPCCompletionQueue completionQueue]
543*cc02d7e2SAndroid Build Coastguard Worker                                    callOptions:_callOptions];
544*cc02d7e2SAndroid Build Coastguard Worker
545*cc02d7e2SAndroid Build Coastguard Worker    if (_wrappedCall == nil) {
546*cc02d7e2SAndroid Build Coastguard Worker      [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
547*cc02d7e2SAndroid Build Coastguard Worker                                                code:GRPCErrorCodeUnavailable
548*cc02d7e2SAndroid Build Coastguard Worker                                            userInfo:@{
549*cc02d7e2SAndroid Build Coastguard Worker                                              NSLocalizedDescriptionKey :
550*cc02d7e2SAndroid Build Coastguard Worker                                                  @"Failed to create call or channel."
551*cc02d7e2SAndroid Build Coastguard Worker                                            }]];
552*cc02d7e2SAndroid Build Coastguard Worker      return;
553*cc02d7e2SAndroid Build Coastguard Worker    }
554*cc02d7e2SAndroid Build Coastguard Worker
555*cc02d7e2SAndroid Build Coastguard Worker    [self sendHeaders];
556*cc02d7e2SAndroid Build Coastguard Worker    [self invokeCall];
557*cc02d7e2SAndroid Build Coastguard Worker  }
558*cc02d7e2SAndroid Build Coastguard Worker
559*cc02d7e2SAndroid Build Coastguard Worker  // Now that the RPC has been initiated, request writes can start.
560*cc02d7e2SAndroid Build Coastguard Worker  [_requestWriter startWithWriteable:self];
561*cc02d7e2SAndroid Build Coastguard Worker}
562*cc02d7e2SAndroid Build Coastguard Worker
563*cc02d7e2SAndroid Build Coastguard Worker- (void)startWithWriteable:(id<GRXWriteable>)writeable {
564*cc02d7e2SAndroid Build Coastguard Worker  id<GRPCAuthorizationProtocol> tokenProvider = nil;
565*cc02d7e2SAndroid Build Coastguard Worker  @synchronized(self) {
566*cc02d7e2SAndroid Build Coastguard Worker    _state = GRXWriterStateStarted;
567*cc02d7e2SAndroid Build Coastguard Worker
568*cc02d7e2SAndroid Build Coastguard Worker    // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
569*cc02d7e2SAndroid Build Coastguard Worker    // This makes RPCs in which the call isn't externally retained possible (as long as it is
570*cc02d7e2SAndroid Build Coastguard Worker    // started before being autoreleased). Care is taken not to retain self strongly in any of the
571*cc02d7e2SAndroid Build Coastguard Worker    // blocks used in this implementation, so that the life of the instance is determined by this
572*cc02d7e2SAndroid Build Coastguard Worker    // retain cycle.
573*cc02d7e2SAndroid Build Coastguard Worker    _retainSelf = self;
574*cc02d7e2SAndroid Build Coastguard Worker
575*cc02d7e2SAndroid Build Coastguard Worker    // If _callOptions is nil, people must be using the deprecated v1 interface. In this case,
576*cc02d7e2SAndroid Build Coastguard Worker    // generate the call options from the corresponding GRPCHost configs and apply other options
577*cc02d7e2SAndroid Build Coastguard Worker    // that are not covered by GRPCHost.
578*cc02d7e2SAndroid Build Coastguard Worker    if (_callOptions == nil) {
579*cc02d7e2SAndroid Build Coastguard Worker      GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
580*cc02d7e2SAndroid Build Coastguard Worker      if (_serverName.length != 0) {
581*cc02d7e2SAndroid Build Coastguard Worker        callOptions.serverAuthority = _serverName;
582*cc02d7e2SAndroid Build Coastguard Worker      }
583*cc02d7e2SAndroid Build Coastguard Worker      if (_timeout > 0) {
584*cc02d7e2SAndroid Build Coastguard Worker        callOptions.timeout = _timeout;
585*cc02d7e2SAndroid Build Coastguard Worker      }
586*cc02d7e2SAndroid Build Coastguard Worker
587*cc02d7e2SAndroid Build Coastguard Worker      id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider;
588*cc02d7e2SAndroid Build Coastguard Worker      if (tokenProvider != nil) {
589*cc02d7e2SAndroid Build Coastguard Worker        callOptions.authTokenProvider = tokenProvider;
590*cc02d7e2SAndroid Build Coastguard Worker      }
591*cc02d7e2SAndroid Build Coastguard Worker      _callOptions = callOptions;
592*cc02d7e2SAndroid Build Coastguard Worker    }
593*cc02d7e2SAndroid Build Coastguard Worker
594*cc02d7e2SAndroid Build Coastguard Worker    NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
595*cc02d7e2SAndroid Build Coastguard Worker             @"authTokenProvider and oauth2AccessToken cannot be set at the same time");
596*cc02d7e2SAndroid Build Coastguard Worker
597*cc02d7e2SAndroid Build Coastguard Worker    tokenProvider = _callOptions.authTokenProvider;
598*cc02d7e2SAndroid Build Coastguard Worker  }
599*cc02d7e2SAndroid Build Coastguard Worker
600*cc02d7e2SAndroid Build Coastguard Worker  if (tokenProvider != nil) {
601*cc02d7e2SAndroid Build Coastguard Worker    __weak auto weakSelf = self;
602*cc02d7e2SAndroid Build Coastguard Worker    [tokenProvider getTokenWithHandler:^(NSString *token) {
603*cc02d7e2SAndroid Build Coastguard Worker      __strong auto strongSelf = weakSelf;
604*cc02d7e2SAndroid Build Coastguard Worker      if (strongSelf) {
605*cc02d7e2SAndroid Build Coastguard Worker        BOOL startCall = NO;
606*cc02d7e2SAndroid Build Coastguard Worker        @synchronized(strongSelf) {
607*cc02d7e2SAndroid Build Coastguard Worker          if (strongSelf->_state != GRXWriterStateFinished) {
608*cc02d7e2SAndroid Build Coastguard Worker            startCall = YES;
609*cc02d7e2SAndroid Build Coastguard Worker            if (token) {
610*cc02d7e2SAndroid Build Coastguard Worker              strongSelf->_fetchedOauth2AccessToken = [token copy];
611*cc02d7e2SAndroid Build Coastguard Worker            }
612*cc02d7e2SAndroid Build Coastguard Worker          }
613*cc02d7e2SAndroid Build Coastguard Worker        }
614*cc02d7e2SAndroid Build Coastguard Worker        if (startCall) {
615*cc02d7e2SAndroid Build Coastguard Worker          [strongSelf startCallWithWriteable:writeable];
616*cc02d7e2SAndroid Build Coastguard Worker        }
617*cc02d7e2SAndroid Build Coastguard Worker      }
618*cc02d7e2SAndroid Build Coastguard Worker    }];
619*cc02d7e2SAndroid Build Coastguard Worker  } else {
620*cc02d7e2SAndroid Build Coastguard Worker    [self startCallWithWriteable:writeable];
621*cc02d7e2SAndroid Build Coastguard Worker  }
622*cc02d7e2SAndroid Build Coastguard Worker}
623*cc02d7e2SAndroid Build Coastguard Worker
624*cc02d7e2SAndroid Build Coastguard Worker- (void)setState:(GRXWriterState)newState {
625*cc02d7e2SAndroid Build Coastguard Worker  @synchronized(self) {
626*cc02d7e2SAndroid Build Coastguard Worker    // Manual transitions are only allowed from the started or paused states.
627*cc02d7e2SAndroid Build Coastguard Worker    if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
628*cc02d7e2SAndroid Build Coastguard Worker      return;
629*cc02d7e2SAndroid Build Coastguard Worker    }
630*cc02d7e2SAndroid Build Coastguard Worker
631*cc02d7e2SAndroid Build Coastguard Worker    switch (newState) {
632*cc02d7e2SAndroid Build Coastguard Worker      case GRXWriterStateFinished:
633*cc02d7e2SAndroid Build Coastguard Worker        _state = newState;
634*cc02d7e2SAndroid Build Coastguard Worker        // Per GRXWriter's contract, setting the state to Finished manually
635*cc02d7e2SAndroid Build Coastguard Worker        // means one doesn't wish the writeable to be messaged anymore.
636*cc02d7e2SAndroid Build Coastguard Worker        [_responseWriteable cancelSilently];
637*cc02d7e2SAndroid Build Coastguard Worker        _responseWriteable = nil;
638*cc02d7e2SAndroid Build Coastguard Worker        return;
639*cc02d7e2SAndroid Build Coastguard Worker      case GRXWriterStatePaused:
640*cc02d7e2SAndroid Build Coastguard Worker        _state = newState;
641*cc02d7e2SAndroid Build Coastguard Worker        return;
642*cc02d7e2SAndroid Build Coastguard Worker      case GRXWriterStateStarted:
643*cc02d7e2SAndroid Build Coastguard Worker        if (_state == GRXWriterStatePaused) {
644*cc02d7e2SAndroid Build Coastguard Worker          _state = newState;
645*cc02d7e2SAndroid Build Coastguard Worker          [self maybeStartNextRead];
646*cc02d7e2SAndroid Build Coastguard Worker        }
647*cc02d7e2SAndroid Build Coastguard Worker        return;
648*cc02d7e2SAndroid Build Coastguard Worker      case GRXWriterStateNotStarted:
649*cc02d7e2SAndroid Build Coastguard Worker        return;
650*cc02d7e2SAndroid Build Coastguard Worker    }
651*cc02d7e2SAndroid Build Coastguard Worker  }
652*cc02d7e2SAndroid Build Coastguard Worker}
653*cc02d7e2SAndroid Build Coastguard Worker
654*cc02d7e2SAndroid Build Coastguard Worker@end
655