// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

import { Status } from 'pigweedjs/pw_status';
import { Message } from 'google-protobuf';

import WaitQueue from './queue';

import { PendingCalls, Rpc } from './rpc_classes';

/**
 * Signature of the user-supplied hooks a `Call` invokes.
 *
 * The same shape is used for all three hooks: `onNext` is called with the
 * received response message, while `onCompleted` and `onError` are called
 * with a `Status`. The value returned by a callback is ignored.
 */
export type Callback = (a: any) => any;

/**
 * Error thrown when an RPC terminates with an error status.
 *
 * The message has the form `<method> failed with error <STATUS>` and, for
 * `NOT_FOUND` and `DATA_LOSS`, a short explanation of the likely cause.
 */
class RpcError extends Error {
  /** The status the RPC failed with. */
  status: Status;

  /**
   * @param rpc The RPC that failed; only `rpc.method.name` is read.
   * @param status The error status to report.
   */
  constructor(rpc: Rpc, status: Status) {
    let message = '';
    if (status === Status.NOT_FOUND) {
      message = ': the RPC server does not support this RPC';
    } else if (status === Status.DATA_LOSS) {
      message = ': an error occurred while decoding the RPC payload';
    }

    // `Status[status]` maps the numeric status back to its symbolic name.
    super(`${rpc.method.name} failed with error ${Status[status]}${message}`);
    // Without this, stack traces and `String(err)` report a generic "Error".
    this.name = 'RpcError';
    this.status = status;
  }
}

/** Error raised when no response arrives within the requested time. */
class RpcTimeout extends Error {
  /** The RPC that was being waited on. */
  readonly rpc: Rpc;
  /** The timeout that elapsed, in milliseconds. */
  readonly timeoutMs: number;

  /**
   * @param rpc The RPC that timed out; its method name is used in the message.
   * @param timeoutMs The number of milliseconds that were waited.
   */
  constructor(rpc: Rpc, timeoutMs: number) {
    super(`${rpc.method.name} timed out after ${timeoutMs} ms`);
    // Without this, stack traces and `String(err)` report a generic "Error".
    this.name = 'RpcTimeout';
    this.rpc = rpc;
    this.timeoutMs = timeoutMs;
  }
}

/**
 * Fixed-capacity ring buffer that retains the most recent responses.
 *
 * Once `maxResponses` entries have been stored, each new response overwrites
 * the oldest one. The total number of responses pushed is still counted.
 */
class Responses {
  private responses: Message[] = [];
  private totalResponses = 0;
  private readonly maxResponses: number;

  /**
   * @param maxResponses Maximum number of responses to retain. Fractional
   *    values are rounded down; zero, negative and NaN values mean that
   *    nothing is retained.
   */
  constructor(maxResponses: number) {
    // A capacity of 0 (or NaN) would make `% maxResponses` evaluate to NaN and
    // store entries under the property key "NaN", so normalize it up front.
    const capacity = Math.floor(maxResponses);
    this.maxResponses = capacity > 0 ? capacity : 0;
  }

  /** Number of responses currently retained (never more than the capacity). */
  get length(): number {
    return Math.min(this.totalResponses, this.maxResponses);
  }

  /** Stores a response, overwriting the oldest one when the buffer is full. */
  push(response: Message): void {
    if (this.maxResponses > 0) {
      this.responses[this.totalResponses % this.maxResponses] = response;
    }
    this.totalResponses += 1;
  }

  /** Returns the most recently retained response, or undefined if none. */
  last(): Message | undefined {
    if (this.length === 0) {
      return undefined;
    }

    const lastIndex = (this.totalResponses - 1) % this.maxResponses;
    return this.responses[lastIndex];
  }

  /** Returns a copy of the retained responses, ordered oldest to newest. */
  getAll(): Message[] {
    if (this.length === 0) {
      return [];
    }
    if (this.totalResponses < this.maxResponses) {
      // The buffer has not wrapped yet; entries are already in arrival order.
      return this.responses.slice(0, this.totalResponses);
    }

    // After wrapping, the oldest entry sits at the next write position.
    const splitIndex = this.totalResponses % this.maxResponses;
    return this.responses
      .slice(splitIndex)
      .concat(this.responses.slice(0, splitIndex));
  }
}

/** Represent an in-progress or completed RPC call. */
export class Call {
  // Responses ordered by arrival time. Undefined signifies stream completion.
  private readonly responseQueue = new WaitQueue<Message | undefined>();
  // Retains the most recent responses (bounded by `maxResponses`).
  protected responses: Responses;

  private readonly rpcs: PendingCalls;
  rpc: Rpc;
  readonly callId: number;

  private readonly onNext: Callback;
  private readonly onCompleted: Callback;
  private readonly onError: Callback;

  // Set once the call finishes normally.
  status?: Status;
  // Set once the call fails or is cancelled.
  error?: Status;
  // The last Error thrown by a user callback, if any.
  callbackException?: Error;

