xref: /aosp_15_r20/external/grpc-grpc/src/objective-c/RxLibrary/GRXBufferedPipe.m (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1*cc02d7e2SAndroid Build Coastguard Worker/*
2*cc02d7e2SAndroid Build Coastguard Worker *
3*cc02d7e2SAndroid Build Coastguard Worker * Copyright 2015 gRPC authors.
4*cc02d7e2SAndroid Build Coastguard Worker *
5*cc02d7e2SAndroid Build Coastguard Worker * Licensed under the Apache License, Version 2.0 (the "License");
6*cc02d7e2SAndroid Build Coastguard Worker * you may not use this file except in compliance with the License.
7*cc02d7e2SAndroid Build Coastguard Worker * You may obtain a copy of the License at
8*cc02d7e2SAndroid Build Coastguard Worker *
9*cc02d7e2SAndroid Build Coastguard Worker *     http://www.apache.org/licenses/LICENSE-2.0
10*cc02d7e2SAndroid Build Coastguard Worker *
11*cc02d7e2SAndroid Build Coastguard Worker * Unless required by applicable law or agreed to in writing, software
12*cc02d7e2SAndroid Build Coastguard Worker * distributed under the License is distributed on an "AS IS" BASIS,
13*cc02d7e2SAndroid Build Coastguard Worker * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*cc02d7e2SAndroid Build Coastguard Worker * See the License for the specific language governing permissions and
15*cc02d7e2SAndroid Build Coastguard Worker * limitations under the License.
16*cc02d7e2SAndroid Build Coastguard Worker *
17*cc02d7e2SAndroid Build Coastguard Worker */
18*cc02d7e2SAndroid Build Coastguard Worker
19*cc02d7e2SAndroid Build Coastguard Worker#import "GRXBufferedPipe.h"
20*cc02d7e2SAndroid Build Coastguard Worker
21*cc02d7e2SAndroid Build Coastguard Worker@interface GRXBufferedPipe ()
22*cc02d7e2SAndroid Build Coastguard Worker@property(atomic) id<GRXWriteable> writeable;
23*cc02d7e2SAndroid Build Coastguard Worker@end
24*cc02d7e2SAndroid Build Coastguard Worker
25*cc02d7e2SAndroid Build Coastguard Worker@implementation GRXBufferedPipe {
26*cc02d7e2SAndroid Build Coastguard Worker  NSError *_errorOrNil;
27*cc02d7e2SAndroid Build Coastguard Worker  dispatch_queue_t _writeQueue;
28*cc02d7e2SAndroid Build Coastguard Worker}
29*cc02d7e2SAndroid Build Coastguard Worker
30*cc02d7e2SAndroid Build Coastguard Worker@synthesize state = _state;
31*cc02d7e2SAndroid Build Coastguard Worker
32*cc02d7e2SAndroid Build Coastguard Worker+ (instancetype)pipe {
33*cc02d7e2SAndroid Build Coastguard Worker  return [[self alloc] init];
34*cc02d7e2SAndroid Build Coastguard Worker}
35*cc02d7e2SAndroid Build Coastguard Worker
36*cc02d7e2SAndroid Build Coastguard Worker- (instancetype)init {
37*cc02d7e2SAndroid Build Coastguard Worker  if (self = [super init]) {
38*cc02d7e2SAndroid Build Coastguard Worker    _state = GRXWriterStateNotStarted;
39*cc02d7e2SAndroid Build Coastguard Worker    _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
40*cc02d7e2SAndroid Build Coastguard Worker    dispatch_suspend(_writeQueue);
41*cc02d7e2SAndroid Build Coastguard Worker  }
42*cc02d7e2SAndroid Build Coastguard Worker  return self;
43*cc02d7e2SAndroid Build Coastguard Worker}
44*cc02d7e2SAndroid Build Coastguard Worker
45*cc02d7e2SAndroid Build Coastguard Worker#pragma mark GRXWriteable implementation
46*cc02d7e2SAndroid Build Coastguard Worker
47*cc02d7e2SAndroid Build Coastguard Worker- (void)writeValue:(id)value {
48*cc02d7e2SAndroid Build Coastguard Worker  if ([value respondsToSelector:@selector(copy)]) {
49*cc02d7e2SAndroid Build Coastguard Worker    // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
50*cc02d7e2SAndroid Build Coastguard Worker    // So just buffer the new value.
51*cc02d7e2SAndroid Build Coastguard Worker    // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
52*cc02d7e2SAndroid Build Coastguard Worker    value = [value copy];
53*cc02d7e2SAndroid Build Coastguard Worker  }
54*cc02d7e2SAndroid Build Coastguard Worker  dispatch_async(_writeQueue, ^(void) {
55*cc02d7e2SAndroid Build Coastguard Worker    @synchronized(self) {
56*cc02d7e2SAndroid Build Coastguard Worker      if (self->_state == GRXWriterStateFinished) {
57*cc02d7e2SAndroid Build Coastguard Worker        return;
58*cc02d7e2SAndroid Build Coastguard Worker      }
59*cc02d7e2SAndroid Build Coastguard Worker      [self.writeable writeValue:value];
60*cc02d7e2SAndroid Build Coastguard Worker    }
61*cc02d7e2SAndroid Build Coastguard Worker  });
62*cc02d7e2SAndroid Build Coastguard Worker}
63*cc02d7e2SAndroid Build Coastguard Worker
64*cc02d7e2SAndroid Build Coastguard Worker- (void)writesFinishedWithError:(NSError *)errorOrNil {
65*cc02d7e2SAndroid Build Coastguard Worker  dispatch_async(_writeQueue, ^{
66*cc02d7e2SAndroid Build Coastguard Worker    if (self->_state == GRXWriterStateFinished) {
67*cc02d7e2SAndroid Build Coastguard Worker      return;
68*cc02d7e2SAndroid Build Coastguard Worker    }
69*cc02d7e2SAndroid Build Coastguard Worker    [self finishWithError:errorOrNil];
70*cc02d7e2SAndroid Build Coastguard Worker  });
71*cc02d7e2SAndroid Build Coastguard Worker}
72*cc02d7e2SAndroid Build Coastguard Worker
73*cc02d7e2SAndroid Build Coastguard Worker#pragma mark GRXWriter implementation
74*cc02d7e2SAndroid Build Coastguard Worker
75*cc02d7e2SAndroid Build Coastguard Worker- (void)setState:(GRXWriterState)newState {
76*cc02d7e2SAndroid Build Coastguard Worker  @synchronized(self) {
77*cc02d7e2SAndroid Build Coastguard Worker    // Manual transitions are only allowed from the started or paused states.
78*cc02d7e2SAndroid Build Coastguard Worker    if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
79*cc02d7e2SAndroid Build Coastguard Worker      return;
80*cc02d7e2SAndroid Build Coastguard Worker    }
81*cc02d7e2SAndroid Build Coastguard Worker
82*cc02d7e2SAndroid Build Coastguard Worker    switch (newState) {
83*cc02d7e2SAndroid Build Coastguard Worker      case GRXWriterStateFinished:
84*cc02d7e2SAndroid Build Coastguard Worker        self.writeable = nil;
85*cc02d7e2SAndroid Build Coastguard Worker        if (_state == GRXWriterStatePaused) {
86*cc02d7e2SAndroid Build Coastguard Worker          dispatch_resume(_writeQueue);
87*cc02d7e2SAndroid Build Coastguard Worker        }
88*cc02d7e2SAndroid Build Coastguard Worker        _state = newState;
89*cc02d7e2SAndroid Build Coastguard Worker        return;
90*cc02d7e2SAndroid Build Coastguard Worker      case GRXWriterStatePaused:
91*cc02d7e2SAndroid Build Coastguard Worker        if (_state == GRXWriterStateStarted) {
92*cc02d7e2SAndroid Build Coastguard Worker          _state = newState;
93*cc02d7e2SAndroid Build Coastguard Worker          dispatch_suspend(_writeQueue);
94*cc02d7e2SAndroid Build Coastguard Worker        }
95*cc02d7e2SAndroid Build Coastguard Worker        return;
96*cc02d7e2SAndroid Build Coastguard Worker      case GRXWriterStateStarted:
97*cc02d7e2SAndroid Build Coastguard Worker        if (_state == GRXWriterStatePaused) {
98*cc02d7e2SAndroid Build Coastguard Worker          _state = newState;
99*cc02d7e2SAndroid Build Coastguard Worker          dispatch_resume(_writeQueue);
100*cc02d7e2SAndroid Build Coastguard Worker        }
101*cc02d7e2SAndroid Build Coastguard Worker        return;
102*cc02d7e2SAndroid Build Coastguard Worker      case GRXWriterStateNotStarted:
103*cc02d7e2SAndroid Build Coastguard Worker        return;
104*cc02d7e2SAndroid Build Coastguard Worker    }
105*cc02d7e2SAndroid Build Coastguard Worker  }
106*cc02d7e2SAndroid Build Coastguard Worker}
107*cc02d7e2SAndroid Build Coastguard Worker
108*cc02d7e2SAndroid Build Coastguard Worker- (void)startWithWriteable:(id<GRXWriteable>)writeable {
109*cc02d7e2SAndroid Build Coastguard Worker  @synchronized(self) {
110*cc02d7e2SAndroid Build Coastguard Worker    self.writeable = writeable;
111*cc02d7e2SAndroid Build Coastguard Worker    _state = GRXWriterStateStarted;
112*cc02d7e2SAndroid Build Coastguard Worker  }
113*cc02d7e2SAndroid Build Coastguard Worker  dispatch_resume(_writeQueue);
114*cc02d7e2SAndroid Build Coastguard Worker}
115*cc02d7e2SAndroid Build Coastguard Worker
116*cc02d7e2SAndroid Build Coastguard Worker- (void)finishWithError:(NSError *)errorOrNil {
117*cc02d7e2SAndroid Build Coastguard Worker  [self.writeable writesFinishedWithError:errorOrNil];
118*cc02d7e2SAndroid Build Coastguard Worker}
119*cc02d7e2SAndroid Build Coastguard Worker
120*cc02d7e2SAndroid Build Coastguard Worker- (void)dealloc {
121*cc02d7e2SAndroid Build Coastguard Worker  GRXWriterState state = self.state;
122*cc02d7e2SAndroid Build Coastguard Worker  if (state == GRXWriterStateNotStarted || state == GRXWriterStatePaused) {
123*cc02d7e2SAndroid Build Coastguard Worker    dispatch_resume(_writeQueue);
124*cc02d7e2SAndroid Build Coastguard Worker  }
125*cc02d7e2SAndroid Build Coastguard Worker}
126*cc02d7e2SAndroid Build Coastguard Worker
127*cc02d7e2SAndroid Build Coastguard Worker@end
128