xref: /aosp_15_r20/external/grpc-grpc/src/objective-c/tests/InteropTests/InteropTests.m (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1/*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#import "InteropTests.h"
20
21#include <grpc/status.h>
22
23#import <GRPCClient/GRPCCall+ChannelArg.h>
24#import <GRPCClient/GRPCCall+Cronet.h>
25#import <GRPCClient/GRPCCall+Interceptor.h>
26#import <GRPCClient/GRPCCall+Tests.h>
27#import <GRPCClient/GRPCInterceptor.h>
28#import <GRPCClient/internal_testing/GRPCCall+InternalTests.h>
29#import <ProtoRPC/ProtoRPC.h>
30#import <RxLibrary/GRXBufferedPipe.h>
31#import <RxLibrary/GRXWriter+Immediate.h>
32#import <grpc/grpc.h>
33#import <grpc/support/log.h>
34#import "src/objective-c/tests/RemoteTestClient/Messages.pbobjc.h"
35#import "src/objective-c/tests/RemoteTestClient/Test.pbobjc.h"
36#import "src/objective-c/tests/RemoteTestClient/Test.pbrpc.h"
37
38#import "../Common/TestUtils.h"
39#import "InteropTestsBlockCallbacks.h"
40
41#define SMALL_PAYLOAD_SIZE 10
42#define LARGE_REQUEST_PAYLOAD_SIZE 271828
43#define LARGE_RESPONSE_PAYLOAD_SIZE 314159
44
45static const int kTestRetries = 3;
46extern const char *kCFStreamVarName;
47
48@interface RMTStreamingOutputCallRequest (Constructors)
49+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize
50                 requestedResponseSize:(NSNumber *)responseSize;
51@end
52
53@implementation RMTStreamingOutputCallRequest (Constructors)
54+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize
55                 requestedResponseSize:(NSNumber *)responseSize {
56  RMTStreamingOutputCallRequest *request = [self message];
57  RMTResponseParameters *parameters = [RMTResponseParameters message];
58  parameters.size = responseSize.intValue;
59  [request.responseParametersArray addObject:parameters];
60  request.payload.body = [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue];
61  return request;
62}
63@end
64
65@interface RMTStreamingOutputCallResponse (Constructors)
66+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize;
67@end
68
69@implementation RMTStreamingOutputCallResponse (Constructors)
70+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize {
71  RMTStreamingOutputCallResponse *response = [self message];
72  response.payload.type = RMTPayloadType_Compressable;
73  response.payload.body = [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue];
74  return response;
75}
76@end
77
78BOOL isRemoteInteropTest(NSString *host) {
79  return [host isEqualToString:@"grpc-test.sandbox.googleapis.com"];
80}
81
82@interface DefaultInterceptorFactory : NSObject <GRPCInterceptorFactory>
83
84- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager;
85
86@end
87
88@implementation DefaultInterceptorFactory
89
90- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager {
91  dispatch_queue_t queue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
92  return [[GRPCInterceptor alloc] initWithInterceptorManager:interceptorManager
93                                               dispatchQueue:queue];
94}
95
96@end
97
98@interface HookInterceptorFactory : NSObject <GRPCInterceptorFactory>
99
100- (instancetype)
101      initWithDispatchQueue:(dispatch_queue_t)dispatchQueue
102                  startHook:(void (^)(GRPCRequestOptions *requestOptions,
103                                      GRPCCallOptions *callOptions,
104                                      GRPCInterceptorManager *manager))startHook
105              writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
106                 finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
107    receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
108                                      GRPCInterceptorManager *manager))receiveNextMessagesHook
109         responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
110                                      GRPCInterceptorManager *manager))responseHeaderHook
111           responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
112          responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
113                                      GRPCInterceptorManager *manager))responseCloseHook
114           didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook;
115
116- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager;
117
118@end
119
120@interface HookInterceptor : GRPCInterceptor
121
122- (instancetype)
123    initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager
124                 dispatchQueue:(dispatch_queue_t)dispatchQueue
125                     startHook:(void (^)(GRPCRequestOptions *requestOptions,
126                                         GRPCCallOptions *callOptions,
127                                         GRPCInterceptorManager *manager))startHook
128                 writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
129                    finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
130       receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
131                                         GRPCInterceptorManager *manager))receiveNextMessagesHook
132            responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
133                                         GRPCInterceptorManager *manager))responseHeaderHook
134              responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
135             responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
136                                         GRPCInterceptorManager *manager))responseCloseHook
137              didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook;
138
139@end
140
141@implementation HookInterceptorFactory {
142 @protected
143  void (^_startHook)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
144                     GRPCInterceptorManager *manager);
145  void (^_writeDataHook)(id data, GRPCInterceptorManager *manager);
146  void (^_finishHook)(GRPCInterceptorManager *manager);
147  void (^_receiveNextMessagesHook)(NSUInteger numberOfMessages, GRPCInterceptorManager *manager);
148  void (^_responseHeaderHook)(NSDictionary *initialMetadata, GRPCInterceptorManager *manager);
149  void (^_responseDataHook)(id data, GRPCInterceptorManager *manager);
150  void (^_responseCloseHook)(NSDictionary *trailingMetadata, NSError *error,
151                             GRPCInterceptorManager *manager);
152  void (^_didWriteDataHook)(GRPCInterceptorManager *manager);
153  dispatch_queue_t _dispatchQueue;
154}
155
156- (instancetype)
157      initWithDispatchQueue:(dispatch_queue_t)dispatchQueue
158                  startHook:(void (^)(GRPCRequestOptions *requestOptions,
159                                      GRPCCallOptions *callOptions,
160                                      GRPCInterceptorManager *manager))startHook
161              writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
162                 finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
163    receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
164                                      GRPCInterceptorManager *manager))receiveNextMessagesHook
165         responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
166                                      GRPCInterceptorManager *manager))responseHeaderHook
167           responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
168          responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
169                                      GRPCInterceptorManager *manager))responseCloseHook
170           didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook {
171  if ((self = [super init])) {
172    _dispatchQueue = dispatchQueue;
173    _startHook = startHook;
174    _writeDataHook = writeDataHook;
175    _finishHook = finishHook;
176    _receiveNextMessagesHook = receiveNextMessagesHook;
177    _responseHeaderHook = responseHeaderHook;
178    _responseDataHook = responseDataHook;
179    _responseCloseHook = responseCloseHook;
180    _didWriteDataHook = didWriteDataHook;
181  }
182  return self;
183}
184
185- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager {
186  return [[HookInterceptor alloc] initWithInterceptorManager:interceptorManager
187                                               dispatchQueue:_dispatchQueue
188                                                   startHook:_startHook
189                                               writeDataHook:_writeDataHook
190                                                  finishHook:_finishHook
191                                     receiveNextMessagesHook:_receiveNextMessagesHook
192                                          responseHeaderHook:_responseHeaderHook
193                                            responseDataHook:_responseDataHook
194                                           responseCloseHook:_responseCloseHook
195                                            didWriteDataHook:_didWriteDataHook];
196}
197
198@end
199
200@implementation HookInterceptor {
201  void (^_startHook)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
202                     GRPCInterceptorManager *manager);
203  void (^_writeDataHook)(id data, GRPCInterceptorManager *manager);
204  void (^_finishHook)(GRPCInterceptorManager *manager);
205  void (^_receiveNextMessagesHook)(NSUInteger numberOfMessages, GRPCInterceptorManager *manager);
206  void (^_responseHeaderHook)(NSDictionary *initialMetadata, GRPCInterceptorManager *manager);
207  void (^_responseDataHook)(id data, GRPCInterceptorManager *manager);
208  void (^_responseCloseHook)(NSDictionary *trailingMetadata, NSError *error,
209                             GRPCInterceptorManager *manager);
210  void (^_didWriteDataHook)(GRPCInterceptorManager *manager);
211  GRPCInterceptorManager *_manager;
212  dispatch_queue_t _dispatchQueue;
213}
214
215- (dispatch_queue_t)dispatchQueue {
216  return _dispatchQueue;
217}
218
219- (instancetype)
220    initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager
221                 dispatchQueue:(dispatch_queue_t)dispatchQueue
222                     startHook:(void (^)(GRPCRequestOptions *requestOptions,
223                                         GRPCCallOptions *callOptions,
224                                         GRPCInterceptorManager *manager))startHook
225                 writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
226                    finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
227       receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
228                                         GRPCInterceptorManager *manager))receiveNextMessagesHook
229            responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
230                                         GRPCInterceptorManager *manager))responseHeaderHook
231              responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
232             responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
233                                         GRPCInterceptorManager *manager))responseCloseHook
234              didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook {
235  if ((self = [super initWithInterceptorManager:interceptorManager dispatchQueue:dispatchQueue])) {
236    _startHook = startHook;
237    _writeDataHook = writeDataHook;
238    _finishHook = finishHook;
239    _receiveNextMessagesHook = receiveNextMessagesHook;
240    _responseHeaderHook = responseHeaderHook;
241    _responseDataHook = responseDataHook;
242    _responseCloseHook = responseCloseHook;
243    _didWriteDataHook = didWriteDataHook;
244    _dispatchQueue = dispatchQueue;
245    _manager = interceptorManager;
246  }
247  return self;
248}
249
250- (void)startWithRequestOptions:(GRPCRequestOptions *)requestOptions
251                    callOptions:(GRPCCallOptions *)callOptions {
252  if (_startHook) {
253    _startHook(requestOptions, callOptions, _manager);
254  }
255}
256
257- (void)writeData:(id)data {
258  if (_writeDataHook) {
259    _writeDataHook(data, _manager);
260  }
261}
262
263- (void)finish {
264  if (_finishHook) {
265    _finishHook(_manager);
266  }
267}
268
269- (void)receiveNextMessages:(NSUInteger)numberOfMessages {
270  if (_receiveNextMessagesHook) {
271    _receiveNextMessagesHook(numberOfMessages, _manager);
272  }
273}
274
275- (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata {
276  if (_responseHeaderHook) {
277    _responseHeaderHook(initialMetadata, _manager);
278  }
279}
280
281- (void)didReceiveData:(id)data {
282  if (_responseDataHook) {
283    _responseDataHook(data, _manager);
284  }
285}
286
287- (void)didCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
288  if (_responseCloseHook) {
289    _responseCloseHook(trailingMetadata, error, _manager);
290  }
291}
292
293- (void)didWriteData {
294  if (_didWriteDataHook) {
295    _didWriteDataHook(_manager);
296  }
297}
298
299@end
300
301@interface GlobalInterceptorFactory : HookInterceptorFactory
302
303@property BOOL enabled;
304
305- (instancetype)initWithDispatchQueue:(dispatch_queue_t)dispatchQueue;
306
307- (void)setStartHook:(void (^)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
308                               GRPCInterceptorManager *manager))startHook
309              writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
310                 finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
311    receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
312                                      GRPCInterceptorManager *manager))receiveNextMessagesHook
313         responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
314                                      GRPCInterceptorManager *manager))responseHeaderHook
315           responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
316          responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
317                                      GRPCInterceptorManager *manager))responseCloseHook
318           didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook;
319
320@end
321
322@implementation GlobalInterceptorFactory
323
324- (instancetype)initWithDispatchQueue:(dispatch_queue_t)dispatchQueue {
325  _enabled = NO;
326  return [super initWithDispatchQueue:dispatchQueue
327                            startHook:nil
328                        writeDataHook:nil
329                           finishHook:nil
330              receiveNextMessagesHook:nil
331                   responseHeaderHook:nil
332                     responseDataHook:nil
333                    responseCloseHook:nil
334                     didWriteDataHook:nil];
335}
336
337- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager {
338  if (_enabled) {
339    return [[HookInterceptor alloc] initWithInterceptorManager:interceptorManager
340                                                 dispatchQueue:_dispatchQueue
341                                                     startHook:_startHook
342                                                 writeDataHook:_writeDataHook
343                                                    finishHook:_finishHook
344                                       receiveNextMessagesHook:_receiveNextMessagesHook
345                                            responseHeaderHook:_responseHeaderHook
346                                              responseDataHook:_responseDataHook
347                                             responseCloseHook:_responseCloseHook
348                                              didWriteDataHook:_didWriteDataHook];
349  } else {
350    return nil;
351  }
352}
353
354- (void)setStartHook:(void (^)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
355                               GRPCInterceptorManager *manager))startHook
356              writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
357                 finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
358    receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
359                                      GRPCInterceptorManager *manager))receiveNextMessagesHook
360         responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
361                                      GRPCInterceptorManager *manager))responseHeaderHook
362           responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
363          responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
364                                      GRPCInterceptorManager *manager))responseCloseHook
365           didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook {
366  _startHook = startHook;
367  _writeDataHook = writeDataHook;
368  _finishHook = finishHook;
369  _receiveNextMessagesHook = receiveNextMessagesHook;
370  _responseHeaderHook = responseHeaderHook;
371  _responseDataHook = responseDataHook;
372  _responseCloseHook = responseCloseHook;
373  _didWriteDataHook = didWriteDataHook;
374}
375
376@end
377
378static GlobalInterceptorFactory *globalInterceptorFactory = nil;
379static dispatch_once_t initGlobalInterceptorFactory;
380
381#pragma mark Tests
382
383@implementation InteropTests
384
385#pragma clang diagnostic push
386#pragma clang diagnostic ignored "-Warc-performSelector-leaks"
387- (void)retriableTest:(SEL)selector retries:(int)retries timeout:(NSTimeInterval)timeout {
388  for (int i = 0; i < retries; i++) {
389    NSDate *waitUntil = [[NSDate date] dateByAddingTimeInterval:timeout];
390    NSCondition *cv = [[NSCondition alloc] init];
391    __block BOOL done = NO;
392    [cv lock];
393    dispatch_async(dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0), ^{
394      [self performSelector:selector];
395      [cv lock];
396      done = YES;
397      [cv signal];
398      [cv unlock];
399    });
400    while (!done && [waitUntil timeIntervalSinceNow] > 0) {
401      [cv waitUntilDate:waitUntil];
402    }
403    if (done) {
404      [cv unlock];
405      break;
406    } else {
407      [cv unlock];
408      [self tearDown];
409      [self setUp];
410    }
411  }
412}
413#pragma clang diagnostic pop
414
415+ (XCTestSuite *)defaultTestSuite {
416  if (self == [InteropTests class]) {
417    return [XCTestSuite testSuiteWithName:@"InteropTestsEmptySuite"];
418  }
419  return super.defaultTestSuite;
420}
421
422+ (NSString *)host {
423  return nil;
424}
425
426// This number indicates how many bytes of overhead does Protocol Buffers encoding add onto the
427// message. The number varies as different message.proto is used on different servers. The actual
428// number for each interop server is overridden in corresponding derived test classes.
429- (int32_t)encodingOverhead {
430  return 0;
431}
432
433// For backwards compatibility
434+ (GRPCTransportType)transportType {
435  return GRPCTransportTypeChttp2BoringSSL;
436}
437
438+ (GRPCTransportID)transport {
439  return NULL;
440}
441
442+ (NSString *)PEMRootCertificates {
443  return nil;
444}
445
446+ (NSString *)hostNameOverride {
447  return nil;
448}
449
450+ (void)setUp {
451  dispatch_once(&initGlobalInterceptorFactory, ^{
452    dispatch_queue_t globalInterceptorQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
453    globalInterceptorFactory =
454        [[GlobalInterceptorFactory alloc] initWithDispatchQueue:globalInterceptorQueue];
455    [GRPCCall2 registerGlobalInterceptor:globalInterceptorFactory];
456  });
457}
458
459- (void)setUp {
460  self.continueAfterFailure = YES;
461  [GRPCCall resetHostSettings];
462  GRPCResetCallConnections();
463  XCTAssertNotNil([[self class] host]);
464}
465
466- (void)testEmptyUnaryRPC {
467  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiter, GRPCTestAssert assertBlock) {
468    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
469    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyUnary"];
470
471    GPBEmpty *request = [GPBEmpty message];
472
473    __weak RMTTestService *weakService = service;
474    [service emptyCallWithRequest:request
475                          handler:^(GPBEmpty *response, NSError *error) {
476                            if (weakService == nil) {
477                              return;
478                            }
479
480                            XCTAssertNil(error, @"Finished with unexpected error: %@", error);
481
482                            id expectedResponse = [GPBEmpty message];
483                            XCTAssertEqualObjects(response, expectedResponse);
484
485                            [expectation fulfill];
486                          }];
487    waiter(@[ expectation ], GRPCInteropTestTimeoutDefault);
488  });
489}
490
491- (void)testEmptyUnaryRPCWithV2API {
492  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
493    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
494    __weak XCTestExpectation *expectReceive =
495        [self expectationWithDescription:@"EmptyUnaryWithV2API received message"];
496    __weak XCTestExpectation *expectComplete =
497        [self expectationWithDescription:@"EmptyUnaryWithV2API completed"];
498
499    GPBEmpty *request = [GPBEmpty message];
500    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
501    // For backwards compatibility
502    options.transportType = [[self class] transportType];
503    options.transport = [[self class] transport];
504    options.PEMRootCertificates = [[self class] PEMRootCertificates];
505    options.hostNameOverride = [[self class] hostNameOverride];
506
507    __weak RMTTestService *weakService = service;
508    GRPCUnaryProtoCall *call = [service
509        emptyCallWithMessage:request
510             responseHandler:[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
511                                 messageCallback:^(id message) {
512                                   if (weakService == nil) {
513                                     return;
514                                   }
515                                   if (message) {
516                                     id expectedResponse = [GPBEmpty message];
517                                     XCTAssertEqualObjects(message, expectedResponse);
518                                     [expectReceive fulfill];
519                                   }
520                                 }
521                                 closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
522                                   if (weakService == nil) {
523                                     return;
524                                   }
525                                   XCTAssertNil(error, @"Unexpected error: %@", error);
526                                   [expectComplete fulfill];
527                                 }]
528                 callOptions:options];
529    [call start];
530    waiterBlock(@[ expectReceive, expectComplete ], GRPCInteropTestTimeoutDefault);
531  });
532}
533
534// Test that responses can be dispatched even if we do not run main run-loop
535- (void)testAsyncDispatchWithV2API {
536  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
537    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
538
539    XCTestExpectation *receiveExpect = [self expectationWithDescription:@"receiveExpect"];
540    XCTestExpectation *closeExpect = [self expectationWithDescription:@"closeExpect"];
541
542    GPBEmpty *request = [GPBEmpty message];
543    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
544    // For backwards compatibility
545    options.transportType = [[self class] transportType];
546    options.transport = [[self class] transport];
547    options.PEMRootCertificates = [[self class] PEMRootCertificates];
548    options.hostNameOverride = [[self class] hostNameOverride];
549
550    __block BOOL messageReceived = NO;
551    __block BOOL done = NO;
552    __weak RMTTestService *weakService = service;
553    GRPCUnaryProtoCall *call = [service
554        emptyCallWithMessage:request
555             responseHandler:[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
556                                 messageCallback:^(id message) {
557                                   if (weakService == nil) {
558                                     return;
559                                   }
560                                   if (message) {
561                                     id expectedResponse = [GPBEmpty message];
562                                     XCTAssertEqualObjects(message, expectedResponse);
563                                     messageReceived = YES;
564                                   }
565                                   [receiveExpect fulfill];
566                                 }
567                                 closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
568                                   if (weakService == nil) {
569                                     return;
570                                   }
571                                   XCTAssertNil(error, @"Unexpected error: %@", error);
572                                   done = YES;
573                                   [closeExpect fulfill];
574                                 }]
575                 callOptions:options];
576
577    [call start];
578
579    waiterBlock(@[ receiveExpect, closeExpect ], GRPCInteropTestTimeoutDefault);
580    XCTAssertTrue(messageReceived);
581    XCTAssertTrue(done);
582  });
583}
584
585- (void)testLargeUnaryRPC {
586  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
587    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
588    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"LargeUnary"];
589
590    RMTSimpleRequest *request = [RMTSimpleRequest message];
591    request.responseType = RMTPayloadType_Compressable;
592    request.responseSize = LARGE_RESPONSE_PAYLOAD_SIZE;
593    request.payload.body = [NSMutableData dataWithLength:LARGE_REQUEST_PAYLOAD_SIZE];
594
595    __weak RMTTestService *weakService = service;
596    [service unaryCallWithRequest:request
597                          handler:^(RMTSimpleResponse *response, NSError *error) {
598                            if (weakService == nil) {
599                              return;
600                            }
601
602                            XCTAssertNil(error, @"Finished with unexpected error: %@", error);
603
604                            RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
605                            expectedResponse.payload.type = RMTPayloadType_Compressable;
606                            expectedResponse.payload.body =
607                                [NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE];
608                            XCTAssertEqualObjects(response, expectedResponse);
609
610                            [expectation fulfill];
611                          }];
612    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
613  });
614}
615
616- (void)testUnaryResponseHandler {
617  // The test does not work on a remote server since it does not echo a trailer
618  if ([[self class] isRemoteTest]) {
619    return;
620  }
621
622  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
623    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
624
625    XCTestExpectation *expectComplete = [self expectationWithDescription:@"call complete"];
626    XCTestExpectation *expectCompleteMainQueue =
627        [self expectationWithDescription:@"main queue call complete"];
628
629    RMTSimpleRequest *request = [RMTSimpleRequest message];
630    request.responseType = RMTPayloadType_Compressable;
631    request.responseSize = LARGE_RESPONSE_PAYLOAD_SIZE;
632    request.payload.body = [NSMutableData dataWithLength:LARGE_REQUEST_PAYLOAD_SIZE];
633
634    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
635    // For backwards compatibility
636    options.transportType = [[self class] transportType];
637    options.transport = [[self class] transport];
638    options.PEMRootCertificates = [[self class] PEMRootCertificates];
639    options.hostNameOverride = [[self class] hostNameOverride];
640    const unsigned char raw_bytes[] = {1, 2, 3, 4};
641    NSData *trailer_data = [NSData dataWithBytes:raw_bytes length:sizeof(raw_bytes)];
642    options.initialMetadata = @{
643      @"x-grpc-test-echo-trailing-bin" : trailer_data,
644      @"x-grpc-test-echo-initial" : @"test-header"
645    };
646
647    __weak RMTTestService *weakService = service;
648
649    __block GRPCUnaryResponseHandler *handler = [[GRPCUnaryResponseHandler alloc]
650        initWithResponseHandler:^(GPBMessage *response, NSError *error) {
651          if (weakService == nil) {
652            return;
653          }
654
655          XCTAssertNil(error, @"Unexpected error: %@", error);
656          RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
657          expectedResponse.payload.type = RMTPayloadType_Compressable;
658          expectedResponse.payload.body =
659              [NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE];
660          XCTAssertEqualObjects(response, expectedResponse);
661          XCTAssertEqualObjects(handler.responseHeaders[@"x-grpc-test-echo-initial"],
662                                @"test-header");
663          XCTAssertEqualObjects(handler.responseTrailers[@"x-grpc-test-echo-trailing-bin"],
664                                trailer_data);
665          [expectComplete fulfill];
666        }
667          responseDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)];
668    __block GRPCUnaryResponseHandler *handlerMainQueue = [[GRPCUnaryResponseHandler alloc]
669        initWithResponseHandler:^(GPBMessage *response, NSError *error) {
670          if (weakService == nil) {
671            return;
672          }
673          XCTAssertNil(error, @"Unexpected error: %@", error);
674          RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
675          expectedResponse.payload.type = RMTPayloadType_Compressable;
676          expectedResponse.payload.body =
677              [NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE];
678          XCTAssertEqualObjects(response, expectedResponse);
679          XCTAssertEqualObjects(handlerMainQueue.responseHeaders[@"x-grpc-test-echo-initial"],
680                                @"test-header");
681          XCTAssertEqualObjects(handlerMainQueue.responseTrailers[@"x-grpc-test-echo-trailing-bin"],
682                                trailer_data);
683          [expectCompleteMainQueue fulfill];
684        }
685          responseDispatchQueue:nil];
686
687    [[service unaryCallWithMessage:request responseHandler:handler callOptions:options] start];
688    [[service unaryCallWithMessage:request responseHandler:handlerMainQueue
689                       callOptions:options] start];
690
691    waiterBlock(@[ expectComplete, expectCompleteMainQueue ], GRPCInteropTestTimeoutDefault);
692  });
693}
694
695- (void)testLargeUnaryRPCWithV2API {
696  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
697    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
698    __weak XCTestExpectation *expectReceive =
699        [self expectationWithDescription:@"LargeUnaryWithV2API received message"];
700    __weak XCTestExpectation *expectComplete =
701        [self expectationWithDescription:@"LargeUnaryWithV2API received complete"];
702
703    RMTSimpleRequest *request = [RMTSimpleRequest message];
704    request.responseType = RMTPayloadType_Compressable;
705    request.responseSize = LARGE_RESPONSE_PAYLOAD_SIZE;
706    request.payload.body = [NSMutableData dataWithLength:LARGE_REQUEST_PAYLOAD_SIZE];
707
708    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
709    // For backwards compatibility
710    options.transportType = [[self class] transportType];
711    options.transport = [[self class] transport];
712    options.PEMRootCertificates = [[self class] PEMRootCertificates];
713    options.hostNameOverride = [[self class] hostNameOverride];
714
715    __weak RMTTestService *weakService = service;
716    GRPCUnaryProtoCall *call = [service
717        unaryCallWithMessage:request
718             responseHandler:[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
719                                 messageCallback:^(id message) {
720                                   if (weakService == nil) {
721                                     return;
722                                   }
723                                   XCTAssertNotNil(message);
724                                   if (message) {
725                                     RMTSimpleResponse *expectedResponse =
726                                         [RMTSimpleResponse message];
727                                     expectedResponse.payload.type = RMTPayloadType_Compressable;
728                                     expectedResponse.payload.body =
729                                         [NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE];
730                                     XCTAssertEqualObjects(message, expectedResponse);
731
732                                     [expectReceive fulfill];
733                                   }
734                                 }
735                                 closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
736                                   if (weakService == nil) {
737                                     return;
738                                   }
739                                   XCTAssertNil(error, @"Unexpected error: %@", error);
740                                   [expectComplete fulfill];
741                                 }]
742                 callOptions:options];
743    [call start];
744    waiterBlock(@[ expectReceive, expectComplete ], GRPCInteropTestTimeoutDefault);
745  });
746}
747
748- (void)testConcurrentRPCsWithErrorsWithV2API {
749  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
750    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
751    NSMutableArray *completeExpectations = [NSMutableArray array];
752    NSMutableArray *calls = [NSMutableArray array];
753    int num_rpcs = 10;
754    for (int i = 0; i < num_rpcs; ++i) {
755      [completeExpectations
756          addObject:[self expectationWithDescription:
757                              [NSString stringWithFormat:@"Received trailer for RPC %d", i]]];
758
759      RMTSimpleRequest *request = [RMTSimpleRequest message];
760      request.responseType = RMTPayloadType_Compressable;
761      request.responseSize = SMALL_PAYLOAD_SIZE;
762      request.payload.body = [NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE];
763      if (i % 3 == 0) {
764        request.responseStatus.code = GRPC_STATUS_UNAVAILABLE;
765      } else if (i % 7 == 0) {
766        request.responseStatus.code = GRPC_STATUS_CANCELLED;
767      }
768      GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
769      // For backwards compatibility
770      options.transportType = [[self class] transportType];
771      options.transport = [[self class] transport];
772      options.PEMRootCertificates = [[self class] PEMRootCertificates];
773      options.hostNameOverride = [[self class] hostNameOverride];
774
775      __weak RMTTestService *weakService = service;
776      GRPCUnaryProtoCall *call = [service
777          unaryCallWithMessage:request
778               responseHandler:[[InteropTestsBlockCallbacks alloc]
779                                   initWithInitialMetadataCallback:nil
780                                   messageCallback:^(id message) {
781                                     if (weakService == nil) {
782                                       return;
783                                     }
784                                     if (message) {
785                                       RMTSimpleResponse *expectedResponse =
786                                           [RMTSimpleResponse message];
787                                       expectedResponse.payload.type = RMTPayloadType_Compressable;
788                                       expectedResponse.payload.body =
789                                           [NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE];
790                                       XCTAssertEqualObjects(message, expectedResponse);
791                                     }
792                                   }
793                                   closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
794                                     if (weakService == nil) {
795                                       return;
796                                     }
797                                     [completeExpectations[i] fulfill];
798                                   }]
799                   callOptions:options];
800      [calls addObject:call];
801    }
802
803    for (int i = 0; i < num_rpcs; ++i) {
804      GRPCUnaryProtoCall *call = calls[i];
805      [call start];
806    }
807
808    waiterBlock(completeExpectations, GRPCInteropTestTimeoutDefault);
809  });
810}
811
812- (void)concurrentRPCsWithErrors {
813  const int kNumRpcs = 10;
814  __block int completedCallCount = 0;
815  NSCondition *cv = [[NSCondition alloc] init];
816  NSDate *waitUntil = [[NSDate date] dateByAddingTimeInterval:GRPCInteropTestTimeoutDefault];
817  [cv lock];
818  for (int i = 0; i < kNumRpcs; ++i) {
819    RMTSimpleRequest *request = [RMTSimpleRequest message];
820    request.responseType = RMTPayloadType_Compressable;
821    request.responseSize = SMALL_PAYLOAD_SIZE;
822    request.payload.body = [NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE];
823    if (i % 3 == 0) {
824      request.responseStatus.code = GRPC_STATUS_UNAVAILABLE;
825    } else if (i % 7 == 0) {
826      request.responseStatus.code = GRPC_STATUS_CANCELLED;
827    }
828
829    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
830    __weak RMTTestService *weakService = service;
831    GRPCProtoCall *call = [service
832        RPCToUnaryCallWithRequest:request
833                          handler:^(RMTSimpleResponse *response, NSError *error) {
834                            if (weakService == nil) {
835                              return;
836                            }
837                            if (error == nil) {
838                              RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
839                              expectedResponse.payload.type = RMTPayloadType_Compressable;
840                              expectedResponse.payload.body =
841                                  [NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE];
842                              XCTAssertEqualObjects(response, expectedResponse);
843                            }
844                            // DEBUG
845                            [cv lock];
846                            if (++completedCallCount == kNumRpcs) {
847                              [cv signal];
848                            }
849                            [cv unlock];
850                          }];
851    [call setResponseDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)];
852    [call start];
853  }
854  while (completedCallCount < kNumRpcs && [waitUntil timeIntervalSinceNow] > 0) {
855    [cv waitUntilDate:waitUntil];
856  }
857  [cv unlock];
858}
859
860- (void)testConcurrentRPCsWithErrors {
861  [self retriableTest:@selector(concurrentRPCsWithErrors) retries:kTestRetries timeout:10];
862}
863
864- (void)testPacketCoalescing {
865  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
866    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
867    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"LargeUnary"];
868
869    RMTSimpleRequest *request = [RMTSimpleRequest message];
870    request.responseType = RMTPayloadType_Compressable;
871    request.responseSize = SMALL_PAYLOAD_SIZE;
872    request.payload.body = [NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE];
873
874    [GRPCCall enableOpBatchLog:YES];
875    __weak RMTTestService *weakService = service;
876    [service unaryCallWithRequest:request
877                          handler:^(RMTSimpleResponse *response, NSError *error) {
878                            if (weakService == nil) {
879                              return;
880                            }
881                            XCTAssertNil(error, @"Finished with unexpected error: %@", error);
882
883                            RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
884                            expectedResponse.payload.type = RMTPayloadType_Compressable;
885                            expectedResponse.payload.body = [NSMutableData dataWithLength:10];
886                            XCTAssertEqualObjects(response, expectedResponse);
887
888                            // The test is a success if there is a batch of exactly 3 ops
889                            // (SEND_INITIAL_METADATA, SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT).
890                            // Without packet coalescing each batch of ops contains only one op.
891                            NSArray *opBatches = [GRPCCall obtainAndCleanOpBatchLog];
892                            const NSInteger kExpectedOpBatchSize = 3;
893                            for (NSObject *o in opBatches) {
894                              if ([o isKindOfClass:[NSArray class]]) {
895                                NSArray *batch = (NSArray *)o;
896                                if ([batch count] == kExpectedOpBatchSize) {
897                                  [expectation fulfill];
898                                  break;
899                                }
900                              }
901                            }
902                          }];
903
904    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
905    [GRPCCall enableOpBatchLog:NO];
906  });
907}
908
909- (void)test4MBResponsesAreAccepted {
910  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
911    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
912    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"MaxResponseSize"];
913
914    RMTSimpleRequest *request = [RMTSimpleRequest message];
915    const int32_t kPayloadSize =
916        4 * 1024 * 1024 - self.encodingOverhead;  // 4MB - encoding overhead
917    request.responseSize = kPayloadSize;
918
919    __weak RMTTestService *weakService = service;
920    [service unaryCallWithRequest:request
921                          handler:^(RMTSimpleResponse *response, NSError *error) {
922                            if (weakService == nil) {
923                              return;
924                            }
925                            XCTAssertNil(error, @"Finished with unexpected error: %@", error);
926                            XCTAssertEqual(response.payload.body.length, kPayloadSize);
927                            [expectation fulfill];
928                          }];
929
930    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
931  });
932}
933
934- (void)testResponsesOverMaxSizeFailWithActionableMessage {
935  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
936    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
937    __weak XCTestExpectation *expectation =
938        [self expectationWithDescription:@"ResponseOverMaxSize"];
939
940    RMTSimpleRequest *request = [RMTSimpleRequest message];
941    const int32_t kPayloadSize = 4 * 1024 * 1024 - self.encodingOverhead + 1;  // 1B over max size
942    request.responseSize = kPayloadSize;
943
944    __weak RMTTestService *weakService = service;
945    [service unaryCallWithRequest:request
946                          handler:^(RMTSimpleResponse *response, NSError *error) {
947                            if (weakService == nil) {
948                              return;
949                            }
950                            // TODO(jcanizales): Catch the error and rethrow it with an
951                            // actionable message:
952                            // - Use +[GRPCCall setResponseSizeLimit:forHost:] to set a
953                            // higher limit.
954                            // - If you're developing the server, consider using response
955                            // streaming, or let clients filter
956                            //   responses by setting a google.protobuf.FieldMask in the
957                            //   request:
958                            //   https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/field_mask.proto
959                            XCTAssertEqualObjects(
960                                error.localizedDescription,
961                                @"CLIENT: Received message larger than max (4194305 vs. 4194304)");
962                            [expectation fulfill];
963                          }];
964    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
965  });
966}
967
968- (void)testResponsesOver4MBAreAcceptedIfOptedIn {
969  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
970    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
971    __weak XCTestExpectation *expectation =
972        [self expectationWithDescription:@"HigherResponseSizeLimit"];
973    __block NSError *callError = nil;
974
975    RMTSimpleRequest *request = [RMTSimpleRequest message];
976    const size_t kPayloadSize = 5 * 1024 * 1024;  // 5MB
977    request.responseSize = kPayloadSize;
978
979    [GRPCCall setResponseSizeLimit:6 * 1024 * 1024 forHost:[[self class] host]];
980    __weak RMTTestService *weakService = service;
981    [service unaryCallWithRequest:request
982                          handler:^(RMTSimpleResponse *response, NSError *error) {
983                            if (weakService == nil) {
984                              return;
985                            }
986                            callError = error;
987                            [expectation fulfill];
988                          }];
989
990    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
991    XCTAssertNil(callError, @"Finished with unexpected error: %@", callError);
992  });
993}
994
995- (void)testClientStreamingRPC {
996  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
997    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
998    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"ClientStreaming"];
999
1000    RMTStreamingInputCallRequest *request1 = [RMTStreamingInputCallRequest message];
1001    request1.payload.body = [NSMutableData dataWithLength:27182];
1002
1003    RMTStreamingInputCallRequest *request2 = [RMTStreamingInputCallRequest message];
1004    request2.payload.body = [NSMutableData dataWithLength:8];
1005
1006    RMTStreamingInputCallRequest *request3 = [RMTStreamingInputCallRequest message];
1007    request3.payload.body = [NSMutableData dataWithLength:1828];
1008
1009    RMTStreamingInputCallRequest *request4 = [RMTStreamingInputCallRequest message];
1010    request4.payload.body = [NSMutableData dataWithLength:45904];
1011
1012    GRXWriter *writer = [GRXWriter writerWithContainer:@[ request1, request2, request3, request4 ]];
1013
1014    __weak RMTTestService *weakService = service;
1015    [service
1016        streamingInputCallWithRequestsWriter:writer
1017                                     handler:^(RMTStreamingInputCallResponse *response,
1018                                               NSError *error) {
1019                                       if (weakService == nil) {
1020                                         return;
1021                                       }
1022                                       XCTAssertNil(error, @"Finished with unexpected error: %@",
1023                                                    error);
1024
1025                                       RMTStreamingInputCallResponse *expectedResponse =
1026                                           [RMTStreamingInputCallResponse message];
1027                                       expectedResponse.aggregatedPayloadSize = 74922;
1028                                       XCTAssertEqualObjects(response, expectedResponse);
1029
1030                                       [expectation fulfill];
1031                                     }];
1032
1033    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
1034  });
1035}
1036
1037- (void)testServerStreamingRPC {
1038  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1039    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1040    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"ServerStreaming"];
1041
1042    NSArray *expectedSizes = @[ @31415, @9, @2653, @58979 ];
1043
1044    RMTStreamingOutputCallRequest *request = [RMTStreamingOutputCallRequest message];
1045    for (NSNumber *size in expectedSizes) {
1046      RMTResponseParameters *parameters = [RMTResponseParameters message];
1047      parameters.size = [size intValue];
1048      [request.responseParametersArray addObject:parameters];
1049    }
1050
1051    __block int index = 0;
1052    __weak RMTTestService *weakService = service;
1053    [service
1054        streamingOutputCallWithRequest:request
1055                          eventHandler:^(BOOL done, RMTStreamingOutputCallResponse *response,
1056                                         NSError *error) {
1057                            if (weakService == nil) {
1058                              return;
1059                            }
1060
1061                            assertBlock(
1062                                error == nil,
1063                                [NSString
1064                                    stringWithFormat:@"Finished with unexpected error: %@", error]);
1065                            assertBlock(done || response,
1066                                        @"Event handler called without an event.");
1067
1068                            if (response) {
1069                              assertBlock(index < 4, @"More than 4 responses received.");
1070
1071                              id expected = [RMTStreamingOutputCallResponse
1072                                  messageWithPayloadSize:expectedSizes[index]];
1073                              assertBlock(
1074                                  [response isEqual:expected],
1075                                  [NSString
1076                                      stringWithFormat:@"response %@ not equal to expected %@",
1077                                                       response, expected]);
1078
1079                              index += 1;
1080                            }
1081
1082                            if (done) {
1083                              assertBlock(
1084                                  index == 4,
1085                                  [NSString stringWithFormat:@"Received %@ responses instead of 4.",
1086                                                             @(index)]);
1087                              [expectation fulfill];
1088                            }
1089                          }];
1090
1091    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
1092  });
1093}
1094
1095- (void)testPingPongRPC {
1096  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1097    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1098    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPong"];
1099
1100    NSArray *requests = @[ @27182, @8, @1828, @45904 ];
1101    NSArray *responses = @[ @31415, @9, @2653, @58979 ];
1102
1103    GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];
1104
1105    __block int index = 0;
1106
1107    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
1108                                                 requestedResponseSize:responses[index]];
1109    [requestsBuffer writeValue:request];
1110
1111    __weak RMTTestService *weakService = service;
1112    [service
1113        fullDuplexCallWithRequestsWriter:requestsBuffer
1114                            eventHandler:^(BOOL done, RMTStreamingOutputCallResponse *response,
1115                                           NSError *error) {
1116                              if (weakService == nil) {
1117                                return;
1118                              }
1119
1120                              assertBlock(
1121                                  error == nil,
1122                                  [NSString stringWithFormat:@"Finished with unexpected error: %@",
1123                                                             error]);
1124                              assertBlock(done || response,
1125                                          @"Event handler called without an event.");
1126
1127                              if (response) {
1128                                assertBlock(index < 4, @"More than 4 responses received.");
1129
1130                                id expected = [RMTStreamingOutputCallResponse
1131                                    messageWithPayloadSize:responses[index]];
1132                                assertBlock(
1133                                    [response isEqual:expected],
1134                                    [NSString
1135                                        stringWithFormat:@"response %@ not equal to expected %@",
1136                                                         response, expected]);
1137
1138                                index += 1;
1139                                if (index < 4) {
1140                                  id request = [RMTStreamingOutputCallRequest
1141                                      messageWithPayloadSize:requests[index]
1142                                       requestedResponseSize:responses[index]];
1143                                  [requestsBuffer writeValue:request];
1144                                } else {
1145                                  [requestsBuffer writesFinishedWithError:nil];
1146                                }
1147                              }
1148
1149                              if (done) {
1150                                assertBlock(
1151                                    index == 4,
1152                                    [NSString
1153                                        stringWithFormat:@"Received %@ responses instead of 4.",
1154                                                         @(index)]);
1155                                [expectation fulfill];
1156                              }
1157                            }];
1158    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
1159  });
1160}
1161
1162- (void)testPingPongRPCWithV2API {
1163  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1164    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1165    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPongWithV2API"];
1166
1167    NSArray *requests = @[ @27182, @8, @1828, @45904 ];
1168    NSArray *responses = @[ @31415, @9, @2653, @58979 ];
1169
1170    __block int index = 0;
1171
1172    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
1173                                                 requestedResponseSize:responses[index]];
1174    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
1175    // For backwards compatibility
1176    options.transportType = [[self class] transportType];
1177    options.transport = [[self class] transport];
1178    options.PEMRootCertificates = [[self class] PEMRootCertificates];
1179    options.hostNameOverride = [[self class] hostNameOverride];
1180
1181    __weak __block GRPCStreamingProtoCall *weakCall;
1182    GRPCStreamingProtoCall *call = [service
1183        fullDuplexCallWithResponseHandler:
1184            [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
1185                messageCallback:^(id message) {
1186                  GRPCStreamingProtoCall *localCall = weakCall;
1187                  if (localCall == nil) {
1188                    return;
1189                  }
1190                  assertBlock(index < 4, @"More than 4 responses received.");
1191
1192                  id expected =
1193                      [RMTStreamingOutputCallResponse messageWithPayloadSize:responses[index]];
1194                  assertBlock([message isEqual:expected],
1195                              [NSString stringWithFormat:@"message %@ not equal to expected %@",
1196                                                         message, expected]);
1197                  index += 1;
1198                  if (index < 4) {
1199                    id request =
1200                        [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
1201                                                        requestedResponseSize:responses[index]];
1202                    [localCall writeMessage:request];
1203                  } else {
1204                    [localCall finish];
1205                  }
1206                }
1207                closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
1208                  if (weakCall == nil) {
1209                    return;
1210                  }
1211                  assertBlock(
1212                      error == nil,
1213                      [NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
1214                  assertBlock(
1215                      index == 4,
1216                      [NSString stringWithFormat:@"Received %@ responses instead of 4.", @(index)]);
1217                  [expectation fulfill];
1218                }]
1219                              callOptions:options];
1220    weakCall = call;
1221    [call start];
1222    [call writeMessage:request];
1223
1224    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
1225  });
1226}
1227
1228- (void)testPingPongRPCWithFlowControl {
1229  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1230    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1231    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPongWithV2API"];
1232
1233    NSArray *requests = @[ @27182, @8, @1828, @45904 ];
1234    NSArray *responses = @[ @31415, @9, @2653, @58979 ];
1235
1236    __block int index = 0;
1237
1238    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
1239                                                 requestedResponseSize:responses[index]];
1240    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
1241    // For backwards compatibility
1242    options.transportType = [[self class] transportType];
1243    options.transport = [[self class] transport];
1244    options.PEMRootCertificates = [[self class] PEMRootCertificates];
1245    options.hostNameOverride = [[self class] hostNameOverride];
1246    options.flowControlEnabled = YES;
1247    __block int writeMessageCount = 0;
1248
1249    __weak __block GRPCStreamingProtoCall *weakCall;
1250    GRPCStreamingProtoCall *call = [service
1251        fullDuplexCallWithResponseHandler:
1252            [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
1253                messageCallback:^(id message) {
1254                  GRPCStreamingProtoCall *localCall = weakCall;
1255                  if (localCall == nil) {
1256                    return;
1257                  }
1258
1259                  assertBlock((index < 4), @"More than 4 responses received.");
1260                  id expected =
1261                      [RMTStreamingOutputCallResponse messageWithPayloadSize:responses[index]];
1262                  assertBlock(
1263                      [message isEqual:expected],
1264                      [NSString stringWithFormat:@"message %@ not equal to %@", message, expected]);
1265
1266                  index += 1;
1267                  if (index < 4) {
1268                    id request =
1269                        [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
1270                                                        requestedResponseSize:responses[index]];
1271                    [localCall writeMessage:request];
1272                    [localCall receiveNextMessage];
1273                  } else {
1274                    [localCall finish];
1275                  }
1276                }
1277                closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
1278                  if (weakCall == nil) {
1279                    return;
1280                  }
1281
1282                  assertBlock(
1283                      error == nil,
1284                      [NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
1285                  assertBlock(
1286                      index == 4,
1287                      [NSString stringWithFormat:@"Received %i responses instead of 4.", index]);
1288                  [expectation fulfill];
1289                }
1290                writeMessageCallback:^{
1291                  writeMessageCount++;
1292                }]
1293                              callOptions:options];
1294    weakCall = call;
1295    [call start];
1296    [call receiveNextMessage];
1297    [call writeMessage:request];
1298
1299    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
1300    assertBlock(
1301        writeMessageCount == 4,
1302        [NSString stringWithFormat:@"writeMessageCount %@ not equal to 4", @(writeMessageCount)]);
1303  });
1304}
1305
1306- (void)testEmptyStreamRPC {
1307  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1308    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1309    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyStream"];
1310    __weak RMTTestService *weakService = service;
1311    [service
1312        fullDuplexCallWithRequestsWriter:[GRXWriter emptyWriter]
1313                            eventHandler:^(BOOL done, RMTStreamingOutputCallResponse *response,
1314                                           NSError *error) {
1315                              if (weakService == nil) {
1316                                return;
1317                              }
1318                              XCTAssertNil(error, @"Finished with unexpected error: %@", error);
1319                              XCTAssert(done, @"Unexpected response: %@", response);
1320                              [expectation fulfill];
1321                            }];
1322    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
1323  });
1324}
1325
1326- (void)testCancelAfterBeginRPC {
1327  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1328    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1329    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"CancelAfterBegin"];
1330
1331    // A buffered pipe to which we never write any value acts as a writer that just hangs.
1332    GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];
1333
1334    __weak RMTTestService *weakService = service;
1335    GRPCProtoCall *call = [service
1336        RPCToStreamingInputCallWithRequestsWriter:requestsBuffer
1337                                          handler:^(RMTStreamingInputCallResponse *response,
1338                                                    NSError *error) {
1339                                            if (weakService == nil) {
1340                                              return;
1341                                            }
1342                                            XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
1343                                            [expectation fulfill];
1344                                          }];
1345    XCTAssertEqual(call.state, GRXWriterStateNotStarted);
1346
1347    [call start];
1348    XCTAssertEqual(call.state, GRXWriterStateStarted);
1349
1350    [call cancel];
1351    XCTAssertEqual(call.state, GRXWriterStateFinished);
1352
1353    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
1354  });
1355}
1356
1357- (void)testCancelAfterBeginRPCWithV2API {
1358  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1359    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1360    __weak XCTestExpectation *expectation =
1361        [self expectationWithDescription:@"CancelAfterBeginWithV2API"];
1362
1363    // A buffered pipe to which we never write any value acts as a writer that just hangs.
1364    __weak RMTTestService *weakService = service;
1365    GRPCStreamingProtoCall *call = [service
1366        streamingInputCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
1367                                                  initWithInitialMetadataCallback:nil
1368                                                  messageCallback:^(id message) {
1369                                                    if (weakService == nil) {
1370                                                      return;
1371                                                    }
1372                                                    XCTFail(@"Not expected to receive message");
1373                                                  }
1374                                                  closeCallback:^(NSDictionary *trailingMetadata,
1375                                                                  NSError *error) {
1376                                                    if (weakService == nil) {
1377                                                      return;
1378                                                    }
1379                                                    XCTAssertEqual(error.code,
1380                                                                   GRPC_STATUS_CANCELLED);
1381                                                    [expectation fulfill];
1382                                                  }]
1383                                  callOptions:nil];
1384    [call start];
1385    [call cancel];
1386
1387    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
1388  });
1389}
1390
1391- (void)testCancelAfterFirstResponseRPC {
1392  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1393    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1394    __weak XCTestExpectation *expectation =
1395        [self expectationWithDescription:@"CancelAfterFirstResponse"];
1396
1397    // A buffered pipe to which we write a single value but never close
1398    GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];
1399
1400    __block BOOL receivedResponse = NO;
1401
1402    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
1403                                                 requestedResponseSize:@31415];
1404
1405    [requestsBuffer writeValue:request];
1406
1407    __weak RMTTestService *weakService = service;
1408    __block GRPCProtoCall *call = [service
1409        RPCToFullDuplexCallWithRequestsWriter:requestsBuffer
1410                                 eventHandler:^(BOOL done, RMTStreamingOutputCallResponse *response,
1411                                                NSError *error) {
1412                                   if (weakService == nil) {
1413                                     return;
1414                                   }
1415                                   if (receivedResponse) {
1416                                     XCTAssert(done, @"Unexpected extra response %@", response);
1417                                     XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
1418                                     [expectation fulfill];
1419                                   } else {
1420                                     XCTAssertNil(error, @"Finished with unexpected error: %@",
1421                                                  error);
1422                                     XCTAssertFalse(done, @"Finished without response");
1423                                     XCTAssertNotNil(response);
1424                                     receivedResponse = YES;
1425                                     [call cancel];
1426                                   }
1427                                 }];
1428    [call start];
1429    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
1430  });
1431}
1432
1433- (void)testCancelAfterFirstResponseRPCWithV2API {
1434  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1435    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1436    __weak XCTestExpectation *completionExpectation =
1437        [self expectationWithDescription:@"Call completed."];
1438    __weak XCTestExpectation *responseExpectation =
1439        [self expectationWithDescription:@"Received response."];
1440
1441    __block BOOL receivedResponse = NO;
1442
1443    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
1444    // For backwards compatibility
1445    options.transportType = self.class.transportType;
1446    options.transport = [[self class] transport];
1447    options.PEMRootCertificates = self.class.PEMRootCertificates;
1448    options.hostNameOverride = [[self class] hostNameOverride];
1449
1450    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
1451                                                 requestedResponseSize:@31415];
1452
1453    __weak __block GRPCStreamingProtoCall *weakCall;
1454    GRPCStreamingProtoCall *call = [service
1455        fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
1456                                              initWithInitialMetadataCallback:nil
1457                                              messageCallback:^(id message) {
1458                                                GRPCStreamingProtoCall *localCall = weakCall;
1459                                                if (localCall == nil) {
1460                                                  return;
1461                                                }
1462                                                XCTAssertFalse(receivedResponse);
1463                                                receivedResponse = YES;
1464                                                [localCall cancel];
1465                                                [responseExpectation fulfill];
1466                                              }
1467                                              closeCallback:^(NSDictionary *trailingMetadata,
1468                                                              NSError *error) {
1469                                                if (weakCall == nil) {
1470                                                  return;
1471                                                }
1472                                                XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
1473                                                [completionExpectation fulfill];
1474                                              }]
1475                              callOptions:options];
1476    weakCall = call;
1477    [call start];
1478    [call writeMessage:request];
1479    waiterBlock(@[ completionExpectation, responseExpectation ], GRPCInteropTestTimeoutDefault);
1480  });
1481}
1482
1483- (void)testCancelAfterFirstRequestWithV2API {
1484  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1485    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1486    __weak XCTestExpectation *completionExpectation =
1487        [self expectationWithDescription:@"Call completed."];
1488
1489    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
1490    // For backwards compatibility
1491    options.transportType = self.class.transportType;
1492    options.transport = [[self class] transport];
1493    options.PEMRootCertificates = self.class.PEMRootCertificates;
1494    options.hostNameOverride = [[self class] hostNameOverride];
1495
1496    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
1497                                                 requestedResponseSize:@31415];
1498
1499    __weak RMTTestService *weakService = service;
1500    GRPCStreamingProtoCall *call = [service
1501        fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
1502                                              initWithInitialMetadataCallback:nil
1503                                              messageCallback:^(id message) {
1504                                                if (weakService == nil) {
1505                                                  return;
1506                                                }
1507                                                XCTFail(@"Received unexpected response.");
1508                                              }
1509                                              closeCallback:^(NSDictionary *trailingMetadata,
1510                                                              NSError *error) {
1511                                                if (weakService == nil) {
1512                                                  return;
1513                                                }
1514                                                XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
1515                                                [completionExpectation fulfill];
1516                                              }]
1517                              callOptions:options];
1518    [call start];
1519    [call writeMessage:request];
1520    [call cancel];
1521    waiterBlock(@[ completionExpectation ], GRPCInteropTestTimeoutDefault);
1522  });
1523}
1524
1525- (void)testRPCAfterClosingOpenConnections {
1526  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1527    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1528    __weak XCTestExpectation *expectation =
1529        [self expectationWithDescription:@"RPC after closing connection"];
1530
1531    GPBEmpty *request = [GPBEmpty message];
1532
1533    __weak RMTTestService *weakService = service;
1534    [service
1535        emptyCallWithRequest:request
1536                     handler:^(GPBEmpty *response, NSError *error) {
1537                       if (weakService == nil) {
1538                         return;
1539                       }
1540                       XCTAssertNil(error, @"First RPC finished with unexpected error: %@", error);
1541
1542#pragma clang diagnostic push
1543#pragma clang diagnostic ignored "-Wdeprecated-declarations"
1544                       [GRPCCall closeOpenConnections];
1545#pragma clang diagnostic pop
1546
1547                       [weakService
1548                           emptyCallWithRequest:request
1549                                        handler:^(GPBEmpty *response, NSError *error) {
1550                                          XCTAssertNil(
1551                                              error,
1552                                              @"Second RPC finished with unexpected error: %@",
1553                                              error);
1554                                          [expectation fulfill];
1555                                        }];
1556                     }];
1557
1558    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
1559  });
1560}
1561
1562- (void)testCompressedUnaryRPC {
1563  // This test needs to be disabled for remote test because interop server grpc-test
1564  // does not support compression.
1565  if (isRemoteInteropTest([[self class] host])) {
1566    return;
1567  }
1568
1569  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1570    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1571    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"LargeUnary"];
1572
1573    RMTSimpleRequest *request = [RMTSimpleRequest message];
1574    request.responseType = RMTPayloadType_Compressable;
1575    request.responseSize = LARGE_RESPONSE_PAYLOAD_SIZE;
1576    request.payload.body = [NSMutableData dataWithLength:LARGE_REQUEST_PAYLOAD_SIZE];
1577    request.expectCompressed.value = YES;
1578    [GRPCCall setDefaultCompressMethod:GRPCCompressGzip forhost:[[self class] host]];
1579
1580    __weak RMTTestService *weakService = service;
1581    [service unaryCallWithRequest:request
1582                          handler:^(RMTSimpleResponse *response, NSError *error) {
1583                            if (weakService == nil) {
1584                              return;
1585                            }
1586
1587                            XCTAssertNil(error, @"Finished with unexpected error: %@", error);
1588
1589                            RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
1590                            expectedResponse.payload.type = RMTPayloadType_Compressable;
1591                            expectedResponse.payload.body =
1592                                [NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE];
1593                            XCTAssertEqualObjects(response, expectedResponse);
1594
1595                            [expectation fulfill];
1596                          }];
1597
1598    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
1599  });
1600}
1601
1602// TODO(b/268379869): This test has a race and is flaky in any configurations. One possible way to
1603// deflake this test is to find a way to disable ping ack on the interop server for this test case.
1604- (void)testKeepaliveWithV2API {
1605  return;
1606
1607  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1608    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1609    if ([[self class] transport] == gGRPCCoreCronetID) {
1610      // Cronet does not support keepalive
1611      return;
1612    }
1613    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Keepalive"];
1614
1615    const NSTimeInterval kTestTimeout = 5;
1616    NSNumber *kRequestSize = @27182;
1617    NSNumber *kResponseSize = @31415;
1618
1619    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:kRequestSize
1620                                                 requestedResponseSize:kResponseSize];
1621    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
1622    options.transportType = [[self class] transportType];
1623    options.transport = [[self class] transport];
1624    options.PEMRootCertificates = [[self class] PEMRootCertificates];
1625    options.hostNameOverride = [[self class] hostNameOverride];
1626    options.keepaliveInterval = 1.5;
1627    options.keepaliveTimeout = 0;
1628
1629    __weak RMTTestService *weakService = service;
1630    GRPCStreamingProtoCall *call = [service
1631        fullDuplexCallWithResponseHandler:
1632            [[InteropTestsBlockCallbacks alloc]
1633                initWithInitialMetadataCallback:nil
1634                                messageCallback:nil
1635                                  closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
1636                                    if (weakService == nil) {
1637                                      return;
1638                                    }
1639                                    XCTAssertNotNil(error);
1640                                    XCTAssertEqual(
1641                                        error.code, GRPC_STATUS_UNAVAILABLE,
1642                                        @"Received status %@ instead of UNAVAILABLE (14).",
1643                                        @(error.code));
1644                                    [expectation fulfill];
1645                                  }]
1646                              callOptions:options];
1647    [call writeMessage:request];
1648    [call start];
1649
1650    waiterBlock(@[ expectation ], kTestTimeout);
1651    [call finish];
1652  });
1653}
1654
1655- (void)testDefaultInterceptor {
1656  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1657    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1658    __weak XCTestExpectation *expectation =
1659        [self expectationWithDescription:@"testDefaultInterceptor"];
1660
1661    NSArray *requests = @[ @27182, @8, @1828, @45904 ];
1662    NSArray *responses = @[ @31415, @9, @2653, @58979 ];
1663
1664    __block int index = 0;
1665
1666    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
1667                                                 requestedResponseSize:responses[index]];
1668    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
1669    // For backwards compatibility
1670    options.transportType = [[self class] transportType];
1671    options.transport = [[self class] transport];
1672    options.PEMRootCertificates = [[self class] PEMRootCertificates];
1673    options.hostNameOverride = [[self class] hostNameOverride];
1674    options.interceptorFactories = @[ [[DefaultInterceptorFactory alloc] init] ];
1675
1676    __weak __block GRPCStreamingProtoCall *weakCall;
1677    GRPCStreamingProtoCall *call = [service
1678        fullDuplexCallWithResponseHandler:
1679            [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
1680                messageCallback:^(id message) {
1681                  GRPCStreamingProtoCall *localCall = weakCall;
1682                  if (localCall == nil) {
1683                    return;
1684                  }
1685                  assertBlock(index < 4, @"More than 4 responses received.");
1686
1687                  id expected =
1688                      [RMTStreamingOutputCallResponse messageWithPayloadSize:responses[index]];
1689                  assertBlock([message isEqual:expected],
1690                              [NSString stringWithFormat:@"message %@ not equal to expected %@",
1691                                                         message, expected]);
1692
1693                  index += 1;
1694                  if (index < 4) {
1695                    id request =
1696                        [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
1697                                                        requestedResponseSize:responses[index]];
1698                    [localCall writeMessage:request];
1699                  } else {
1700                    [localCall finish];
1701                  }
1702                }
1703                closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
1704                  if (weakCall == nil) {
1705                    return;
1706                  }
1707
1708                  assertBlock(
1709                      index == 4,
1710                      [NSString stringWithFormat:@"Received %@ responses instead of 4.", @(index)]);
1711
1712                  assertBlock(
1713                      error == nil,
1714                      [NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
1715
1716                  [expectation fulfill];
1717                }]
1718                              callOptions:options];
1719    weakCall = call;
1720    [call start];
1721    [call writeMessage:request];
1722
1723    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
1724  });
1725}
1726
1727- (void)testLoggingInterceptor {
1728  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1729    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1730    __weak XCTestExpectation *expectation =
1731        [self expectationWithDescription:@"testLoggingInterceptor"];
1732
1733    __block NSUInteger startCount = 0;
1734    __block NSUInteger writeDataCount = 0;
1735    __block NSUInteger finishCount = 0;
1736    __block NSUInteger receiveNextMessageCount = 0;
1737    __block NSUInteger responseHeaderCount = 0;
1738    __block NSUInteger responseDataCount = 0;
1739    __block NSUInteger responseCloseCount = 0;
1740    __block NSUInteger didWriteDataCount = 0;
1741    id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc]
1742        initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)
1743        startHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
1744                    GRPCInterceptorManager *manager) {
1745          startCount++;
1746          XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
1747          XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
1748          XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
1749          [manager startNextInterceptorWithRequest:[requestOptions copy]
1750                                       callOptions:[callOptions copy]];
1751        }
1752        writeDataHook:^(id data, GRPCInterceptorManager *manager) {
1753          writeDataCount++;
1754          [manager writeNextInterceptorWithData:data];
1755        }
1756        finishHook:^(GRPCInterceptorManager *manager) {
1757          finishCount++;
1758          [manager finishNextInterceptor];
1759        }
1760        receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
1761          receiveNextMessageCount++;
1762          [manager receiveNextInterceptorMessages:numberOfMessages];
1763        }
1764        responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
1765          responseHeaderCount++;
1766          [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
1767        }
1768        responseDataHook:^(id data, GRPCInterceptorManager *manager) {
1769          responseDataCount++;
1770          [manager forwardPreviousInterceptorWithData:data];
1771        }
1772        responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
1773                            GRPCInterceptorManager *manager) {
1774          responseCloseCount++;
1775          [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata
1776                                                                 error:error];
1777        }
1778        didWriteDataHook:^(GRPCInterceptorManager *manager) {
1779          didWriteDataCount++;
1780          [manager forwardPreviousInterceptorDidWriteData];
1781        }];
1782
1783    NSArray *requests = @[ @1, @2, @3, @4 ];
1784    NSArray *responses = @[ @1, @2, @3, @4 ];
1785
1786    __block int messageIndex = 0;
1787
1788    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[messageIndex]
1789                                                 requestedResponseSize:responses[messageIndex]];
1790    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
1791    // For backwards compatibility
1792    options.transportType = [[self class] transportType];
1793    options.transport = [[self class] transport];
1794    options.PEMRootCertificates = [[self class] PEMRootCertificates];
1795    options.hostNameOverride = [[self class] hostNameOverride];
1796    options.flowControlEnabled = YES;
1797    options.interceptorFactories = @[ factory ];
1798
1799    __block int writeMessageCount = 0;
1800    __block __weak GRPCStreamingProtoCall *weakCall;
1801    GRPCStreamingProtoCall *call = [service
1802        fullDuplexCallWithResponseHandler:
1803            [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
1804                messageCallback:^(id message) {
1805                  GRPCStreamingProtoCall *localCall = weakCall;
1806                  if (localCall == nil) {
1807                    return;
1808                  }
1809                  assertBlock((messageIndex < 4), @"More than 4 responses received.");
1810
1811                  id expected = [RMTStreamingOutputCallResponse
1812                      messageWithPayloadSize:responses[messageIndex]];
1813                  assertBlock([message isEqual:expected],
1814                              [NSString stringWithFormat:@"message %@ not equal to expected %@",
1815                                                         message, expected]);
1816                  messageIndex += 1;
1817                  if (messageIndex < 4) {
1818                    id request = [RMTStreamingOutputCallRequest
1819                        messageWithPayloadSize:requests[messageIndex]
1820                         requestedResponseSize:responses[messageIndex]];
1821                    [localCall writeMessage:request];
1822                    [localCall receiveNextMessage];
1823                  } else {
1824                    [localCall finish];
1825                  }
1826                }
1827                closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
1828                  if (weakCall == nil) {
1829                    return;
1830                  }
1831                  assertBlock(
1832                      error == nil,
1833                      [NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
1834                  assertBlock(messageIndex == 4,
1835                              [NSString stringWithFormat:@"Received %@ responses instead of 4.",
1836                                                         @(messageIndex)]);
1837                  [expectation fulfill];
1838                }
1839                writeMessageCallback:^{
1840                  writeMessageCount++;
1841                }]
1842                              callOptions:options];
1843
1844    weakCall = call;
1845    [call start];
1846    [call receiveNextMessage];
1847    [call writeMessage:request];
1848
1849    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
1850
1851    assertBlock(startCount == 1, [NSString stringWithFormat:@"%@", @(startCount)]);
1852    assertBlock(writeDataCount == 4, [NSString stringWithFormat:@"%@", @(writeDataCount)]);
1853    assertBlock(finishCount == 1, [NSString stringWithFormat:@"%@", @(finishCount)]);
1854    assertBlock(receiveNextMessageCount == 4,
1855                [NSString stringWithFormat:@"%@", @(receiveNextMessageCount)]);
1856    assertBlock(responseHeaderCount == 1,
1857                [NSString stringWithFormat:@"%@", @(responseHeaderCount)]);
1858    assertBlock(responseDataCount == 4, [NSString stringWithFormat:@"%@", @(responseDataCount)]);
1859    assertBlock(responseCloseCount == 1, [NSString stringWithFormat:@"%@", @(responseCloseCount)]);
1860    assertBlock(didWriteDataCount == 4, [NSString stringWithFormat:@"%@", @(didWriteDataCount)]);
1861    assertBlock(writeMessageCount == 4, [NSString stringWithFormat:@"%@", @(writeMessageCount)]);
1862  });
1863}
1864
1865// Chain a default interceptor and a hook interceptor which, after one write, cancels the call
1866// under the hood but forward further data to the user.
1867- (void)testHijackingInterceptor {
1868  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
1869    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
1870    NSUInteger kCancelAfterWrites = 1;
1871    __weak XCTestExpectation *expectUserCallComplete =
1872        [self expectationWithDescription:@"User call completed."];
1873    __weak XCTestExpectation *expectResponseCallbackComplete =
1874        [self expectationWithDescription:@"Hook interceptor response callback completed"];
1875
1876    NSArray *responses = @[ @1, @2, @3, @4 ];
1877    __block int index = 0;
1878
1879    __block NSUInteger startCount = 0;
1880    __block NSUInteger writeDataCount = 0;
1881    __block NSUInteger finishCount = 0;
1882    __block NSUInteger responseHeaderCount = 0;
1883    __block NSUInteger responseDataCount = 0;
1884    __block NSUInteger responseCloseCount = 0;
1885    id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc]
1886        initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)
1887        startHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
1888                    GRPCInterceptorManager *manager) {
1889          startCount++;
1890          [manager startNextInterceptorWithRequest:[requestOptions copy]
1891                                       callOptions:[callOptions copy]];
1892        }
1893        writeDataHook:^(id data, GRPCInterceptorManager *manager) {
1894          writeDataCount++;
1895          if (index < kCancelAfterWrites) {
1896            [manager writeNextInterceptorWithData:data];
1897          } else if (index == kCancelAfterWrites) {
1898            [manager cancelNextInterceptor];
1899            [manager forwardPreviousInterceptorWithData:[[RMTStreamingOutputCallResponse
1900                                                            messageWithPayloadSize:responses[index]]
1901                                                            data]];
1902          } else {  // (index > kCancelAfterWrites)
1903            [manager forwardPreviousInterceptorWithData:[[RMTStreamingOutputCallResponse
1904                                                            messageWithPayloadSize:responses[index]]
1905                                                            data]];
1906          }
1907        }
1908        finishHook:^(GRPCInterceptorManager *manager) {
1909          finishCount++;
1910          // finish must happen after the hijacking, so directly reply with a close
1911          [manager forwardPreviousInterceptorCloseWithTrailingMetadata:@{@"grpc-status" : @"0"}
1912                                                                 error:nil];
1913          [manager shutDown];
1914        }
1915        receiveNextMessagesHook:nil
1916        responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
1917          responseHeaderCount++;
1918          [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
1919        }
1920        responseDataHook:^(id data, GRPCInterceptorManager *manager) {
1921          responseDataCount++;
1922          [manager forwardPreviousInterceptorWithData:data];
1923        }
1924        responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
1925                            GRPCInterceptorManager *manager) {
1926          responseCloseCount++;
1927          // since we canceled the call, it should return cancel error
1928          XCTAssertNil(trailingMetadata);
1929          XCTAssertNotNil(error);
1930          XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
1931          [expectResponseCallbackComplete fulfill];
1932        }
1933        didWriteDataHook:nil];
1934
1935    NSArray *requests = @[ @1, @2, @3, @4 ];
1936
1937    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
1938                                                 requestedResponseSize:responses[index]];
1939    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
1940    // For backwards compatibility
1941    options.transportType = [[self class] transportType];
1942    options.transport = [[self class] transport];
1943    options.PEMRootCertificates = [[self class] PEMRootCertificates];
1944    options.hostNameOverride = [[self class] hostNameOverride];
1945    options.interceptorFactories = @[ [[DefaultInterceptorFactory alloc] init], factory ];
1946
1947    __weak __block GRPCStreamingProtoCall *weakCall;
1948    GRPCStreamingProtoCall *call = [service
1949        fullDuplexCallWithResponseHandler:
1950            [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
1951                messageCallback:^(id message) {
1952                  GRPCStreamingProtoCall *localCall = weakCall;
1953                  if (localCall == nil) {
1954                    return;
1955                  }
1956
1957                  assertBlock(index < 4, @"More than 4 responses received.");
1958
1959                  id expected =
1960                      [RMTStreamingOutputCallResponse messageWithPayloadSize:responses[index]];
1961                  assertBlock([message isEqual:expected],
1962                              [NSString stringWithFormat:@"message %@ not equal to expected %@",
1963                                                         message, expected]);
1964                  index += 1;
1965                  if (index < 4) {
1966                    id request =
1967                        [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
1968                                                        requestedResponseSize:responses[index]];
1969                    [localCall writeMessage:request];
1970                    [localCall receiveNextMessage];
1971                  } else {
1972                    [self waitForExpectations:@[ expectResponseCallbackComplete ]
1973                                      timeout:GRPCInteropTestTimeoutDefault];
1974                    [localCall finish];
1975                  }
1976                }
1977                closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
1978                  if (weakCall == nil) {
1979                    return;
1980                  }
1981                  assertBlock(
1982                      error == nil,
1983                      [NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
1984                  assertBlock(
1985                      index == 4,
1986                      [NSString stringWithFormat:@"Received %@ responses instead of 4.", @(index)]);
1987
1988                  [expectUserCallComplete fulfill];
1989                }]
1990                              callOptions:options];
1991    weakCall = call;
1992    [call start];
1993    [call receiveNextMessage];
1994    [call writeMessage:request];
1995
1996    waiterBlock(@[ expectUserCallComplete ], GRPCInteropTestTimeoutDefault);
1997    assertBlock(startCount == 1, [NSString stringWithFormat:@"%@", @(startCount)]);
1998    assertBlock(writeDataCount == 4, [NSString stringWithFormat:@"%@", @(writeDataCount)]);
1999    assertBlock(finishCount == 1, [NSString stringWithFormat:@"%@", @(finishCount)]);
2000    assertBlock(responseHeaderCount == 1,
2001                [NSString stringWithFormat:@"%@", @(responseHeaderCount)]);
2002    assertBlock(responseDataCount == 1, [NSString stringWithFormat:@"%@", @(responseDataCount)]);
2003    assertBlock(responseCloseCount == 1, [NSString stringWithFormat:@"%@", @(responseCloseCount)]);
2004  });
2005}
2006
2007- (void)testGlobalInterceptor {
2008  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
2009    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
2010    __weak XCTestExpectation *expectation =
2011        [self expectationWithDescription:@"testGlobalInterceptor"];
2012
2013    __block NSUInteger startCount = 0;
2014    __block NSUInteger writeDataCount = 0;
2015    __block NSUInteger finishCount = 0;
2016    __block NSUInteger receiveNextMessageCount = 0;
2017    __block NSUInteger responseHeaderCount = 0;
2018    __block NSUInteger responseDataCount = 0;
2019    __block NSUInteger responseCloseCount = 0;
2020    __block NSUInteger didWriteDataCount = 0;
2021    [globalInterceptorFactory
2022        setStartHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
2023                       GRPCInterceptorManager *manager) {
2024          startCount++;
2025          XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
2026          XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
2027          XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
2028          [manager startNextInterceptorWithRequest:[requestOptions copy]
2029                                       callOptions:[callOptions copy]];
2030        }
2031        writeDataHook:^(id data, GRPCInterceptorManager *manager) {
2032          writeDataCount++;
2033          [manager writeNextInterceptorWithData:data];
2034        }
2035        finishHook:^(GRPCInterceptorManager *manager) {
2036          finishCount++;
2037          [manager finishNextInterceptor];
2038        }
2039        receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
2040          receiveNextMessageCount++;
2041          [manager receiveNextInterceptorMessages:numberOfMessages];
2042        }
2043        responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
2044          responseHeaderCount++;
2045          [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
2046        }
2047        responseDataHook:^(id data, GRPCInterceptorManager *manager) {
2048          responseDataCount++;
2049          [manager forwardPreviousInterceptorWithData:data];
2050        }
2051        responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
2052                            GRPCInterceptorManager *manager) {
2053          responseCloseCount++;
2054          [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata
2055                                                                 error:error];
2056        }
2057        didWriteDataHook:^(GRPCInterceptorManager *manager) {
2058          didWriteDataCount++;
2059          [manager forwardPreviousInterceptorDidWriteData];
2060        }];
2061
2062    NSArray *requests = @[ @1, @2, @3, @4 ];
2063    NSArray *responses = @[ @1, @2, @3, @4 ];
2064
2065    __block int index = 0;
2066
2067    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
2068                                                 requestedResponseSize:responses[index]];
2069    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
2070    // For backwards compatibility
2071    options.transportType = [[self class] transportType];
2072    options.transport = [[self class] transport];
2073    options.PEMRootCertificates = [[self class] PEMRootCertificates];
2074    options.hostNameOverride = [[self class] hostNameOverride];
2075    options.flowControlEnabled = YES;
2076    globalInterceptorFactory.enabled = YES;
2077
2078    __block int writeMessageCount = 0;
2079    __weak __block GRPCStreamingProtoCall *weakCall;
2080    __block GRPCStreamingProtoCall *call = [service
2081        fullDuplexCallWithResponseHandler:
2082            [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
2083                messageCallback:^(id message) {
2084                  GRPCStreamingProtoCall *localCall = weakCall;
2085                  if (localCall == nil) {
2086                    return;
2087                  }
2088                  assertBlock(index < 4, @"More than 4 responses received.");
2089
2090                  index += 1;
2091                  if (index < 4) {
2092                    id request =
2093                        [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
2094                                                        requestedResponseSize:responses[index]];
2095                    [localCall writeMessage:request];
2096                    [localCall receiveNextMessage];
2097                  } else {
2098                    [localCall finish];
2099                  }
2100                }
2101                closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
2102                  if (weakCall == nil) {
2103                    return;
2104                  }
2105                  assertBlock(
2106                      error == nil,
2107                      [NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
2108                  [expectation fulfill];
2109                }
2110                writeMessageCallback:^{
2111                  writeMessageCount++;
2112                }]
2113                              callOptions:options];
2114    weakCall = call;
2115    [call start];
2116    [call receiveNextMessage];
2117    [call writeMessage:request];
2118    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
2119
2120    assertBlock(startCount == 1, [NSString stringWithFormat:@"%@", @(startCount)]);
2121    assertBlock(writeDataCount == 4, [NSString stringWithFormat:@"%@", @(writeDataCount)]);
2122    assertBlock(finishCount == 1, [NSString stringWithFormat:@"%@", @(finishCount)]);
2123    assertBlock(receiveNextMessageCount == 4,
2124                [NSString stringWithFormat:@"%@", @(receiveNextMessageCount)]);
2125    assertBlock(responseHeaderCount == 1,
2126                [NSString stringWithFormat:@"%@", @(responseHeaderCount)]);
2127    assertBlock(responseDataCount == 4, [NSString stringWithFormat:@"%@", @(responseDataCount)]);
2128    assertBlock(responseCloseCount == 1, [NSString stringWithFormat:@"%@", @(responseCloseCount)]);
2129    assertBlock(didWriteDataCount == 4, [NSString stringWithFormat:@"%@", @(didWriteDataCount)]);
2130    assertBlock(writeMessageCount == 4, [NSString stringWithFormat:@"%@", @(writeMessageCount)]);
2131    globalInterceptorFactory.enabled = NO;
2132  });
2133}
2134
2135- (void)testConflictingGlobalInterceptors {
2136  id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc]
2137        initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)
2138                    startHook:nil
2139                writeDataHook:nil
2140                   finishHook:nil
2141      receiveNextMessagesHook:nil
2142           responseHeaderHook:nil
2143             responseDataHook:nil
2144            responseCloseHook:nil
2145             didWriteDataHook:nil];
2146  @try {
2147    [GRPCCall2 registerGlobalInterceptor:factory];
2148    XCTFail(@"Did not receive an exception when registering global interceptor the second time");
2149  } @catch (NSException *exception) {
2150    // Do nothing; test passes
2151  }
2152}
2153
2154- (void)testInterceptorAndGlobalInterceptor {
2155  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
2156    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
2157    __weak XCTestExpectation *expectation =
2158        [self expectationWithDescription:@"testInterceptorAndGlobalInterceptor"];
2159
2160    __block NSUInteger startCount = 0;
2161    __block NSUInteger writeDataCount = 0;
2162    __block NSUInteger finishCount = 0;
2163    __block NSUInteger receiveNextMessageCount = 0;
2164    __block NSUInteger responseHeaderCount = 0;
2165    __block NSUInteger responseDataCount = 0;
2166    __block NSUInteger responseCloseCount = 0;
2167    __block NSUInteger didWriteDataCount = 0;
2168
2169    id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc]
2170        initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)
2171        startHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
2172                    GRPCInterceptorManager *manager) {
2173          startCount++;
2174          XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
2175          XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
2176          XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
2177          [manager startNextInterceptorWithRequest:[requestOptions copy]
2178                                       callOptions:[callOptions copy]];
2179        }
2180        writeDataHook:^(id data, GRPCInterceptorManager *manager) {
2181          writeDataCount++;
2182          [manager writeNextInterceptorWithData:data];
2183        }
2184        finishHook:^(GRPCInterceptorManager *manager) {
2185          finishCount++;
2186          [manager finishNextInterceptor];
2187        }
2188        receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
2189          receiveNextMessageCount++;
2190          [manager receiveNextInterceptorMessages:numberOfMessages];
2191        }
2192        responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
2193          responseHeaderCount++;
2194          [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
2195        }
2196        responseDataHook:^(id data, GRPCInterceptorManager *manager) {
2197          responseDataCount++;
2198          [manager forwardPreviousInterceptorWithData:data];
2199        }
2200        responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
2201                            GRPCInterceptorManager *manager) {
2202          responseCloseCount++;
2203          [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata
2204                                                                 error:error];
2205        }
2206        didWriteDataHook:^(GRPCInterceptorManager *manager) {
2207          didWriteDataCount++;
2208          [manager forwardPreviousInterceptorDidWriteData];
2209        }];
2210
2211    __block NSUInteger globalStartCount = 0;
2212    __block NSUInteger globalWriteDataCount = 0;
2213    __block NSUInteger globalFinishCount = 0;
2214    __block NSUInteger globalReceiveNextMessageCount = 0;
2215    __block NSUInteger globalResponseHeaderCount = 0;
2216    __block NSUInteger globalResponseDataCount = 0;
2217    __block NSUInteger globalResponseCloseCount = 0;
2218    __block NSUInteger globalDidWriteDataCount = 0;
2219
2220    [globalInterceptorFactory
2221        setStartHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
2222                       GRPCInterceptorManager *manager) {
2223          globalStartCount++;
2224          XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
2225          XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
2226          XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
2227          [manager startNextInterceptorWithRequest:[requestOptions copy]
2228                                       callOptions:[callOptions copy]];
2229        }
2230        writeDataHook:^(id data, GRPCInterceptorManager *manager) {
2231          globalWriteDataCount++;
2232          [manager writeNextInterceptorWithData:data];
2233        }
2234        finishHook:^(GRPCInterceptorManager *manager) {
2235          globalFinishCount++;
2236          [manager finishNextInterceptor];
2237        }
2238        receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
2239          globalReceiveNextMessageCount++;
2240          [manager receiveNextInterceptorMessages:numberOfMessages];
2241        }
2242        responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
2243          globalResponseHeaderCount++;
2244          [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
2245        }
2246        responseDataHook:^(id data, GRPCInterceptorManager *manager) {
2247          globalResponseDataCount++;
2248          [manager forwardPreviousInterceptorWithData:data];
2249        }
2250        responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
2251                            GRPCInterceptorManager *manager) {
2252          globalResponseCloseCount++;
2253          [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata
2254                                                                 error:error];
2255        }
2256        didWriteDataHook:^(GRPCInterceptorManager *manager) {
2257          globalDidWriteDataCount++;
2258          [manager forwardPreviousInterceptorDidWriteData];
2259        }];
2260
2261    NSArray *requests = @[ @1, @2, @3, @4 ];
2262    NSArray *responses = @[ @1, @2, @3, @4 ];
2263
2264    __block int index = 0;
2265
2266    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
2267                                                 requestedResponseSize:responses[index]];
2268    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
2269    // For backwards compatibility
2270    options.transportType = [[self class] transportType];
2271    options.transport = [[self class] transport];
2272    options.PEMRootCertificates = [[self class] PEMRootCertificates];
2273    options.hostNameOverride = [[self class] hostNameOverride];
2274    options.flowControlEnabled = YES;
2275    options.interceptorFactories = @[ factory ];
2276    globalInterceptorFactory.enabled = YES;
2277
2278    __block int writeMessageCount = 0;
2279    __weak __block GRPCStreamingProtoCall *weakCall;
2280    GRPCStreamingProtoCall *call = [service
2281        fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
2282                                              initWithInitialMetadataCallback:nil
2283                                              messageCallback:^(id message) {
2284                                                GRPCStreamingProtoCall *localCall = weakCall;
2285                                                if (localCall == nil) {
2286                                                  return;
2287                                                }
2288                                                index += 1;
2289                                                if (index < 4) {
2290                                                  id request = [RMTStreamingOutputCallRequest
2291                                                      messageWithPayloadSize:requests[index]
2292                                                       requestedResponseSize:responses[index]];
2293                                                  [localCall writeMessage:request];
2294                                                  [localCall receiveNextMessage];
2295                                                } else {
2296                                                  [localCall finish];
2297                                                }
2298                                              }
2299                                              closeCallback:^(NSDictionary *trailingMetadata,
2300                                                              NSError *error) {
2301                                                if (weakCall == nil) {
2302                                                  return;
2303                                                }
2304                                                [expectation fulfill];
2305                                              }
2306                                              writeMessageCallback:^{
2307                                                writeMessageCount++;
2308                                              }]
2309                              callOptions:options];
2310    weakCall = call;
2311    [call start];
2312    [call receiveNextMessage];
2313    [call writeMessage:request];
2314
2315    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
2316    assertBlock(startCount == 1, [NSString stringWithFormat:@"%@", @(startCount)]);
2317    assertBlock(writeDataCount == 4, [NSString stringWithFormat:@"%@", @(writeDataCount)]);
2318    assertBlock(finishCount == 1, [NSString stringWithFormat:@"%@", @(finishCount)]);
2319    assertBlock(receiveNextMessageCount == 4,
2320                [NSString stringWithFormat:@"%@", @(receiveNextMessageCount)]);
2321    assertBlock(responseHeaderCount == 1,
2322                [NSString stringWithFormat:@"%@", @(responseHeaderCount)]);
2323    assertBlock(responseDataCount == 4, [NSString stringWithFormat:@"%@", @(responseDataCount)]);
2324    assertBlock(responseCloseCount == 1, [NSString stringWithFormat:@"%@", @(responseCloseCount)]);
2325    assertBlock(didWriteDataCount == 4, [NSString stringWithFormat:@"%@", @(didWriteDataCount)]);
2326    assertBlock(globalStartCount == 1, [NSString stringWithFormat:@"%@", @(globalStartCount)]);
2327    assertBlock(globalWriteDataCount == 4,
2328                [NSString stringWithFormat:@"%@", @(globalWriteDataCount)]);
2329    assertBlock(globalFinishCount == 1, [NSString stringWithFormat:@"%@", @(globalFinishCount)]);
2330    assertBlock(globalReceiveNextMessageCount == 4,
2331                [NSString stringWithFormat:@"%@", @(globalReceiveNextMessageCount)]);
2332    assertBlock(globalResponseHeaderCount == 1,
2333                [NSString stringWithFormat:@"%@", @(globalResponseHeaderCount)]);
2334    assertBlock(globalResponseDataCount == 4,
2335                [NSString stringWithFormat:@"%@", @(globalResponseDataCount)]);
2336    assertBlock(globalResponseCloseCount == 1,
2337                [NSString stringWithFormat:@"%@", @(globalResponseCloseCount)]);
2338    assertBlock(globalDidWriteDataCount == 4,
2339                [NSString stringWithFormat:@"%@", @(globalDidWriteDataCount)]);
2340    assertBlock(writeMessageCount == 4, [NSString stringWithFormat:@"%@", @(writeMessageCount)]);
2341    globalInterceptorFactory.enabled = NO;
2342  });
2343}
2344
2345@end
2346