  /**
   * @param rpcs Tracker used to send packets and to allocate the call ID.
   * @param rpc The RPC this call invokes.
   * @param onNext Invoked with each response as it is handled.
   * @param onCompleted Invoked with the final status on completion.
   * @param onError Invoked with the error status on failure.
   * @param maxResponses Maximum number of responses to retain.
   */
  constructor(
    rpcs: PendingCalls,
    rpc: Rpc,
    onNext: Callback,
    onCompleted: Callback,
    onError: Callback,
    maxResponses: number,
  ) {
    this.rpcs = rpcs;
    this.rpc = rpc;
    this.responses = new Responses(maxResponses);

    this.onNext = onNext;
    this.onCompleted = onCompleted;
    this.onError = onError;
    this.callId = rpcs.allocateCallId();
  }

  /* Calls the RPC. This must be called immediately after construction. */
  invoke(request?: Message, ignoreErrors = false): void {
    const previous = this.rpcs.sendRequest(
      this.rpc,
      this,
      ignoreErrors,
      request,
    );

    // If sendRequest hands back an earlier call that has not finished yet,
    // terminate it as cancelled.
    if (previous !== undefined && !previous.completed) {
      previous.handleError(Status.CANCELLED);
    }
  }

  /** True once the call has either a final status or an error. */
  get completed(): boolean {
    return this.status !== undefined || this.error !== undefined;
  }

  /**
   * Runs a user callback, logging anything it throws instead of propagating.
   *
   * A thrown `Error` is also stored in `callbackException` so that
   * `checkErrors` rethrows it later.
   */
  private invokeCallback(func: () => unknown): void {
    try {
      func();
    } catch (err: unknown) {
      if (err instanceof Error) {
        console.error(
          `An exception was raised while invoking a callback: ${err}`,
        );
        this.callbackException = err;
      } else {
        // Only non-Error throwables are "unexpected"; previously this was
        // logged for real Errors as well.
        console.error(`Unexpected item thrown while invoking callback: ${err}`);
      }
    }
  }

  /** Records a response, queues it for readers and notifies `onNext`. */
  handleResponse(response: Message): void {
    this.responses.push(response);
    this.responseQueue.push(response);
    this.invokeCallback(() => this.onNext(response));
  }

  /** Marks the call completed with `status` and notifies `onCompleted`. */
  handleCompletion(status: Status) {
    this.status = status;
    // Undefined marks the end of the stream for queue readers.
    this.responseQueue.push(undefined);
    this.invokeCallback(() => this.onCompleted(status));
  }

  /** Marks the call failed with `error` and notifies `onError`. */
  handleError(error: Status): void {
    this.error = error;
    // Undefined wakes queue readers, which then observe `this.error`.
    this.responseQueue.push(undefined);
    this.invokeCallback(() => this.onError(error));
  }

  /**
   * Takes the next queue entry, rejecting with `RpcTimeout` if none arrives
   * within `timeoutMs` milliseconds.
   */
  private async queuePopWithTimeout(
    timeoutMs: number,
  ): Promise<Message | undefined> {
    // eslint-disable-next-line no-async-promise-executor
    return new Promise(async (resolve, reject) => {
      let timeoutExpired = false;
      const timeoutWatcher = setTimeout(() => {
        timeoutExpired = true;
        reject(new RpcTimeout(this.rpc, timeoutMs));
      }, timeoutMs);
      try {
        const response = await this.responseQueue.shift();
        if (timeoutExpired) {
          // The caller has already been rejected; put the entry back at the
          // front so the next reader still receives it.
          this.responseQueue.unshift(response);
          return;
        }
        clearTimeout(timeoutWatcher);
        resolve(response);
      } catch (err: unknown) {
        // Errors thrown inside an async executor are otherwise lost; forward
        // them to the caller and stop the timer.
        clearTimeout(timeoutWatcher);
        reject(err);
      }
    });
  }

  /**
   * Yields responses up to the specified count as they are added.
   *
   * Throws an error as soon as it is received even if there are still
   * responses in the queue.
   *
   * Usage
   * ```
   * for await (const response of call.getResponses(5)) {
   *  console.log(response);
   * }
   * ```
   *
   * @param count The number of responses to read before returning.
   *    If no value is specified, getResponses will block until the stream
   *    either ends or hits an error.
   * @param timeoutMs The number of milliseconds to wait for a response
   *    before throwing an error.
   */
  async *getResponses(
    count?: number,
    timeoutMs?: number,
  ): AsyncGenerator<Message> {
    this.checkErrors();

    if (this.completed && this.responseQueue.length === 0) {
      return;
    }

    let remaining = count ?? Number.POSITIVE_INFINITY;
    while (remaining > 0) {
      const response =
        timeoutMs === undefined
          ? await this.responseQueue.shift()
          : await this.queuePopWithTimeout(timeoutMs);
      this.checkErrors();
      if (response === undefined) {
        // End-of-stream marker.
        return;
      }
      yield response;
      remaining -= 1;
    }
  }

