1/* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19#import "InteropTests.h" 20 21#include <grpc/status.h> 22 23#import <GRPCClient/GRPCCall+ChannelArg.h> 24#import <GRPCClient/GRPCCall+Cronet.h> 25#import <GRPCClient/GRPCCall+Interceptor.h> 26#import <GRPCClient/GRPCCall+Tests.h> 27#import <GRPCClient/GRPCInterceptor.h> 28#import <GRPCClient/internal_testing/GRPCCall+InternalTests.h> 29#import <ProtoRPC/ProtoRPC.h> 30#import <RxLibrary/GRXBufferedPipe.h> 31#import <RxLibrary/GRXWriter+Immediate.h> 32#import <grpc/grpc.h> 33#import <grpc/support/log.h> 34#import "src/objective-c/tests/RemoteTestClient/Messages.pbobjc.h" 35#import "src/objective-c/tests/RemoteTestClient/Test.pbobjc.h" 36#import "src/objective-c/tests/RemoteTestClient/Test.pbrpc.h" 37 38#import "../Common/TestUtils.h" 39#import "InteropTestsBlockCallbacks.h" 40 41#define SMALL_PAYLOAD_SIZE 10 42#define LARGE_REQUEST_PAYLOAD_SIZE 271828 43#define LARGE_RESPONSE_PAYLOAD_SIZE 314159 44 45static const int kTestRetries = 3; 46extern const char *kCFStreamVarName; 47 48@interface RMTStreamingOutputCallRequest (Constructors) 49+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize 50 requestedResponseSize:(NSNumber *)responseSize; 51@end 52 53@implementation RMTStreamingOutputCallRequest (Constructors) 54+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize 55 requestedResponseSize:(NSNumber *)responseSize { 56 
RMTStreamingOutputCallRequest *request = [self message]; 57 RMTResponseParameters *parameters = [RMTResponseParameters message]; 58 parameters.size = responseSize.intValue; 59 [request.responseParametersArray addObject:parameters]; 60 request.payload.body = [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue]; 61 return request; 62} 63@end 64 65@interface RMTStreamingOutputCallResponse (Constructors) 66+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize; 67@end 68 69@implementation RMTStreamingOutputCallResponse (Constructors) 70+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize { 71 RMTStreamingOutputCallResponse *response = [self message]; 72 response.payload.type = RMTPayloadType_Compressable; 73 response.payload.body = [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue]; 74 return response; 75} 76@end 77 78BOOL isRemoteInteropTest(NSString *host) { 79 return [host isEqualToString:@"grpc-test.sandbox.googleapis.com"]; 80} 81 82@interface DefaultInterceptorFactory : NSObject <GRPCInterceptorFactory> 83 84- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager; 85 86@end 87 88@implementation DefaultInterceptorFactory 89 90- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager { 91 dispatch_queue_t queue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); 92 return [[GRPCInterceptor alloc] initWithInterceptorManager:interceptorManager 93 dispatchQueue:queue]; 94} 95 96@end 97 98@interface HookInterceptorFactory : NSObject <GRPCInterceptorFactory> 99 100- (instancetype) 101 initWithDispatchQueue:(dispatch_queue_t)dispatchQueue 102 startHook:(void (^)(GRPCRequestOptions *requestOptions, 103 GRPCCallOptions *callOptions, 104 GRPCInterceptorManager *manager))startHook 105 writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook 106 finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook 107 
receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages, 108 GRPCInterceptorManager *manager))receiveNextMessagesHook 109 responseHeaderHook:(void (^)(NSDictionary *initialMetadata, 110 GRPCInterceptorManager *manager))responseHeaderHook 111 responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook 112 responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error, 113 GRPCInterceptorManager *manager))responseCloseHook 114 didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook; 115 116- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager; 117 118@end 119 120@interface HookInterceptor : GRPCInterceptor 121 122- (instancetype) 123 initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager 124 dispatchQueue:(dispatch_queue_t)dispatchQueue 125 startHook:(void (^)(GRPCRequestOptions *requestOptions, 126 GRPCCallOptions *callOptions, 127 GRPCInterceptorManager *manager))startHook 128 writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook 129 finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook 130 receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages, 131 GRPCInterceptorManager *manager))receiveNextMessagesHook 132 responseHeaderHook:(void (^)(NSDictionary *initialMetadata, 133 GRPCInterceptorManager *manager))responseHeaderHook 134 responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook 135 responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error, 136 GRPCInterceptorManager *manager))responseCloseHook 137 didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook; 138 139@end 140 141@implementation HookInterceptorFactory { 142 @protected 143 void (^_startHook)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions, 144 GRPCInterceptorManager *manager); 145 void (^_writeDataHook)(id data, GRPCInterceptorManager *manager); 146 void 
(^_finishHook)(GRPCInterceptorManager *manager); 147 void (^_receiveNextMessagesHook)(NSUInteger numberOfMessages, GRPCInterceptorManager *manager); 148 void (^_responseHeaderHook)(NSDictionary *initialMetadata, GRPCInterceptorManager *manager); 149 void (^_responseDataHook)(id data, GRPCInterceptorManager *manager); 150 void (^_responseCloseHook)(NSDictionary *trailingMetadata, NSError *error, 151 GRPCInterceptorManager *manager); 152 void (^_didWriteDataHook)(GRPCInterceptorManager *manager); 153 dispatch_queue_t _dispatchQueue; 154} 155 156- (instancetype) 157 initWithDispatchQueue:(dispatch_queue_t)dispatchQueue 158 startHook:(void (^)(GRPCRequestOptions *requestOptions, 159 GRPCCallOptions *callOptions, 160 GRPCInterceptorManager *manager))startHook 161 writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook 162 finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook 163 receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages, 164 GRPCInterceptorManager *manager))receiveNextMessagesHook 165 responseHeaderHook:(void (^)(NSDictionary *initialMetadata, 166 GRPCInterceptorManager *manager))responseHeaderHook 167 responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook 168 responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error, 169 GRPCInterceptorManager *manager))responseCloseHook 170 didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook { 171 if ((self = [super init])) { 172 _dispatchQueue = dispatchQueue; 173 _startHook = startHook; 174 _writeDataHook = writeDataHook; 175 _finishHook = finishHook; 176 _receiveNextMessagesHook = receiveNextMessagesHook; 177 _responseHeaderHook = responseHeaderHook; 178 _responseDataHook = responseDataHook; 179 _responseCloseHook = responseCloseHook; 180 _didWriteDataHook = didWriteDataHook; 181 } 182 return self; 183} 184 185- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager { 
186 return [[HookInterceptor alloc] initWithInterceptorManager:interceptorManager 187 dispatchQueue:_dispatchQueue 188 startHook:_startHook 189 writeDataHook:_writeDataHook 190 finishHook:_finishHook 191 receiveNextMessagesHook:_receiveNextMessagesHook 192 responseHeaderHook:_responseHeaderHook 193 responseDataHook:_responseDataHook 194 responseCloseHook:_responseCloseHook 195 didWriteDataHook:_didWriteDataHook]; 196} 197 198@end 199 200@implementation HookInterceptor { 201 void (^_startHook)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions, 202 GRPCInterceptorManager *manager); 203 void (^_writeDataHook)(id data, GRPCInterceptorManager *manager); 204 void (^_finishHook)(GRPCInterceptorManager *manager); 205 void (^_receiveNextMessagesHook)(NSUInteger numberOfMessages, GRPCInterceptorManager *manager); 206 void (^_responseHeaderHook)(NSDictionary *initialMetadata, GRPCInterceptorManager *manager); 207 void (^_responseDataHook)(id data, GRPCInterceptorManager *manager); 208 void (^_responseCloseHook)(NSDictionary *trailingMetadata, NSError *error, 209 GRPCInterceptorManager *manager); 210 void (^_didWriteDataHook)(GRPCInterceptorManager *manager); 211 GRPCInterceptorManager *_manager; 212 dispatch_queue_t _dispatchQueue; 213} 214 215- (dispatch_queue_t)dispatchQueue { 216 return _dispatchQueue; 217} 218 219- (instancetype) 220 initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager 221 dispatchQueue:(dispatch_queue_t)dispatchQueue 222 startHook:(void (^)(GRPCRequestOptions *requestOptions, 223 GRPCCallOptions *callOptions, 224 GRPCInterceptorManager *manager))startHook 225 writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook 226 finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook 227 receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages, 228 GRPCInterceptorManager *manager))receiveNextMessagesHook 229 responseHeaderHook:(void (^)(NSDictionary *initialMetadata, 230 GRPCInterceptorManager 
*manager))responseHeaderHook 231 responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook 232 responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error, 233 GRPCInterceptorManager *manager))responseCloseHook 234 didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook { 235 if ((self = [super initWithInterceptorManager:interceptorManager dispatchQueue:dispatchQueue])) { 236 _startHook = startHook; 237 _writeDataHook = writeDataHook; 238 _finishHook = finishHook; 239 _receiveNextMessagesHook = receiveNextMessagesHook; 240 _responseHeaderHook = responseHeaderHook; 241 _responseDataHook = responseDataHook; 242 _responseCloseHook = responseCloseHook; 243 _didWriteDataHook = didWriteDataHook; 244 _dispatchQueue = dispatchQueue; 245 _manager = interceptorManager; 246 } 247 return self; 248} 249 250- (void)startWithRequestOptions:(GRPCRequestOptions *)requestOptions 251 callOptions:(GRPCCallOptions *)callOptions { 252 if (_startHook) { 253 _startHook(requestOptions, callOptions, _manager); 254 } 255} 256 257- (void)writeData:(id)data { 258 if (_writeDataHook) { 259 _writeDataHook(data, _manager); 260 } 261} 262 263- (void)finish { 264 if (_finishHook) { 265 _finishHook(_manager); 266 } 267} 268 269- (void)receiveNextMessages:(NSUInteger)numberOfMessages { 270 if (_receiveNextMessagesHook) { 271 _receiveNextMessagesHook(numberOfMessages, _manager); 272 } 273} 274 275- (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata { 276 if (_responseHeaderHook) { 277 _responseHeaderHook(initialMetadata, _manager); 278 } 279} 280 281- (void)didReceiveData:(id)data { 282 if (_responseDataHook) { 283 _responseDataHook(data, _manager); 284 } 285} 286 287- (void)didCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error { 288 if (_responseCloseHook) { 289 _responseCloseHook(trailingMetadata, error, _manager); 290 } 291} 292 293- (void)didWriteData { 294 if (_didWriteDataHook) { 295 
_didWriteDataHook(_manager); 296 } 297} 298 299@end 300 301@interface GlobalInterceptorFactory : HookInterceptorFactory 302 303@property BOOL enabled; 304 305- (instancetype)initWithDispatchQueue:(dispatch_queue_t)dispatchQueue; 306 307- (void)setStartHook:(void (^)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions, 308 GRPCInterceptorManager *manager))startHook 309 writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook 310 finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook 311 receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages, 312 GRPCInterceptorManager *manager))receiveNextMessagesHook 313 responseHeaderHook:(void (^)(NSDictionary *initialMetadata, 314 GRPCInterceptorManager *manager))responseHeaderHook 315 responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook 316 responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error, 317 GRPCInterceptorManager *manager))responseCloseHook 318 didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook; 319 320@end 321 322@implementation GlobalInterceptorFactory 323 324- (instancetype)initWithDispatchQueue:(dispatch_queue_t)dispatchQueue { 325 _enabled = NO; 326 return [super initWithDispatchQueue:dispatchQueue 327 startHook:nil 328 writeDataHook:nil 329 finishHook:nil 330 receiveNextMessagesHook:nil 331 responseHeaderHook:nil 332 responseDataHook:nil 333 responseCloseHook:nil 334 didWriteDataHook:nil]; 335} 336 337- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager { 338 if (_enabled) { 339 return [[HookInterceptor alloc] initWithInterceptorManager:interceptorManager 340 dispatchQueue:_dispatchQueue 341 startHook:_startHook 342 writeDataHook:_writeDataHook 343 finishHook:_finishHook 344 receiveNextMessagesHook:_receiveNextMessagesHook 345 responseHeaderHook:_responseHeaderHook 346 responseDataHook:_responseDataHook 347 
responseCloseHook:_responseCloseHook 348 didWriteDataHook:_didWriteDataHook]; 349 } else { 350 return nil; 351 } 352} 353 354- (void)setStartHook:(void (^)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions, 355 GRPCInterceptorManager *manager))startHook 356 writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook 357 finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook 358 receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages, 359 GRPCInterceptorManager *manager))receiveNextMessagesHook 360 responseHeaderHook:(void (^)(NSDictionary *initialMetadata, 361 GRPCInterceptorManager *manager))responseHeaderHook 362 responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook 363 responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error, 364 GRPCInterceptorManager *manager))responseCloseHook 365 didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook { 366 _startHook = startHook; 367 _writeDataHook = writeDataHook; 368 _finishHook = finishHook; 369 _receiveNextMessagesHook = receiveNextMessagesHook; 370 _responseHeaderHook = responseHeaderHook; 371 _responseDataHook = responseDataHook; 372 _responseCloseHook = responseCloseHook; 373 _didWriteDataHook = didWriteDataHook; 374} 375 376@end 377 378static GlobalInterceptorFactory *globalInterceptorFactory = nil; 379static dispatch_once_t initGlobalInterceptorFactory; 380 381#pragma mark Tests 382 383@implementation InteropTests 384 385#pragma clang diagnostic push 386#pragma clang diagnostic ignored "-Warc-performSelector-leaks" 387- (void)retriableTest:(SEL)selector retries:(int)retries timeout:(NSTimeInterval)timeout { 388 for (int i = 0; i < retries; i++) { 389 NSDate *waitUntil = [[NSDate date] dateByAddingTimeInterval:timeout]; 390 NSCondition *cv = [[NSCondition alloc] init]; 391 __block BOOL done = NO; 392 [cv lock]; 393 dispatch_async(dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0), ^{ 394 [self 
performSelector:selector]; 395 [cv lock]; 396 done = YES; 397 [cv signal]; 398 [cv unlock]; 399 }); 400 while (!done && [waitUntil timeIntervalSinceNow] > 0) { 401 [cv waitUntilDate:waitUntil]; 402 } 403 if (done) { 404 [cv unlock]; 405 break; 406 } else { 407 [cv unlock]; 408 [self tearDown]; 409 [self setUp]; 410 } 411 } 412} 413#pragma clang diagnostic pop 414 415+ (XCTestSuite *)defaultTestSuite { 416 if (self == [InteropTests class]) { 417 return [XCTestSuite testSuiteWithName:@"InteropTestsEmptySuite"]; 418 } 419 return super.defaultTestSuite; 420} 421 422+ (NSString *)host { 423 return nil; 424} 425 426// This number indicates how many bytes of overhead does Protocol Buffers encoding add onto the 427// message. The number varies as different message.proto is used on different servers. The actual 428// number for each interop server is overridden in corresponding derived test classes. 429- (int32_t)encodingOverhead { 430 return 0; 431} 432 433// For backwards compatibility 434+ (GRPCTransportType)transportType { 435 return GRPCTransportTypeChttp2BoringSSL; 436} 437 438+ (GRPCTransportID)transport { 439 return NULL; 440} 441 442+ (NSString *)PEMRootCertificates { 443 return nil; 444} 445 446+ (NSString *)hostNameOverride { 447 return nil; 448} 449 450+ (void)setUp { 451 dispatch_once(&initGlobalInterceptorFactory, ^{ 452 dispatch_queue_t globalInterceptorQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); 453 globalInterceptorFactory = 454 [[GlobalInterceptorFactory alloc] initWithDispatchQueue:globalInterceptorQueue]; 455 [GRPCCall2 registerGlobalInterceptor:globalInterceptorFactory]; 456 }); 457} 458 459- (void)setUp { 460 self.continueAfterFailure = YES; 461 [GRPCCall resetHostSettings]; 462 GRPCResetCallConnections(); 463 XCTAssertNotNil([[self class] host]); 464} 465 466- (void)testEmptyUnaryRPC { 467 GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiter, GRPCTestAssert assertBlock) { 468 RMTTestService *service = [RMTTestService 
serviceWithHost:[[self class] host]]; 469 __weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyUnary"]; 470 471 GPBEmpty *request = [GPBEmpty message]; 472 473 __weak RMTTestService *weakService = service; 474 [service emptyCallWithRequest:request 475 handler:^(GPBEmpty *response, NSError *error) { 476 if (weakService == nil) { 477 return; 478 } 479 480 XCTAssertNil(error, @"Finished with unexpected error: %@", error); 481 482 id expectedResponse = [GPBEmpty message]; 483 XCTAssertEqualObjects(response, expectedResponse); 484 485 [expectation fulfill]; 486 }]; 487 waiter(@[ expectation ], GRPCInteropTestTimeoutDefault); 488 }); 489} 490 491- (void)testEmptyUnaryRPCWithV2API { 492 GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) { 493 RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]]; 494 __weak XCTestExpectation *expectReceive = 495 [self expectationWithDescription:@"EmptyUnaryWithV2API received message"]; 496 __weak XCTestExpectation *expectComplete = 497 [self expectationWithDescription:@"EmptyUnaryWithV2API completed"]; 498 499 GPBEmpty *request = [GPBEmpty message]; 500 GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; 501 // For backwards compatibility 502 options.transportType = [[self class] transportType]; 503 options.transport = [[self class] transport]; 504 options.PEMRootCertificates = [[self class] PEMRootCertificates]; 505 options.hostNameOverride = [[self class] hostNameOverride]; 506 507 __weak RMTTestService *weakService = service; 508 GRPCUnaryProtoCall *call = [service 509 emptyCallWithMessage:request 510 responseHandler:[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil 511 messageCallback:^(id message) { 512 if (weakService == nil) { 513 return; 514 } 515 if (message) { 516 id expectedResponse = [GPBEmpty message]; 517 XCTAssertEqualObjects(message, expectedResponse); 518 [expectReceive fulfill]; 519 } 
520 } 521 closeCallback:^(NSDictionary *trailingMetadata, NSError *error) { 522 if (weakService == nil) { 523 return; 524 } 525 XCTAssertNil(error, @"Unexpected error: %@", error); 526 [expectComplete fulfill]; 527 }] 528 callOptions:options]; 529 [call start]; 530 waiterBlock(@[ expectReceive, expectComplete ], GRPCInteropTestTimeoutDefault); 531 }); 532} 533 534// Test that responses can be dispatched even if we do not run main run-loop 535- (void)testAsyncDispatchWithV2API { 536 GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) { 537 RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]]; 538 539 XCTestExpectation *receiveExpect = [self expectationWithDescription:@"receiveExpect"]; 540 XCTestExpectation *closeExpect = [self expectationWithDescription:@"closeExpect"]; 541 542 GPBEmpty *request = [GPBEmpty message]; 543 GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; 544 // For backwards compatibility 545 options.transportType = [[self class] transportType]; 546 options.transport = [[self class] transport]; 547 options.PEMRootCertificates = [[self class] PEMRootCertificates]; 548 options.hostNameOverride = [[self class] hostNameOverride]; 549 550 __block BOOL messageReceived = NO; 551 __block BOOL done = NO; 552 __weak RMTTestService *weakService = service; 553 GRPCUnaryProtoCall *call = [service 554 emptyCallWithMessage:request 555 responseHandler:[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil 556 messageCallback:^(id message) { 557 if (weakService == nil) { 558 return; 559 } 560 if (message) { 561 id expectedResponse = [GPBEmpty message]; 562 XCTAssertEqualObjects(message, expectedResponse); 563 messageReceived = YES; 564 } 565 [receiveExpect fulfill]; 566 } 567 closeCallback:^(NSDictionary *trailingMetadata, NSError *error) { 568 if (weakService == nil) { 569 return; 570 } 571 XCTAssertNil(error, @"Unexpected error: %@", error); 572 done = YES; 
573 [closeExpect fulfill]; 574 }] 575 callOptions:options]; 576 577 [call start]; 578 579 waiterBlock(@[ receiveExpect, closeExpect ], GRPCInteropTestTimeoutDefault); 580 XCTAssertTrue(messageReceived); 581 XCTAssertTrue(done); 582 }); 583} 584 585- (void)testLargeUnaryRPC { 586 GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) { 587 RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]]; 588 __weak XCTestExpectation *expectation = [self expectationWithDescription:@"LargeUnary"]; 589 590 RMTSimpleRequest *request = [RMTSimpleRequest message]; 591 request.responseType = RMTPayloadType_Compressable; 592 request.responseSize = LARGE_RESPONSE_PAYLOAD_SIZE; 593 request.payload.body = [NSMutableData dataWithLength:LARGE_REQUEST_PAYLOAD_SIZE]; 594 595 __weak RMTTestService *weakService = service; 596 [service unaryCallWithRequest:request 597 handler:^(RMTSimpleResponse *response, NSError *error) { 598 if (weakService == nil) { 599 return; 600 } 601 602 XCTAssertNil(error, @"Finished with unexpected error: %@", error); 603 604 RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message]; 605 expectedResponse.payload.type = RMTPayloadType_Compressable; 606 expectedResponse.payload.body = 607 [NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE]; 608 XCTAssertEqualObjects(response, expectedResponse); 609 610 [expectation fulfill]; 611 }]; 612 waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault); 613 }); 614} 615 616- (void)testUnaryResponseHandler { 617 // The test does not work on a remote server since it does not echo a trailer 618 if ([[self class] isRemoteTest]) { 619 return; 620 } 621 622 GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) { 623 RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]]; 624 625 XCTestExpectation *expectComplete = [self expectationWithDescription:@"call complete"]; 626 XCTestExpectation 
*expectCompleteMainQueue = 627 [self expectationWithDescription:@"main queue call complete"]; 628 629 RMTSimpleRequest *request = [RMTSimpleRequest message]; 630 request.responseType = RMTPayloadType_Compressable; 631 request.responseSize = LARGE_RESPONSE_PAYLOAD_SIZE; 632 request.payload.body = [NSMutableData dataWithLength:LARGE_REQUEST_PAYLOAD_SIZE]; 633 634 GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; 635 // For backwards compatibility 636 options.transportType = [[self class] transportType]; 637 options.transport = [[self class] transport]; 638 options.PEMRootCertificates = [[self class] PEMRootCertificates]; 639 options.hostNameOverride = [[self class] hostNameOverride]; 640 const unsigned char raw_bytes[] = {1, 2, 3, 4}; 641 NSData *trailer_data = [NSData dataWithBytes:raw_bytes length:sizeof(raw_bytes)]; 642 options.initialMetadata = @{ 643 @"x-grpc-test-echo-trailing-bin" : trailer_data, 644 @"x-grpc-test-echo-initial" : @"test-header" 645 }; 646 647 __weak RMTTestService *weakService = service; 648 649 __block GRPCUnaryResponseHandler *handler = [[GRPCUnaryResponseHandler alloc] 650 initWithResponseHandler:^(GPBMessage *response, NSError *error) { 651 if (weakService == nil) { 652 return; 653 } 654 655 XCTAssertNil(error, @"Unexpected error: %@", error); 656 RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message]; 657 expectedResponse.payload.type = RMTPayloadType_Compressable; 658 expectedResponse.payload.body = 659 [NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE]; 660 XCTAssertEqualObjects(response, expectedResponse); 661 XCTAssertEqualObjects(handler.responseHeaders[@"x-grpc-test-echo-initial"], 662 @"test-header"); 663 XCTAssertEqualObjects(handler.responseTrailers[@"x-grpc-test-echo-trailing-bin"], 664 trailer_data); 665 [expectComplete fulfill]; 666 } 667 responseDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)]; 668 __block GRPCUnaryResponseHandler *handlerMainQueue = 
[[GRPCUnaryResponseHandler alloc] 669 initWithResponseHandler:^(GPBMessage *response, NSError *error) { 670 if (weakService == nil) { 671 return; 672 } 673 XCTAssertNil(error, @"Unexpected error: %@", error); 674 RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message]; 675 expectedResponse.payload.type = RMTPayloadType_Compressable; 676 expectedResponse.payload.body = 677 [NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE]; 678 XCTAssertEqualObjects(response, expectedResponse); 679 XCTAssertEqualObjects(handlerMainQueue.responseHeaders[@"x-grpc-test-echo-initial"], 680 @"test-header"); 681 XCTAssertEqualObjects(handlerMainQueue.responseTrailers[@"x-grpc-test-echo-trailing-bin"], 682 trailer_data); 683 [expectCompleteMainQueue fulfill]; 684 } 685 responseDispatchQueue:nil]; 686 687 [[service unaryCallWithMessage:request responseHandler:handler callOptions:options] start]; 688 [[service unaryCallWithMessage:request responseHandler:handlerMainQueue 689 callOptions:options] start]; 690 691 waiterBlock(@[ expectComplete, expectCompleteMainQueue ], GRPCInteropTestTimeoutDefault); 692 }); 693} 694 695- (void)testLargeUnaryRPCWithV2API { 696 GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) { 697 RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]]; 698 __weak XCTestExpectation *expectReceive = 699 [self expectationWithDescription:@"LargeUnaryWithV2API received message"]; 700 __weak XCTestExpectation *expectComplete = 701 [self expectationWithDescription:@"LargeUnaryWithV2API received complete"]; 702 703 RMTSimpleRequest *request = [RMTSimpleRequest message]; 704 request.responseType = RMTPayloadType_Compressable; 705 request.responseSize = LARGE_RESPONSE_PAYLOAD_SIZE; 706 request.payload.body = [NSMutableData dataWithLength:LARGE_REQUEST_PAYLOAD_SIZE]; 707 708 GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; 709 // For backwards compatibility 710 
options.transportType = [[self class] transportType]; 711 options.transport = [[self class] transport]; 712 options.PEMRootCertificates = [[self class] PEMRootCertificates]; 713 options.hostNameOverride = [[self class] hostNameOverride]; 714 715 __weak RMTTestService *weakService = service; 716 GRPCUnaryProtoCall *call = [service 717 unaryCallWithMessage:request 718 responseHandler:[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil 719 messageCallback:^(id message) { 720 if (weakService == nil) { 721 return; 722 } 723 XCTAssertNotNil(message); 724 if (message) { 725 RMTSimpleResponse *expectedResponse = 726 [RMTSimpleResponse message]; 727 expectedResponse.payload.type = RMTPayloadType_Compressable; 728 expectedResponse.payload.body = 729 [NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE]; 730 XCTAssertEqualObjects(message, expectedResponse); 731 732 [expectReceive fulfill]; 733 } 734 } 735 closeCallback:^(NSDictionary *trailingMetadata, NSError *error) { 736 if (weakService == nil) { 737 return; 738 } 739 XCTAssertNil(error, @"Unexpected error: %@", error); 740 [expectComplete fulfill]; 741 }] 742 callOptions:options]; 743 [call start]; 744 waiterBlock(@[ expectReceive, expectComplete ], GRPCInteropTestTimeoutDefault); 745 }); 746} 747 748- (void)testConcurrentRPCsWithErrorsWithV2API { 749 GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) { 750 RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]]; 751 NSMutableArray *completeExpectations = [NSMutableArray array]; 752 NSMutableArray *calls = [NSMutableArray array]; 753 int num_rpcs = 10; 754 for (int i = 0; i < num_rpcs; ++i) { 755 [completeExpectations 756 addObject:[self expectationWithDescription: 757 [NSString stringWithFormat:@"Received trailer for RPC %d", i]]]; 758 759 RMTSimpleRequest *request = [RMTSimpleRequest message]; 760 request.responseType = RMTPayloadType_Compressable; 761 request.responseSize = 
SMALL_PAYLOAD_SIZE; 762 request.payload.body = [NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE]; 763 if (i % 3 == 0) { 764 request.responseStatus.code = GRPC_STATUS_UNAVAILABLE; 765 } else if (i % 7 == 0) { 766 request.responseStatus.code = GRPC_STATUS_CANCELLED; 767 } 768 GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; 769 // For backwards compatibility 770 options.transportType = [[self class] transportType]; 771 options.transport = [[self class] transport]; 772 options.PEMRootCertificates = [[self class] PEMRootCertificates]; 773 options.hostNameOverride = [[self class] hostNameOverride]; 774 775 __weak RMTTestService *weakService = service; 776 GRPCUnaryProtoCall *call = [service 777 unaryCallWithMessage:request 778 responseHandler:[[InteropTestsBlockCallbacks alloc] 779 initWithInitialMetadataCallback:nil 780 messageCallback:^(id message) { 781 if (weakService == nil) { 782 return; 783 } 784 if (message) { 785 RMTSimpleResponse *expectedResponse = 786 [RMTSimpleResponse message]; 787 expectedResponse.payload.type = RMTPayloadType_Compressable; 788 expectedResponse.payload.body = 789 [NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE]; 790 XCTAssertEqualObjects(message, expectedResponse); 791 } 792 } 793 closeCallback:^(NSDictionary *trailingMetadata, NSError *error) { 794 if (weakService == nil) { 795 return; 796 } 797 [completeExpectations[i] fulfill]; 798 }] 799 callOptions:options]; 800 [calls addObject:call]; 801 } 802 803 for (int i = 0; i < num_rpcs; ++i) { 804 GRPCUnaryProtoCall *call = calls[i]; 805 [call start]; 806 } 807 808 waiterBlock(completeExpectations, GRPCInteropTestTimeoutDefault); 809 }); 810} 811 812- (void)concurrentRPCsWithErrors { 813 const int kNumRpcs = 10; 814 __block int completedCallCount = 0; 815 NSCondition *cv = [[NSCondition alloc] init]; 816 NSDate *waitUntil = [[NSDate date] dateByAddingTimeInterval:GRPCInteropTestTimeoutDefault]; 817 [cv lock]; 818 for (int i = 0; i < kNumRpcs; ++i) { 819 
RMTSimpleRequest *request = [RMTSimpleRequest message]; 820 request.responseType = RMTPayloadType_Compressable; 821 request.responseSize = SMALL_PAYLOAD_SIZE; 822 request.payload.body = [NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE]; 823 if (i % 3 == 0) { 824 request.responseStatus.code = GRPC_STATUS_UNAVAILABLE; 825 } else if (i % 7 == 0) { 826 request.responseStatus.code = GRPC_STATUS_CANCELLED; 827 } 828 829 RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]]; 830 __weak RMTTestService *weakService = service; 831 GRPCProtoCall *call = [service 832 RPCToUnaryCallWithRequest:request 833 handler:^(RMTSimpleResponse *response, NSError *error) { 834 if (weakService == nil) { 835 return; 836 } 837 if (error == nil) { 838 RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message]; 839 expectedResponse.payload.type = RMTPayloadType_Compressable; 840 expectedResponse.payload.body = 841 [NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE]; 842 XCTAssertEqualObjects(response, expectedResponse); 843 } 844 // DEBUG 845 [cv lock]; 846 if (++completedCallCount == kNumRpcs) { 847 [cv signal]; 848 } 849 [cv unlock]; 850 }]; 851 [call setResponseDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)]; 852 [call start]; 853 } 854 while (completedCallCount < kNumRpcs && [waitUntil timeIntervalSinceNow] > 0) { 855 [cv waitUntilDate:waitUntil]; 856 } 857 [cv unlock]; 858} 859 860- (void)testConcurrentRPCsWithErrors { 861 [self retriableTest:@selector(concurrentRPCsWithErrors) retries:kTestRetries timeout:10]; 862} 863 864- (void)testPacketCoalescing { 865 GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) { 866 RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]]; 867 __weak XCTestExpectation *expectation = [self expectationWithDescription:@"LargeUnary"]; 868 869 RMTSimpleRequest *request = [RMTSimpleRequest message]; 870 request.responseType = RMTPayloadType_Compressable; 
request.responseSize = SMALL_PAYLOAD_SIZE;
    request.payload.body = [NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE];

    [GRPCCall enableOpBatchLog:YES];
    __weak RMTTestService *weakService = service;
    [service unaryCallWithRequest:request
                          handler:^(RMTSimpleResponse *response, NSError *error) {
                            if (weakService == nil) {
                              return;
                            }
                            XCTAssertNil(error, @"Finished with unexpected error: %@", error);

                            RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
                            expectedResponse.payload.type = RMTPayloadType_Compressable;
                            // Same size the request asked for above (was a bare literal 10).
                            expectedResponse.payload.body =
                                [NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE];
                            XCTAssertEqualObjects(response, expectedResponse);

                            // The test is a success if there is a batch of exactly 3 ops
                            // (SEND_INITIAL_METADATA, SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT).
                            // Without packet coalescing each batch of ops contains only one op.
                            NSArray *opBatches = [GRPCCall obtainAndCleanOpBatchLog];
                            const NSInteger kExpectedOpBatchSize = 3;
                            for (NSObject *o in opBatches) {
                              if ([o isKindOfClass:[NSArray class]]) {
                                NSArray *batch = (NSArray *)o;
                                if ([batch count] == kExpectedOpBatchSize) {
                                  [expectation fulfill];
                                  break;
                                }
                              }
                            }
                          }];

    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
    [GRPCCall enableOpBatchLog:NO];
  });
}

/**
 * Requests a unary response whose payload is 4MB minus self.encodingOverhead and expects the call
 * to finish without error, carrying a body of exactly that length.
 */
- (void)test4MBResponsesAreAccepted {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"MaxResponseSize"];

    // 4MB - encoding overhead
    const int32_t kPayloadSize = 4 * 1024 * 1024 - self.encodingOverhead;
    RMTSimpleRequest *request = [RMTSimpleRequest message];
    request.responseSize = kPayloadSize;

    __weak RMTTestService *weakService = service;
    [service unaryCallWithRequest:request
                          handler:^(RMTSimpleResponse *response, NSError *error) {
                            if (!weakService) {
                              return;
                            }
                            XCTAssertNil(error, @"Finished with unexpected error: %@", error);
                            XCTAssertEqual(response.payload.body.length, kPayloadSize);
                            [expectation fulfill];
                          }];

    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
  });
}

/**
 * Requests a unary response one byte larger than the size used by -test4MBResponsesAreAccepted and
 * expects the call to fail with the "Received message larger than max" description.
 */
- (void)testResponsesOverMaxSizeFailWithActionableMessage {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation =
        [self expectationWithDescription:@"ResponseOverMaxSize"];

    // 1B over max size
    const int32_t kPayloadSize = 4 * 1024 * 1024 - self.encodingOverhead + 1;
    RMTSimpleRequest *request = [RMTSimpleRequest message];
    request.responseSize = kPayloadSize;

    __weak RMTTestService *weakService = service;
    [service
        unaryCallWithRequest:request
                     handler:^(RMTSimpleResponse *response, NSError *error) {
                       if (!weakService) {
                         return;
                       }
                       // TODO(jcanizales): Catch the error and rethrow it with an
                       // actionable message:
                       // - Use +[GRPCCall setResponseSizeLimit:forHost:] to set a
                       // higher limit.
                       // - If you're developing the server, consider using response
                       // streaming, or let clients filter
                       // responses by setting a google.protobuf.FieldMask in the
                       // request:
                       // https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/field_mask.proto
                       XCTAssertEqualObjects(
                           error.localizedDescription,
                           @"CLIENT: Received message larger than max (4194305 vs. 4194304)");
                       [expectation fulfill];
                     }];
    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
  });
}