  /**
   * Cancels the call if it has not completed.
   *
   * @returns false if the call had already completed; otherwise the result
   *    of `PendingCalls.sendCancel`.
   */
  cancel(): boolean {
    if (this.completed) {
      return false;
    }

    // NOTE(review): no end-of-stream marker is pushed onto the response queue
    // here, so this method does not itself wake a reader that is already
    // awaiting getResponses() — confirm whether sendCancel handles that.
    this.error = Status.CANCELLED;
    return this.rpcs.sendCancel(this.rpc, this.callId);
  }

  /** Throws a stored callback exception, or an RpcError if the call failed. */
  private checkErrors(): void {
    if (this.callbackException !== undefined) {
      throw this.callbackException;
    }
    if (this.error !== undefined) {
      throw new RpcError(this.rpc, this.error);
    }
  }

  /**
   * Waits for a single response and returns it with the final status.
   *
   * Throws if the call has no status after reading, or if the number of
   * retained responses is not exactly one.
   */
  protected async unaryWait(timeoutMs?: number): Promise<[Status, Message]> {
    for await (const _response of this.getResponses(1, timeoutMs)) {
      // Do nothing.
    }
    if (this.status === undefined) {
      throw Error('Unexpected undefined status at end of stream');
    }
    if (this.responses.length !== 1) {
      throw Error(`Unexpected number of responses: ${this.responses.length}`);
    }
    // Exactly one response is retained, so last() cannot be undefined here.
    return [this.status, this.responses.last()!];
  }

  /**
   * Drains the response stream and returns the final status together with
   * all retained responses.
   *
   * Throws if the call has no status once the stream has been drained.
   */
  protected async streamWait(timeoutMs?: number): Promise<[Status, Message[]]> {
    for await (const _response of this.getResponses(undefined, timeoutMs)) {
      // Do nothing.
    }
    if (this.status === undefined) {
      throw Error('Unexpected undefined status at end of stream');
    }
    return [this.status, this.responses.getAll()];
  }

  /**
   * Sends one client-stream message.
   *
   * Throws any pending error first, and throws an RpcError with
   * FAILED_PRECONDITION if the call has already completed with a status.
   */
  protected sendClientStream(request: Message) {
    this.checkErrors();
    if (this.status !== undefined) {
      throw new RpcError(this.rpc, Status.FAILED_PRECONDITION);
    }
    this.rpcs.sendClientStream(this.rpc, request, this.callId);
  }

  /**
   * Sends the given requests, then signals the end of the client stream
   * unless the call has already completed.
   */
  protected finishClientStream(requests: Message[]) {
    for (const request of requests) {
      this.sendClientStream(request);
    }

    if (!this.completed) {
      this.rpcs.sendClientStreamEnd(this.rpc, this.callId);
    }
  }
}

/** Tracks the state of a unary RPC call. */
export class UnaryCall extends Call {
  /**
   * Awaits the server response.
   *
   * @param timeoutMs Optional number of milliseconds to wait for the response.
   * @returns The final status and the single response message.
   */
  async complete(timeoutMs?: number): Promise<[Status, Message]> {
    return await this.unaryWait(timeoutMs);
  }
}

/** Tracks the state of a client streaming RPC call. */
export class ClientStreamingCall extends Call {
  /** Gets the last server message, if it exists */
  get response(): Message | undefined {
    return this.responses.last();
  }

  /** Sends a message from the client. */
  send(request: Message) {
    this.sendClientStream(request);
  }

  /**
   * Ends the client stream and waits for the RPC to complete.
   *
   * @param requests Messages to send before the client stream is ended.
   * @param timeoutMs Optional number of milliseconds to wait for the response.
   * @returns The final status and the single response message.
   */
  async finishAndWait(
    requests: Message[] = [],
    timeoutMs?: number,
  ): Promise<[Status, Message]> {
    this.finishClientStream(requests);
    return await this.unaryWait(timeoutMs);
  }
}

/** Tracks the state of a server streaming RPC call. */
export class ServerStreamingCall extends Call {
  /**
   * Waits for the server stream to finish.
   *
   * @param timeoutMs Optional number of milliseconds to wait for each
   *    response.
   * @returns The final status and the retained responses.
   */
  complete(timeoutMs?: number): Promise<[Status, Message[]]> {
    return this.streamWait(timeoutMs);
  }
}

/** Tracks the state of a bidirectional streaming RPC call. */
export class BidirectionalStreamingCall extends Call {
  /** Sends a message from the client. */
  send(request: Message) {
    this.sendClientStream(request);
  }

  /**
   * Ends the client stream and waits for the RPC to complete.
   *
   * @param requests Messages to send before the client stream is ended.
   * @param timeoutMs Optional number of milliseconds to wait for each
   *    response.
   * @returns The final status and the retained responses.
   */
  async finishAndWait(
    requests: Array<Message> = [],
    timeoutMs?: number,
  ): Promise<[Status, Array<Message>]> {
    this.finishClientStream(requests);
    return await this.streamWait(timeoutMs);
  }
}