/**
 * Raises the response size limit for the test host to 6MB, requests a 5MB response and expects the
 * call to finish without error. The error is checked after the wait, outside the handler.
 */
- (void)testResponsesOver4MBAreAcceptedIfOptedIn {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation =
        [self expectationWithDescription:@"HigherResponseSizeLimit"];
    __block NSError *callError = nil;

    // 5MB. Declared as int32_t, like the sibling size tests, so the assignment to responseSize
    // does not narrow implicitly from size_t.
    const int32_t kPayloadSize = 5 * 1024 * 1024;
    RMTSimpleRequest *request = [RMTSimpleRequest message];
    request.responseSize = kPayloadSize;

    [GRPCCall setResponseSizeLimit:6 * 1024 * 1024 forHost:[[self class] host]];
    __weak RMTTestService *weakService = service;
    [service unaryCallWithRequest:request
                          handler:^(RMTSimpleResponse *response, NSError *error) {
                            if (!weakService) {
                              return;
                            }
                            callError = error;
                            [expectation fulfill];
                          }];

    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
    XCTAssertNil(callError, @"Finished with unexpected error: %@", callError);
  });
}

- (void)testClientStreamingRPC {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"ClientStreaming"];

    RMTStreamingInputCallRequest *request1 = [RMTStreamingInputCallRequest message];
    request1.payload.body = [NSMutableData dataWithLength:27182];

    RMTStreamingInputCallRequest *request2 = [RMTStreamingInputCallRequest message];
    request2.payload.body = [NSMutableData dataWithLength:8];

    RMTStreamingInputCallRequest *request3 = [RMTStreamingInputCallRequest message];
    request3.payload.body = [NSMutableData dataWithLength:1828];

RMTStreamingInputCallRequest *request4 = [RMTStreamingInputCallRequest message];
    request4.payload.body = [NSMutableData dataWithLength:45904];

    GRXWriter *writer = [GRXWriter writerWithContainer:@[ request1, request2, request3, request4 ]];

    __weak RMTTestService *weakService = service;
    [service
        streamingInputCallWithRequestsWriter:writer
                                     handler:^(RMTStreamingInputCallResponse *response,
                                               NSError *error) {
                                       if (weakService == nil) {
                                         return;
                                       }
                                       XCTAssertNil(error, @"Finished with unexpected error: %@",
                                                    error);

                                       RMTStreamingInputCallResponse *expectedResponse =
                                           [RMTStreamingInputCallResponse message];
                                       expectedResponse.aggregatedPayloadSize = 74922;
                                       XCTAssertEqualObjects(response, expectedResponse);

                                       [expectation fulfill];
                                     }];

    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
  });
}

/**
 * Issues one server-streaming call that asks for four responses of fixed sizes and checks that
 * exactly those four responses arrive, in order, before the stream finishes without error.
 */
- (void)testServerStreamingRPC {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"ServerStreaming"];

    NSArray *expectedSizes = @[ @31415, @9, @2653, @58979 ];

    // One response-parameters entry per expected response.
    RMTStreamingOutputCallRequest *request = [RMTStreamingOutputCallRequest message];
    for (NSNumber *expectedSize in expectedSizes) {
      RMTResponseParameters *responseParameters = [RMTResponseParameters message];
      responseParameters.size = expectedSize.intValue;
      [request.responseParametersArray addObject:responseParameters];
    }

    __block int receivedCount = 0;
    __weak RMTTestService *weakService = service;
    [service
        streamingOutputCallWithRequest:request
                          eventHandler:^(BOOL done, RMTStreamingOutputCallResponse *response,
                                         NSError *error) {
                            if (!weakService) {
                              return;
                            }

                            NSString *errorText =
                                [NSString stringWithFormat:@"Finished with unexpected error: %@",
                                                           error];
                            assertBlock(error == nil, errorText);
                            assertBlock(done || response,
                                        @"Event handler called without an event.");

                            if (response) {
                              assertBlock(receivedCount < 4, @"More than 4 responses received.");

                              id expected = [RMTStreamingOutputCallResponse
                                  messageWithPayloadSize:expectedSizes[receivedCount]];
                              NSString *mismatchText = [NSString
                                  stringWithFormat:@"response %@ not equal to expected %@",
                                                   response, expected];
                              assertBlock([response isEqual:expected], mismatchText);

                              receivedCount++;
                            }

                            if (done) {
                              NSString *countText = [NSString
                                  stringWithFormat:@"Received %@ responses instead of 4.",
                                                   @(receivedCount)];
                              assertBlock(receivedCount == 4, countText);
                              [expectation fulfill];
                            }
                          }];

    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
  });
}

/**
 * Full-duplex ping-pong over a buffered pipe: the first request is written up front, and each
 * received response triggers the next request until four rounds are done, at which point the
 * pipe is closed. The stream must then finish without error after exactly four responses.
 */
- (void)testPingPongRPC {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPong"];

    NSArray *requestSizes = @[ @27182, @8, @1828, @45904 ];
    NSArray *responseSizes = @[ @31415, @9, @2653, @58979 ];

    GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];

    __block int round = 0;

    id firstRequest =
        [RMTStreamingOutputCallRequest messageWithPayloadSize:requestSizes[round]
                                        requestedResponseSize:responseSizes[round]];
    [requestsBuffer writeValue:firstRequest];

    __weak RMTTestService *weakService = service;
    [service
        fullDuplexCallWithRequestsWriter:requestsBuffer
                            eventHandler:^(BOOL done, RMTStreamingOutputCallResponse *response,
                                           NSError *error) {
                              if (!weakService) {
                                return;
                              }

                              NSString *errorText = [NSString
                                  stringWithFormat:@"Finished with unexpected error: %@", error];
                              assertBlock(error == nil, errorText);
                              assertBlock(done || response,
                                          @"Event handler called without an event.");

                              if (response) {
                                assertBlock(round < 4, @"More than 4 responses received.");

                                id expected = [RMTStreamingOutputCallResponse
                                    messageWithPayloadSize:responseSizes[round]];
                                NSString *mismatchText = [NSString
                                    stringWithFormat:@"response %@ not equal to expected %@",
                                                     response, expected];
                                assertBlock([response isEqual:expected], mismatchText);

                                round++;
                                if (round < 4) {
                                  // Answer the response with the next request.
                                  id nextRequest = [RMTStreamingOutputCallRequest
                                      messageWithPayloadSize:requestSizes[round]
                                       requestedResponseSize:responseSizes[round]];
                                  [requestsBuffer writeValue:nextRequest];
                                } else {
                                  [requestsBuffer writesFinishedWithError:nil];
                                }
                              }

                              if (done) {
                                NSString *countText = [NSString
                                    stringWithFormat:@"Received %@ responses instead of 4.",
                                                     @(round)];
                                assertBlock(round == 4, countText);
                                [expectation fulfill];
                              }
                            }];
    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
  });
}

- (void)testPingPongRPCWithV2API {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPongWithV2API"];

    NSArray *requests = @[ @27182, @8, @1828, @45904 ];
    NSArray *responses = @[ @31415, @9, @2653, @58979 ];

    __block int index = 0;

    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
                                                 requestedResponseSize:responses[index]];
    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
    // For backwards compatibility
    options.transportType = [[self class] transportType];
    options.transport = [[self class] transport];
    options.PEMRootCertificates = [[self class] PEMRootCertificates];
    options.hostNameOverride = [[self class] hostNameOverride];

    __weak __block GRPCStreamingProtoCall *weakCall;
GRPCStreamingProtoCall *call = [service
        fullDuplexCallWithResponseHandler:
            [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
                messageCallback:^(id message) {
                  GRPCStreamingProtoCall *localCall = weakCall;
                  if (localCall == nil) {
                    return;
                  }
                  assertBlock(index < 4, @"More than 4 responses received.");

                  id expected =
                      [RMTStreamingOutputCallResponse messageWithPayloadSize:responses[index]];
                  assertBlock([message isEqual:expected],
                              [NSString stringWithFormat:@"message %@ not equal to expected %@",
                                                         message, expected]);
                  index += 1;
                  if (index < 4) {
                    id request =
                        [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
                                                        requestedResponseSize:responses[index]];
                    [localCall writeMessage:request];
                  } else {
                    [localCall finish];
                  }
                }
                closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
                  if (weakCall == nil) {
                    return;
                  }
                  assertBlock(
                      error == nil,
                      [NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
                  assertBlock(
                      index == 4,
                      [NSString stringWithFormat:@"Received %@ responses instead of 4.", @(index)]);
                  [expectation fulfill];
                }]
                              callOptions:options];
    weakCall = call;
    [call start];
    [call writeMessage:request];

    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
  });
}

/**
 * Ping-pong over the V2 API with flow control enabled: the test asks for one message up front and
 * asks for another each time it writes a follow-up request. After four rounds the call is
 * finished; it must close without error, and the write-message callback must have fired four
 * times by the time the wait returns.
 */
- (void)testPingPongRPCWithFlowControl {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    // Previously copy-pasted as "PingPongWithV2API", which mislabelled failures of this test.
    __weak XCTestExpectation *expectation =
        [self expectationWithDescription:@"PingPongWithFlowControl"];

    NSArray *requestSizes = @[ @27182, @8, @1828, @45904 ];
    NSArray *responseSizes = @[ @31415, @9, @2653, @58979 ];

    __block int round = 0;

    id firstRequest =
        [RMTStreamingOutputCallRequest messageWithPayloadSize:requestSizes[round]
                                        requestedResponseSize:responseSizes[round]];
    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
    // For backwards compatibility
    options.transportType = [[self class] transportType];
    options.transport = [[self class] transport];
    options.PEMRootCertificates = [[self class] PEMRootCertificates];
    options.hostNameOverride = [[self class] hostNameOverride];
    options.flowControlEnabled = YES;
    __block int writeMessageCount = 0;

    __weak __block GRPCStreamingProtoCall *weakCall;
    GRPCStreamingProtoCall *call = [service
        fullDuplexCallWithResponseHandler:
            [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
                messageCallback:^(id message) {
                  GRPCStreamingProtoCall *strongCall = weakCall;
                  if (!strongCall) {
                    return;
                  }

                  assertBlock((round < 4), @"More than 4 responses received.");
                  id expected = [RMTStreamingOutputCallResponse
                      messageWithPayloadSize:responseSizes[round]];
                  NSString *mismatchText =
                      [NSString stringWithFormat:@"message %@ not equal to %@", message, expected];
                  assertBlock([message isEqual:expected], mismatchText);

                  round++;
                  if (round < 4) {
                    id nextRequest = [RMTStreamingOutputCallRequest
                        messageWithPayloadSize:requestSizes[round]
                         requestedResponseSize:responseSizes[round]];
                    [strongCall writeMessage:nextRequest];
                    // Ask for the response to the request just written.
                    [strongCall receiveNextMessage];
                  } else {
                    [strongCall finish];
                  }
                }
                closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
                  if (!weakCall) {
                    return;
                  }

                  NSString *errorText =
                      [NSString stringWithFormat:@"Finished with unexpected error: %@", error];
                  assertBlock(error == nil, errorText);
                  NSString *countText =
                      [NSString stringWithFormat:@"Received %i responses instead of 4.", round];
                  assertBlock(round == 4, countText);
                  [expectation fulfill];
                }
                writeMessageCallback:^{
                  writeMessageCount++;
                }]
                              callOptions:options];
    weakCall = call;
    [call start];
    [call receiveNextMessage];
    [call writeMessage:firstRequest];

    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
    NSString *writeCountText =
        [NSString stringWithFormat:@"writeMessageCount %@ not equal to 4", @(writeMessageCount)];
    assertBlock(writeMessageCount == 4, writeCountText);
  });
}

/**
 * Starts a full-duplex call with an empty requests writer and expects a single event: stream
 * finished, without error.
 */
- (void)testEmptyStreamRPC {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyStream"];
    __weak RMTTestService *weakService = service;
    GRXWriter *noRequests = [GRXWriter emptyWriter];
    [service
        fullDuplexCallWithRequestsWriter:noRequests
                            eventHandler:^(BOOL done, RMTStreamingOutputCallResponse *response,
                                           NSError *error) {
                              if (!weakService) {
                                return;
                              }
                              XCTAssertNil(error, @"Finished with unexpected error: %@", error);
                              XCTAssert(done, @"Unexpected response: %@", response);
                              [expectation fulfill];
                            }];
    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
  });
}

- (void)testCancelAfterBeginRPC {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"CancelAfterBegin"];

    // A buffered pipe to which we never write any value acts as a writer that just hangs.
GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];

    __weak RMTTestService *weakService = service;
    GRPCProtoCall *call = [service
        RPCToStreamingInputCallWithRequestsWriter:requestsBuffer
                                          handler:^(RMTStreamingInputCallResponse *response,
                                                    NSError *error) {
                                            if (weakService == nil) {
                                              return;
                                            }
                                            XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
                                            [expectation fulfill];
                                          }];
    XCTAssertEqual(call.state, GRXWriterStateNotStarted);

    [call start];
    XCTAssertEqual(call.state, GRXWriterStateStarted);

    [call cancel];
    XCTAssertEqual(call.state, GRXWriterStateFinished);

    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
  });
}

/**
 * V2 API variant of the cancel-after-begin scenario: a streaming-input call is started and
 * cancelled immediately. No message may be delivered and the call must close with CANCELLED.
 */
- (void)testCancelAfterBeginRPCWithV2API {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation =
        [self expectationWithDescription:@"CancelAfterBeginWithV2API"];

    // No request is ever written to this call; it is cancelled right after being started.
    __weak RMTTestService *weakService = service;
    GRPCStreamingProtoCall *call = [service
        streamingInputCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
                                                  initWithInitialMetadataCallback:nil
                                                  messageCallback:^(id message) {
                                                    if (!weakService) {
                                                      return;
                                                    }
                                                    XCTFail(@"Not expected to receive message");
                                                  }
                                                  closeCallback:^(NSDictionary *trailingMetadata,
                                                                  NSError *error) {
                                                    if (!weakService) {
                                                      return;
                                                    }
                                                    XCTAssertEqual(error.code,
                                                                   GRPC_STATUS_CANCELLED);
                                                    [expectation fulfill];
                                                  }]
                                  callOptions:nil];
    [call start];
    [call cancel];

    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
  });
}

/**
 * Writes a single request into a pipe that is never closed, cancels the call from inside the
 * handler as soon as the first response arrives, and expects the next event to be the end of the
 * stream with CANCELLED.
 */
- (void)testCancelAfterFirstResponseRPC {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation =
        [self expectationWithDescription:@"CancelAfterFirstResponse"];

    // A buffered pipe to which we write a single value but never close
    GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];

    __block BOOL receivedResponse = NO;

    id onlyRequest = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
                                                     requestedResponseSize:@31415];

    [requestsBuffer writeValue:onlyRequest];

    __weak RMTTestService *weakService = service;
    // `call` is __block so the handler below can cancel the very call it belongs to.
    __block GRPCProtoCall *call = [service
        RPCToFullDuplexCallWithRequestsWriter:requestsBuffer
                                 eventHandler:^(BOOL done, RMTStreamingOutputCallResponse *response,
                                                NSError *error) {
                                   if (!weakService) {
                                     return;
                                   }
                                   if (!receivedResponse) {
                                     // First event: must be a response, which triggers the cancel.
                                     XCTAssertNil(error, @"Finished with unexpected error: %@",
                                                  error);
                                     XCTAssertFalse(done, @"Finished without response");
                                     XCTAssertNotNil(response);
                                     receivedResponse = YES;
                                     [call cancel];
                                     return;
                                   }
                                   // Any later event must be the cancelled end of the stream.
                                   XCTAssert(done, @"Unexpected extra response %@", response);
                                   XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
                                   [expectation fulfill];
                                 }];
    [call start];
    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
  });
}

/**
 * V2 API variant of cancel-after-first-response: the call is cancelled from the message callback
 * on the first (and only allowed) response, and must then close with CANCELLED. Both the response
 * and the completion are awaited.
 */
- (void)testCancelAfterFirstResponseRPCWithV2API {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *completionExpectation =
        [self expectationWithDescription:@"Call completed."];
    __weak XCTestExpectation *responseExpectation =
        [self expectationWithDescription:@"Received response."];

    __block BOOL receivedResponse = NO;

    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
    // For backwards compatibility
    options.transportType = self.class.transportType;
    options.transport = [[self class] transport];
    options.PEMRootCertificates = self.class.PEMRootCertificates;
    options.hostNameOverride = [[self class] hostNameOverride];

    id onlyRequest = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
                                                     requestedResponseSize:@31415];

    __weak __block GRPCStreamingProtoCall *weakCall;
    GRPCStreamingProtoCall *call = [service
        fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
                                              initWithInitialMetadataCallback:nil
                                              messageCallback:^(id message) {
                                                GRPCStreamingProtoCall *strongCall = weakCall;
                                                if (!strongCall) {
                                                  return;
                                                }
                                                XCTAssertFalse(receivedResponse);
                                                receivedResponse = YES;
                                                [strongCall cancel];
                                                [responseExpectation fulfill];
                                              }
                                              closeCallback:^(NSDictionary *trailingMetadata,
                                                              NSError *error) {
                                                if (!weakCall) {
                                                  return;
                                                }
                                                XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
                                                [completionExpectation fulfill];
                                              }]
                              callOptions:options];
    weakCall = call;
    [call start];
    [call writeMessage:onlyRequest];
    waiterBlock(@[ completionExpectation, responseExpectation ], GRPCInteropTestTimeoutDefault);
  });
}

/**
 * Starts a full-duplex V2 call, writes one request and cancels straight away. Any delivered
 * message fails the test; the call must close with CANCELLED.
 */
- (void)testCancelAfterFirstRequestWithV2API {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *completionExpectation =
        [self expectationWithDescription:@"Call completed."];

    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
    // For backwards compatibility
    options.transportType = self.class.transportType;
    options.transport = [[self class] transport];
    options.PEMRootCertificates = self.class.PEMRootCertificates;
    options.hostNameOverride = [[self class] hostNameOverride];

    id onlyRequest = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
                                                     requestedResponseSize:@31415];

    __weak RMTTestService *weakService = service;
    GRPCStreamingProtoCall *call = [service
        fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
                                              initWithInitialMetadataCallback:nil
                                              messageCallback:^(id message) {
                                                if (!weakService) {
                                                  return;
                                                }
                                                XCTFail(@"Received unexpected response.");
                                              }
                                              closeCallback:^(NSDictionary *trailingMetadata,
                                                              NSError *error) {
                                                if (!weakService) {
                                                  return;
                                                }
                                                XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
                                                [completionExpectation fulfill];
                                              }]
                              callOptions:options];
    [call start];
    [call writeMessage:onlyRequest];
    [call cancel];
    waiterBlock(@[ completionExpectation ], GRPCInteropTestTimeoutDefault);
  });
}

/**
 * Makes an empty call, then, from inside its handler, calls +[GRPCCall closeOpenConnections] and
 * makes a second empty call. Both calls must finish without error.
 */
- (void)testRPCAfterClosingOpenConnections {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation =
        [self expectationWithDescription:@"RPC after closing connection"];

    GPBEmpty *request = [GPBEmpty message];

    __weak RMTTestService *weakService = service;
    [service
        emptyCallWithRequest:request
                     handler:^(GPBEmpty *response, NSError *error) {
                       if (!weakService) {
                         return;
                       }
                       XCTAssertNil(error, @"First RPC finished with unexpected error: %@", error);

// closeOpenConnections is deprecated; silence the warning for this one call.
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
                       [GRPCCall closeOpenConnections];
#pragma clang diagnostic pop

                       [weakService
                           emptyCallWithRequest:request
                                        handler:^(GPBEmpty *response, NSError *error) {
                                          XCTAssertNil(
                                              error,
                                              @"Second RPC finished with unexpected error: %@",
                                              error);
                                          [expectation fulfill];
                                        }];
                     }];

    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
  });
}

- (void)testCompressedUnaryRPC {
  // This test needs to be disabled for remote test because interop server grpc-test
  // does not support compression.
if (isRemoteInteropTest([[self class] host])) {
    return;
  }

  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    // Previously copy-pasted as "LargeUnary", which mislabelled failures of this test.
    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"CompressedUnary"];

    RMTSimpleRequest *request = [RMTSimpleRequest message];
    request.responseType = RMTPayloadType_Compressable;
    request.responseSize = LARGE_RESPONSE_PAYLOAD_SIZE;
    request.payload.body = [NSMutableData dataWithLength:LARGE_REQUEST_PAYLOAD_SIZE];
    request.expectCompressed.value = YES;
    [GRPCCall setDefaultCompressMethod:GRPCCompressGzip forhost:[[self class] host]];

    __weak RMTTestService *weakService = service;
    [service unaryCallWithRequest:request
                          handler:^(RMTSimpleResponse *response, NSError *error) {
                            if (weakService == nil) {
                              return;
                            }

                            XCTAssertNil(error, @"Finished with unexpected error: %@", error);

                            RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
                            expectedResponse.payload.type = RMTPayloadType_Compressable;
                            expectedResponse.payload.body =
                                [NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE];
                            XCTAssertEqualObjects(response, expectedResponse);

                            [expectation fulfill];
                          }];

    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
  });
}

// TODO(b/268379869): This test has a race and is flaky in any configurations. One possible way to
// deflake this test is to find a way to disable ping ack on the interop server for this test case.
/**
 * Currently disabled: the method returns before doing anything (see the TODO above), so the code
 * below the early return does not run.
 *
 * When enabled, it opens a full-duplex V2 call with a keepalive interval of 1.5 and a keepalive
 * timeout of 0 and expects the call to close with UNAVAILABLE. The scenario is skipped on the
 * Cronet transport.
 */
- (void)testKeepaliveWithV2API {
  return;

  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    if ([[self class] transport] == gGRPCCoreCronetID) {
      // Cronet does not support keepalive
      return;
    }
    __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Keepalive"];

    const NSTimeInterval kTestTimeout = 5;
    NSNumber *requestSize = @27182;
    NSNumber *responseSize = @31415;

    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requestSize
                                                 requestedResponseSize:responseSize];
    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
    options.transportType = [[self class] transportType];
    options.transport = [[self class] transport];
    options.PEMRootCertificates = [[self class] PEMRootCertificates];
    options.hostNameOverride = [[self class] hostNameOverride];
    options.keepaliveInterval = 1.5;
    options.keepaliveTimeout = 0;

    __weak RMTTestService *weakService = service;
    GRPCStreamingProtoCall *call = [service
        fullDuplexCallWithResponseHandler:
            [[InteropTestsBlockCallbacks alloc]
                initWithInitialMetadataCallback:nil
                                messageCallback:nil
                                  closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
                                    if (!weakService) {
                                      return;
                                    }
                                    XCTAssertNotNil(error);
                                    XCTAssertEqual(
                                        error.code, GRPC_STATUS_UNAVAILABLE,
                                        @"Received status %@ instead of UNAVAILABLE (14).",
                                        @(error.code));
                                    [expectation fulfill];
                                  }]
                              callOptions:options];
    // The request is handed to the call before it is started.
    [call writeMessage:request];
    [call start];

    waiterBlock(@[ expectation ], kTestTimeout);
    [call finish];
  });
}

/**
 * Runs the four-round V2 ping-pong exchange with a DefaultInterceptorFactory installed in the
 * call options. The call must deliver exactly four matching responses and close without error.
 */
- (void)testDefaultInterceptor {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation =
        [self expectationWithDescription:@"testDefaultInterceptor"];

    NSArray *requestSizes = @[ @27182, @8, @1828, @45904 ];
    NSArray *responseSizes = @[ @31415, @9, @2653, @58979 ];

    __block int round = 0;

    id firstRequest =
        [RMTStreamingOutputCallRequest messageWithPayloadSize:requestSizes[round]
                                        requestedResponseSize:responseSizes[round]];
    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
    // For backwards compatibility
    options.transportType = [[self class] transportType];
    options.transport = [[self class] transport];
    options.PEMRootCertificates = [[self class] PEMRootCertificates];
    options.hostNameOverride = [[self class] hostNameOverride];
    options.interceptorFactories = @[ [[DefaultInterceptorFactory alloc] init] ];

    __weak __block GRPCStreamingProtoCall *weakCall;
    GRPCStreamingProtoCall *call = [service
        fullDuplexCallWithResponseHandler:
            [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
                messageCallback:^(id message) {
                  GRPCStreamingProtoCall *strongCall = weakCall;
                  if (!strongCall) {
                    return;
                  }
                  assertBlock(round < 4, @"More than 4 responses received.");

                  id expected = [RMTStreamingOutputCallResponse
                      messageWithPayloadSize:responseSizes[round]];
                  NSString *mismatchText =
                      [NSString stringWithFormat:@"message %@ not equal to expected %@", message,
                                                 expected];
                  assertBlock([message isEqual:expected], mismatchText);

                  round++;
                  if (round < 4) {
                    id nextRequest = [RMTStreamingOutputCallRequest
                        messageWithPayloadSize:requestSizes[round]
                         requestedResponseSize:responseSizes[round]];
                    [strongCall writeMessage:nextRequest];
                  } else {
                    [strongCall finish];
                  }
                }
                closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
                  if (!weakCall) {
                    return;
                  }

                  NSString *countText =
                      [NSString stringWithFormat:@"Received %@ responses instead of 4.", @(round)];
                  assertBlock(round == 4, countText);

                  NSString *errorText =
                      [NSString stringWithFormat:@"Finished with unexpected error: %@", error];
                  assertBlock(error == nil, errorText);

                  [expectation fulfill];
                }]
                              callOptions:options];
    weakCall = call;
    [call start];
    [call writeMessage:firstRequest];

    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
  });
}

- (void)testLoggingInterceptor {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation =
        [self expectationWithDescription:@"testLoggingInterceptor"];

    __block NSUInteger startCount = 0;
    __block NSUInteger writeDataCount = 0;
    __block NSUInteger finishCount = 0;
    __block NSUInteger receiveNextMessageCount = 0;
    __block NSUInteger responseHeaderCount = 0;
    __block NSUInteger responseDataCount = 0;
    __block NSUInteger responseCloseCount = 0;
    __block NSUInteger didWriteDataCount = 0;
    id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc]
        initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)
        startHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
                    GRPCInterceptorManager *manager) {
          startCount++;
          XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
          XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
          XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
          [manager startNextInterceptorWithRequest:[requestOptions copy]
                                       callOptions:[callOptions copy]];
        }
        writeDataHook:^(id data, GRPCInterceptorManager *manager) {
          writeDataCount++;
          [manager writeNextInterceptorWithData:data];
        }
finishHook:^(GRPCInterceptorManager *manager) { 1757 finishCount++; 1758 [manager finishNextInterceptor]; 1759 } 1760 receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) { 1761 receiveNextMessageCount++; 1762 [manager receiveNextInterceptorMessages:numberOfMessages]; 1763 } 1764 responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) { 1765 responseHeaderCount++; 1766 [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata]; 1767 } 1768 responseDataHook:^(id data, GRPCInterceptorManager *manager) { 1769 responseDataCount++; 1770 [manager forwardPreviousInterceptorWithData:data]; 1771 } 1772 responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error, 1773 GRPCInterceptorManager *manager) { 1774 responseCloseCount++; 1775 [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata 1776 error:error]; 1777 } 1778 didWriteDataHook:^(GRPCInterceptorManager *manager) { 1779 didWriteDataCount++; 1780 [manager forwardPreviousInterceptorDidWriteData]; 1781 }]; 1782 1783 NSArray *requests = @[ @1, @2, @3, @4 ]; 1784 NSArray *responses = @[ @1, @2, @3, @4 ]; 1785 1786 __block int messageIndex = 0; 1787 1788 id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[messageIndex] 1789 requestedResponseSize:responses[messageIndex]]; 1790 GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; 1791 // For backwards compatibility 1792 options.transportType = [[self class] transportType]; 1793 options.transport = [[self class] transport]; 1794 options.PEMRootCertificates = [[self class] PEMRootCertificates]; 1795 options.hostNameOverride = [[self class] hostNameOverride]; 1796 options.flowControlEnabled = YES; 1797 options.interceptorFactories = @[ factory ]; 1798 1799 __block int writeMessageCount = 0; 1800 __block __weak GRPCStreamingProtoCall *weakCall; 1801 GRPCStreamingProtoCall *call = [service 1802 fullDuplexCallWithResponseHandler: 1803 
[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil 1804 messageCallback:^(id message) { 1805 GRPCStreamingProtoCall *localCall = weakCall; 1806 if (localCall == nil) { 1807 return; 1808 } 1809 assertBlock((messageIndex < 4), @"More than 4 responses received."); 1810 1811 id expected = [RMTStreamingOutputCallResponse 1812 messageWithPayloadSize:responses[messageIndex]]; 1813 assertBlock([message isEqual:expected], 1814 [NSString stringWithFormat:@"message %@ not equal to expected %@", 1815 message, expected]); 1816 messageIndex += 1; 1817 if (messageIndex < 4) { 1818 id request = [RMTStreamingOutputCallRequest 1819 messageWithPayloadSize:requests[messageIndex] 1820 requestedResponseSize:responses[messageIndex]]; 1821 [localCall writeMessage:request]; 1822 [localCall receiveNextMessage]; 1823 } else { 1824 [localCall finish]; 1825 } 1826 } 1827 closeCallback:^(NSDictionary *trailingMetadata, NSError *error) { 1828 if (weakCall == nil) { 1829 return; 1830 } 1831 assertBlock( 1832 error == nil, 1833 [NSString stringWithFormat:@"Finished with unexpected error: %@", error]); 1834 assertBlock(messageIndex == 4, 1835 [NSString stringWithFormat:@"Received %@ responses instead of 4.", 1836 @(messageIndex)]); 1837 [expectation fulfill]; 1838 } 1839 writeMessageCallback:^{ 1840 writeMessageCount++; 1841 }] 1842 callOptions:options]; 1843 1844 weakCall = call; 1845 [call start]; 1846 [call receiveNextMessage]; 1847 [call writeMessage:request]; 1848 1849 waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault); 1850 1851 assertBlock(startCount == 1, [NSString stringWithFormat:@"%@", @(startCount)]); 1852 assertBlock(writeDataCount == 4, [NSString stringWithFormat:@"%@", @(writeDataCount)]); 1853 assertBlock(finishCount == 1, [NSString stringWithFormat:@"%@", @(finishCount)]); 1854 assertBlock(receiveNextMessageCount == 4, 1855 [NSString stringWithFormat:@"%@", @(receiveNextMessageCount)]); 1856 assertBlock(responseHeaderCount == 1, 1857 [NSString 
stringWithFormat:@"%@", @(responseHeaderCount)]); 1858 assertBlock(responseDataCount == 4, [NSString stringWithFormat:@"%@", @(responseDataCount)]); 1859 assertBlock(responseCloseCount == 1, [NSString stringWithFormat:@"%@", @(responseCloseCount)]); 1860 assertBlock(didWriteDataCount == 4, [NSString stringWithFormat:@"%@", @(didWriteDataCount)]); 1861 assertBlock(writeMessageCount == 4, [NSString stringWithFormat:@"%@", @(writeMessageCount)]); 1862 }); 1863} 1864 1865// Chain a default interceptor and a hook interceptor which, after one write, cancels the call 1866// under the hood but forward further data to the user. 1867- (void)testHijackingInterceptor { 1868 GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) { 1869 RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]]; 1870 NSUInteger kCancelAfterWrites = 1; 1871 __weak XCTestExpectation *expectUserCallComplete = 1872 [self expectationWithDescription:@"User call completed."]; 1873 __weak XCTestExpectation *expectResponseCallbackComplete = 1874 [self expectationWithDescription:@"Hook interceptor response callback completed"]; 1875 1876 NSArray *responses = @[ @1, @2, @3, @4 ]; 1877 __block int index = 0; 1878 1879 __block NSUInteger startCount = 0; 1880 __block NSUInteger writeDataCount = 0; 1881 __block NSUInteger finishCount = 0; 1882 __block NSUInteger responseHeaderCount = 0; 1883 __block NSUInteger responseDataCount = 0; 1884 __block NSUInteger responseCloseCount = 0; 1885 id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc] 1886 initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL) 1887 startHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions, 1888 GRPCInterceptorManager *manager) { 1889 startCount++; 1890 [manager startNextInterceptorWithRequest:[requestOptions copy] 1891 callOptions:[callOptions copy]]; 1892 } 1893 writeDataHook:^(id data, GRPCInterceptorManager *manager) { 1894 
writeDataCount++;
          if (index < kCancelAfterWrites) {
            // Before the hijack point: let the write travel down the interceptor chain.
            [manager writeNextInterceptorWithData:data];
          } else {
            if (index == kCancelAfterWrites) {
              // Hijack point: cancel everything below this interceptor exactly once.
              [manager cancelNextInterceptor];
            }
            // From the hijack point on, answer the user directly with a locally built response.
            [manager forwardPreviousInterceptorWithData:[[RMTStreamingOutputCallResponse
                                                            messageWithPayloadSize:responses[index]]
                                                            data]];
          }
        }
        finishHook:^(GRPCInterceptorManager *manager) {
          finishCount++;
          // finish must happen after the hijacking, so directly reply with a close
          [manager forwardPreviousInterceptorCloseWithTrailingMetadata:@{@"grpc-status" : @"0"}
                                                                 error:nil];
          [manager shutDown];
        }
        receiveNextMessagesHook:nil
        responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
          responseHeaderCount++;
          [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
        }
        responseDataHook:^(id data, GRPCInterceptorManager *manager) {
          responseDataCount++;
          [manager forwardPreviousInterceptorWithData:data];
        }
        responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
                            GRPCInterceptorManager *manager) {
          responseCloseCount++;
          // since we canceled the call, it should return cancel error
          XCTAssertNil(trailingMetadata);
          XCTAssertNotNil(error);
          XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
          [expectResponseCallbackComplete fulfill];
        }
        didWriteDataHook:nil];

    NSArray *requests = @[ @1, @2, @3, @4 ];

    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
                                                 requestedResponseSize:responses[index]];
    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
    // For backwards compatibility
    options.transportType = [[self class] transportType];
    options.transport = [[self class] transport];
    options.PEMRootCertificates = [[self class] PEMRootCertificates];
    options.hostNameOverride = [[self class] hostNameOverride];
    options.interceptorFactories = @[ [[DefaultInterceptorFactory alloc] init], factory ];

    __weak __block GRPCStreamingProtoCall *weakCall;
    GRPCStreamingProtoCall *call = [service
        fullDuplexCallWithResponseHandler:
            [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
                messageCallback:^(id message) {
                  GRPCStreamingProtoCall *localCall = weakCall;
                  if (localCall == nil) {
                    return;
                  }

                  assertBlock(index < 4, @"More than 4 responses received.");

                  id expected =
                      [RMTStreamingOutputCallResponse messageWithPayloadSize:responses[index]];
                  assertBlock([message isEqual:expected],
                              [NSString stringWithFormat:@"message %@ not equal to expected %@",
                                                         message, expected]);
                  index += 1;
                  if (index < 4) {
                    id request =
                        [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
                                                        requestedResponseSize:responses[index]];
                    [localCall writeMessage:request];
                    [localCall receiveNextMessage];
                  } else {
                    // The expectation is only weakly referenced and an array literal raises on a
                    // nil element, so take a strong reference and skip the wait if it is gone.
                    XCTestExpectation *responseCallbackDone = expectResponseCallbackComplete;
                    if (responseCallbackDone != nil) {
                      [self waitForExpectations:@[ responseCallbackDone ]
                                        timeout:GRPCInteropTestTimeoutDefault];
                    }
                    [localCall finish];
                  }
                }
                closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
                  if (weakCall == nil) {
                    return;
                  }
                  assertBlock(
                      error == nil,
                      [NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
                  assertBlock(
                      index == 4,
                      [NSString stringWithFormat:@"Received %@ responses instead of 4.", @(index)]);

                  [expectUserCallComplete fulfill];
                }]
                          callOptions:options];
    weakCall = call;
    [call start];
    [call receiveNextMessage];
    [call writeMessage:request];

    waiterBlock(@[ expectUserCallComplete ], GRPCInteropTestTimeoutDefault);
    // All four user writes reach the hook, but only one response comes back from below it.
    assertBlock(startCount == 1, [NSString stringWithFormat:@"%@", @(startCount)]);
    assertBlock(writeDataCount == 4, [NSString stringWithFormat:@"%@", @(writeDataCount)]);
    assertBlock(finishCount == 1, [NSString stringWithFormat:@"%@", @(finishCount)]);
    assertBlock(responseHeaderCount == 1,
                [NSString stringWithFormat:@"%@", @(responseHeaderCount)]);
    assertBlock(responseDataCount == 1, [NSString stringWithFormat:@"%@", @(responseDataCount)]);
    assertBlock(responseCloseCount == 1, [NSString stringWithFormat:@"%@", @(responseCloseCount)]);
  });
}

- (void)testGlobalInterceptor {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation =
        [self expectationWithDescription:@"testGlobalInterceptor"];

    __block NSUInteger startCount = 0;
    __block NSUInteger writeDataCount = 0;
    __block NSUInteger finishCount = 0;
    __block NSUInteger receiveNextMessageCount = 0;
    __block NSUInteger responseHeaderCount = 0;
    __block NSUInteger responseDataCount = 0;
    __block NSUInteger responseCloseCount = 0;
    __block NSUInteger didWriteDataCount = 0;
    [globalInterceptorFactory
        setStartHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
                       GRPCInterceptorManager *manager) {
          startCount++;
          XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
          XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
          XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
          [manager startNextInterceptorWithRequest:[requestOptions copy]
                                       callOptions:[callOptions copy]];
        }
        writeDataHook:^(id data, GRPCInterceptorManager *manager) {
          writeDataCount++;
          [manager writeNextInterceptorWithData:data];
        }
        finishHook:^(GRPCInterceptorManager
*manager) {
          finishCount++;
          [manager finishNextInterceptor];
        }
        receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
          receiveNextMessageCount++;
          [manager receiveNextInterceptorMessages:numberOfMessages];
        }
        responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
          responseHeaderCount++;
          [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
        }
        responseDataHook:^(id data, GRPCInterceptorManager *manager) {
          responseDataCount++;
          [manager forwardPreviousInterceptorWithData:data];
        }
        responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
                            GRPCInterceptorManager *manager) {
          responseCloseCount++;
          [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata
                                                                 error:error];
        }
        didWriteDataHook:^(GRPCInterceptorManager *manager) {
          didWriteDataCount++;
          [manager forwardPreviousInterceptorDidWriteData];
        }];

    NSArray *requestSizes = @[ @1, @2, @3, @4 ];
    NSArray *responseSizes = @[ @1, @2, @3, @4 ];

    // Number of responses handled so far; also the index of the next request to send.
    __block int round = 0;

    id firstRequest = [RMTStreamingOutputCallRequest messageWithPayloadSize:requestSizes[round]
                                                      requestedResponseSize:responseSizes[round]];
    GRPCMutableCallOptions *duplexOptions = [[GRPCMutableCallOptions alloc] init];
    // For backwards compatibility
    duplexOptions.transportType = [[self class] transportType];
    duplexOptions.transport = [[self class] transport];
    duplexOptions.PEMRootCertificates = [[self class] PEMRootCertificates];
    duplexOptions.hostNameOverride = [[self class] hostNameOverride];
    duplexOptions.flowControlEnabled = YES;
    // No per-call interceptor factories are set here; only the global factory is switched on.
    globalInterceptorFactory.enabled = YES;

    __block int writeCallbackHits = 0;
    __weak __block GRPCStreamingProtoCall *weakCall;
    InteropTestsBlockCallbacks *handler = [[InteropTestsBlockCallbacks alloc]
        initWithInitialMetadataCallback:nil
        messageCallback:^(id message) {
          GRPCStreamingProtoCall *liveCall = weakCall;
          if (!liveCall) {
            return;
          }
          assertBlock(round < 4, @"More than 4 responses received.");

          round++;
          if (round >= 4) {
            // All four rounds done: half-close the call.
            [liveCall finish];
            return;
          }
          id nextRequest =
              [RMTStreamingOutputCallRequest messageWithPayloadSize:requestSizes[round]
                                              requestedResponseSize:responseSizes[round]];
          [liveCall writeMessage:nextRequest];
          [liveCall receiveNextMessage];
        }
        closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
          if (weakCall == nil) {
            return;
          }
          assertBlock(error == nil,
                      [NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
          [expectation fulfill];
        }
        writeMessageCallback:^{
          writeCallbackHits++;
        }];
    // NOTE(review): no block in this test captures `call`, so `__block` looks unnecessary here;
    // kept as-is to leave behavior untouched.
    __block GRPCStreamingProtoCall *call = [service fullDuplexCallWithResponseHandler:handler
                                                                          callOptions:duplexOptions];
    weakCall = call;
    [call start];
    [call receiveNextMessage];
    [call writeMessage:firstRequest];
    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);

    // Expected tallies: once per call for start/finish/header/close, once per message otherwise.
    assertBlock(startCount == 1, [NSString stringWithFormat:@"%@", @(startCount)]);
    assertBlock(writeDataCount == 4, [NSString stringWithFormat:@"%@", @(writeDataCount)]);
    assertBlock(finishCount == 1, [NSString stringWithFormat:@"%@", @(finishCount)]);
    assertBlock(receiveNextMessageCount == 4,
                [NSString stringWithFormat:@"%@", @(receiveNextMessageCount)]);
    assertBlock(responseHeaderCount == 1,
                [NSString stringWithFormat:@"%@", @(responseHeaderCount)]);
    assertBlock(responseDataCount == 4, [NSString stringWithFormat:@"%@", @(responseDataCount)]);
    assertBlock(responseCloseCount == 1, [NSString stringWithFormat:@"%@", @(responseCloseCount)]);
    assertBlock(didWriteDataCount == 4, [NSString stringWithFormat:@"%@", @(didWriteDataCount)]);
    assertBlock(writeCallbackHits == 4, [NSString stringWithFormat:@"%@", @(writeCallbackHits)]);
globalInterceptorFactory.enabled = NO;
  });
}

// Checks that handing a factory to +[GRPCCall2 registerGlobalInterceptor:] raises an exception;
// the failure message states that this is expected because it is a second registration.
- (void)testConflictingGlobalInterceptors {
  // A hook factory with every hook left nil; only the act of registering it matters here.
  id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc]
      initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)
                  startHook:nil
              writeDataHook:nil
                 finishHook:nil
    receiveNextMessagesHook:nil
         responseHeaderHook:nil
           responseDataHook:nil
          responseCloseHook:nil
           didWriteDataHook:nil];
  // XCTAssertThrows records the failure itself, so no XCTFail has to sit inside a hand-written
  // @try whose @catch could also swallow exceptions unrelated to the registration.
  XCTAssertThrows(
      [GRPCCall2 registerGlobalInterceptor:factory],
      @"Did not receive an exception when registering global interceptor the second time");
}

- (void)testInterceptorAndGlobalInterceptor {
  GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
    RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
    __weak XCTestExpectation *expectation =
        [self expectationWithDescription:@"testInterceptorAndGlobalInterceptor"];

    __block NSUInteger startCount = 0;
    __block NSUInteger writeDataCount = 0;
    __block NSUInteger finishCount = 0;
    __block NSUInteger receiveNextMessageCount = 0;
    __block NSUInteger responseHeaderCount = 0;
    __block NSUInteger responseDataCount = 0;
    __block NSUInteger responseCloseCount = 0;
    __block NSUInteger didWriteDataCount = 0;

    id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc]
        initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)
        startHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
                    GRPCInterceptorManager *manager) {
          startCount++;
          XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
          XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
          XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
          [manager
startNextInterceptorWithRequest:[requestOptions copy]
                               callOptions:[callOptions copy]];
        }
        writeDataHook:^(id data, GRPCInterceptorManager *manager) {
          writeDataCount++;
          [manager writeNextInterceptorWithData:data];
        }
        finishHook:^(GRPCInterceptorManager *manager) {
          finishCount++;
          [manager finishNextInterceptor];
        }
        receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
          receiveNextMessageCount++;
          [manager receiveNextInterceptorMessages:numberOfMessages];
        }
        responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
          responseHeaderCount++;
          [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
        }
        responseDataHook:^(id data, GRPCInterceptorManager *manager) {
          responseDataCount++;
          [manager forwardPreviousInterceptorWithData:data];
        }
        responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
                            GRPCInterceptorManager *manager) {
          responseCloseCount++;
          [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata
                                                                 error:error];
        }
        didWriteDataHook:^(GRPCInterceptorManager *manager) {
          didWriteDataCount++;
          [manager forwardPreviousInterceptorDidWriteData];
        }];

    // A second, independent set of tallies for the hooks installed on the global factory.
    __block NSUInteger globalStartCount = 0;
    __block NSUInteger globalWriteDataCount = 0;
    __block NSUInteger globalFinishCount = 0;
    __block NSUInteger globalReceiveNextMessageCount = 0;
    __block NSUInteger globalResponseHeaderCount = 0;
    __block NSUInteger globalResponseDataCount = 0;
    __block NSUInteger globalResponseCloseCount = 0;
    __block NSUInteger globalDidWriteDataCount = 0;

    [globalInterceptorFactory
        setStartHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
                       GRPCInterceptorManager *manager) {
          globalStartCount++;
          XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
          XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
          XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
          [manager startNextInterceptorWithRequest:[requestOptions copy]
                                       callOptions:[callOptions copy]];
        }
        writeDataHook:^(id data, GRPCInterceptorManager *manager) {
          globalWriteDataCount++;
          [manager writeNextInterceptorWithData:data];
        }
        finishHook:^(GRPCInterceptorManager *manager) {
          globalFinishCount++;
          [manager finishNextInterceptor];
        }
        receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
          globalReceiveNextMessageCount++;
          [manager receiveNextInterceptorMessages:numberOfMessages];
        }
        responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
          globalResponseHeaderCount++;
          [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
        }
        responseDataHook:^(id data, GRPCInterceptorManager *manager) {
          globalResponseDataCount++;
          [manager forwardPreviousInterceptorWithData:data];
        }
        responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
                            GRPCInterceptorManager *manager) {
          globalResponseCloseCount++;
          [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata
                                                                 error:error];
        }
        didWriteDataHook:^(GRPCInterceptorManager *manager) {
          globalDidWriteDataCount++;
          [manager forwardPreviousInterceptorDidWriteData];
        }];

    NSArray *requests = @[ @1, @2, @3, @4 ];
    NSArray *responses = @[ @1, @2, @3, @4 ];

    __block int index = 0;

    id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
                                                 requestedResponseSize:responses[index]];
    GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
    // For backwards compatibility
    options.transportType = [[self class] transportType];
    options.transport = [[self class] transport];
    options.PEMRootCertificates = [[self class] PEMRootCertificates];
    options.hostNameOverride = [[self class] hostNameOverride];
    options.flowControlEnabled = YES;
    // Both the per-call factory and the global factory are active for this call.
    options.interceptorFactories = @[ factory ];
    globalInterceptorFactory.enabled = YES;

    __block int writeMessageCount = 0;
    __weak __block GRPCStreamingProtoCall *weakCall;
    GRPCStreamingProtoCall *call = [service
        fullDuplexCallWithResponseHandler:
            [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
                messageCallback:^(id message) {
                  GRPCStreamingProtoCall *localCall = weakCall;
                  if (localCall == nil) {
                    return;
                  }
                  // Same bound check as the sibling interceptor tests: `index` is used below
                  // to index four-element arrays.
                  assertBlock(index < 4, @"More than 4 responses received.");

                  index += 1;
                  if (index < 4) {
                    id request =
                        [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
                                                        requestedResponseSize:responses[index]];
                    [localCall writeMessage:request];
                    [localCall receiveNextMessage];
                  } else {
                    [localCall finish];
                  }
                }
                closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
                  if (weakCall == nil) {
                    return;
                  }
                  // Report a failed call explicitly instead of leaving it to be inferred
                  // from the hook tallies checked after the wait.
                  assertBlock(
                      error == nil,
                      [NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
                  [expectation fulfill];
                }
                writeMessageCallback:^{
                  writeMessageCount++;
                }]
                          callOptions:options];
    weakCall = call;
    [call start];
    [call receiveNextMessage];
    [call writeMessage:request];

    waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
    // Per-call interceptor tallies.
    assertBlock(startCount == 1, [NSString stringWithFormat:@"%@", @(startCount)]);
    assertBlock(writeDataCount == 4, [NSString stringWithFormat:@"%@", @(writeDataCount)]);
    assertBlock(finishCount == 1, [NSString stringWithFormat:@"%@", @(finishCount)]);
    assertBlock(receiveNextMessageCount == 4,
                [NSString stringWithFormat:@"%@", @(receiveNextMessageCount)]);
    assertBlock(responseHeaderCount == 1,
                [NSString stringWithFormat:@"%@", @(responseHeaderCount)]);
    assertBlock(responseDataCount == 4, [NSString stringWithFormat:@"%@", @(responseDataCount)]);
    assertBlock(responseCloseCount == 1, [NSString stringWithFormat:@"%@", @(responseCloseCount)]);
    assertBlock(didWriteDataCount == 4, [NSString stringWithFormat:@"%@", @(didWriteDataCount)]);
    // Global interceptor tallies must match the per-call ones.
    assertBlock(globalStartCount == 1, [NSString stringWithFormat:@"%@", @(globalStartCount)]);
    assertBlock(globalWriteDataCount == 4,
                [NSString stringWithFormat:@"%@", @(globalWriteDataCount)]);
    assertBlock(globalFinishCount == 1, [NSString stringWithFormat:@"%@", @(globalFinishCount)]);
    assertBlock(globalReceiveNextMessageCount == 4,
                [NSString stringWithFormat:@"%@", @(globalReceiveNextMessageCount)]);
    assertBlock(globalResponseHeaderCount == 1,
                [NSString stringWithFormat:@"%@", @(globalResponseHeaderCount)]);
    assertBlock(globalResponseDataCount == 4,
                [NSString stringWithFormat:@"%@", @(globalResponseDataCount)]);
    assertBlock(globalResponseCloseCount == 1,
                [NSString stringWithFormat:@"%@", @(globalResponseCloseCount)]);
    assertBlock(globalDidWriteDataCount == 4,
                [NSString stringWithFormat:@"%@", @(globalDidWriteDataCount)]);
    assertBlock(writeMessageCount == 4, [NSString stringWithFormat:@"%@", @(writeMessageCount)]);
    globalInterceptorFactory.enabled = NO;
  });
}

@end