1*61c4878aSAndroid Build Coastguard Worker// Copyright 2021 The Pigweed Authors 2*61c4878aSAndroid Build Coastguard Worker// 3*61c4878aSAndroid Build Coastguard Worker// Licensed under the Apache License, Version 2.0 (the "License"); you may not 4*61c4878aSAndroid Build Coastguard Worker// use this file except in compliance with the License. You may obtain a copy of 5*61c4878aSAndroid Build Coastguard Worker// the License at 6*61c4878aSAndroid Build Coastguard Worker// 7*61c4878aSAndroid Build Coastguard Worker// https://www.apache.org/licenses/LICENSE-2.0 8*61c4878aSAndroid Build Coastguard Worker// 9*61c4878aSAndroid Build Coastguard Worker// Unless required by applicable law or agreed to in writing, software 10*61c4878aSAndroid Build Coastguard Worker// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11*61c4878aSAndroid Build Coastguard Worker// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12*61c4878aSAndroid Build Coastguard Worker// License for the specific language governing permissions and limitations under 13*61c4878aSAndroid Build Coastguard Worker// the License. 14*61c4878aSAndroid Build Coastguard Worker 15*61c4878aSAndroid Build Coastguard Workerimport { Status } from 'pigweedjs/pw_status'; 16*61c4878aSAndroid Build Coastguard Workerimport { Message } from 'google-protobuf'; 17*61c4878aSAndroid Build Coastguard Worker 18*61c4878aSAndroid Build Coastguard Workerimport WaitQueue from './queue'; 19*61c4878aSAndroid Build Coastguard Worker 20*61c4878aSAndroid Build Coastguard Workerimport { PendingCalls, Rpc } from './rpc_classes'; 21*61c4878aSAndroid Build Coastguard Worker 22*61c4878aSAndroid Build Coastguard Workerexport type Callback = (a: any) => any; 23*61c4878aSAndroid Build Coastguard Worker 24*61c4878aSAndroid Build Coastguard Workerclass RpcError extends Error { 25*61c4878aSAndroid Build Coastguard Worker status: Status; 26*61c4878aSAndroid Build Coastguard Worker 27*61c4878aSAndroid Build Coastguard Worker constructor(rpc: Rpc, status: Status) { 28*61c4878aSAndroid Build Coastguard Worker let message = ''; 29*61c4878aSAndroid Build Coastguard Worker if (status === Status.NOT_FOUND) { 30*61c4878aSAndroid Build Coastguard Worker message = ': the RPC server does not support this RPC'; 31*61c4878aSAndroid Build Coastguard Worker } else if (status === Status.DATA_LOSS) { 32*61c4878aSAndroid Build Coastguard Worker message = ': an error occurred while decoding the RPC payload'; 33*61c4878aSAndroid Build Coastguard Worker } 34*61c4878aSAndroid Build Coastguard Worker 35*61c4878aSAndroid Build Coastguard Worker super(`${rpc.method.name} failed with error ${Status[status]}${message}`); 36*61c4878aSAndroid Build Coastguard Worker this.status = status; 37*61c4878aSAndroid Build Coastguard Worker } 38*61c4878aSAndroid Build Coastguard Worker} 39*61c4878aSAndroid Build Coastguard Worker 40*61c4878aSAndroid Build Coastguard Workerclass RpcTimeout extends Error { 41*61c4878aSAndroid Build Coastguard Worker readonly rpc: Rpc; 42*61c4878aSAndroid Build Coastguard Worker readonly timeoutMs: number; 43*61c4878aSAndroid Build Coastguard Worker 44*61c4878aSAndroid Build Coastguard Worker constructor(rpc: Rpc, timeoutMs: number) { 45*61c4878aSAndroid Build Coastguard Worker super(`${rpc.method.name} timed out after ${timeoutMs} ms`); 46*61c4878aSAndroid Build Coastguard Worker this.rpc = rpc; 47*61c4878aSAndroid Build Coastguard Worker this.timeoutMs = timeoutMs; 48*61c4878aSAndroid Build Coastguard Worker } 49*61c4878aSAndroid Build Coastguard Worker} 50*61c4878aSAndroid Build Coastguard Worker 51*61c4878aSAndroid Build Coastguard Workerclass Responses { 52*61c4878aSAndroid Build Coastguard Worker private responses: Message[] = []; 53*61c4878aSAndroid Build Coastguard Worker private totalResponses = 0; 54*61c4878aSAndroid Build Coastguard Worker private readonly maxResponses: number; 55*61c4878aSAndroid Build Coastguard Worker 56*61c4878aSAndroid Build Coastguard Worker constructor(maxResponses: number) { 57*61c4878aSAndroid Build Coastguard Worker this.maxResponses = maxResponses; 58*61c4878aSAndroid Build Coastguard Worker } 59*61c4878aSAndroid Build Coastguard Worker 60*61c4878aSAndroid Build Coastguard Worker get length(): number { 61*61c4878aSAndroid Build Coastguard Worker return Math.min(this.totalResponses, this.maxResponses); 62*61c4878aSAndroid Build Coastguard Worker } 63*61c4878aSAndroid Build Coastguard Worker 64*61c4878aSAndroid Build Coastguard Worker push(response: Message): void { 65*61c4878aSAndroid Build Coastguard Worker this.responses[this.totalResponses % this.maxResponses] = response; 66*61c4878aSAndroid Build Coastguard Worker this.totalResponses += 1; 67*61c4878aSAndroid Build Coastguard Worker } 68*61c4878aSAndroid Build Coastguard Worker 69*61c4878aSAndroid Build Coastguard Worker last(): Message | undefined { 70*61c4878aSAndroid Build Coastguard Worker if (this.totalResponses === 0) { 71*61c4878aSAndroid Build Coastguard Worker return undefined; 72*61c4878aSAndroid Build Coastguard Worker } 73*61c4878aSAndroid Build Coastguard Worker 74*61c4878aSAndroid Build Coastguard Worker const lastIndex = (this.totalResponses - 1) % this.maxResponses; 75*61c4878aSAndroid Build Coastguard Worker return this.responses[lastIndex]; 76*61c4878aSAndroid Build Coastguard Worker } 77*61c4878aSAndroid Build Coastguard Worker 78*61c4878aSAndroid Build Coastguard Worker getAll(): Message[] { 79*61c4878aSAndroid Build Coastguard Worker if (this.totalResponses < this.maxResponses) { 80*61c4878aSAndroid Build Coastguard Worker return this.responses.slice(0, this.totalResponses); 81*61c4878aSAndroid Build Coastguard Worker } 82*61c4878aSAndroid Build Coastguard Worker 83*61c4878aSAndroid Build Coastguard Worker const splitIndex = this.totalResponses % this.maxResponses; 84*61c4878aSAndroid Build Coastguard Worker return this.responses 85*61c4878aSAndroid Build Coastguard Worker .slice(splitIndex) 86*61c4878aSAndroid Build Coastguard Worker .concat(this.responses.slice(0, splitIndex)); 87*61c4878aSAndroid Build Coastguard Worker } 88*61c4878aSAndroid Build Coastguard Worker} 89*61c4878aSAndroid Build Coastguard Worker 90*61c4878aSAndroid Build Coastguard Worker/** Represent an in-progress or completed RPC call. */ 91*61c4878aSAndroid Build Coastguard Workerexport class Call { 92*61c4878aSAndroid Build Coastguard Worker // Responses ordered by arrival time. Undefined signifies stream completion. 93*61c4878aSAndroid Build Coastguard Worker private responseQueue = new WaitQueue<Message | undefined>(); 94*61c4878aSAndroid Build Coastguard Worker protected responses: Responses; 95*61c4878aSAndroid Build Coastguard Worker 96*61c4878aSAndroid Build Coastguard Worker private rpcs: PendingCalls; 97*61c4878aSAndroid Build Coastguard Worker rpc: Rpc; 98*61c4878aSAndroid Build Coastguard Worker readonly callId: number; 99*61c4878aSAndroid Build Coastguard Worker 100*61c4878aSAndroid Build Coastguard Worker private onNext: Callback; 101*61c4878aSAndroid Build Coastguard Worker private onCompleted: Callback; 102*61c4878aSAndroid Build Coastguard Worker private onError: Callback; 103*61c4878aSAndroid Build Coastguard Worker 104*61c4878aSAndroid Build Coastguard Worker status?: Status; 105*61c4878aSAndroid Build Coastguard Worker error?: Status; 106*61c4878aSAndroid Build Coastguard Worker callbackException?: Error; 107*61c4878aSAndroid Build Coastguard Worker 108*61c4878aSAndroid Build Coastguard Worker constructor( 109*61c4878aSAndroid Build Coastguard Worker rpcs: PendingCalls, 110*61c4878aSAndroid Build Coastguard Worker rpc: Rpc, 111*61c4878aSAndroid Build Coastguard Worker onNext: Callback, 112*61c4878aSAndroid Build Coastguard Worker onCompleted: Callback, 113*61c4878aSAndroid Build Coastguard Worker onError: Callback, 114*61c4878aSAndroid Build Coastguard Worker maxResponses: number, 115*61c4878aSAndroid Build Coastguard Worker ) { 116*61c4878aSAndroid Build Coastguard Worker this.rpcs = rpcs; 117*61c4878aSAndroid Build Coastguard Worker this.rpc = rpc; 118*61c4878aSAndroid Build Coastguard Worker this.responses = new Responses(maxResponses); 119*61c4878aSAndroid Build Coastguard Worker 120*61c4878aSAndroid Build Coastguard Worker this.onNext = onNext; 121*61c4878aSAndroid Build Coastguard Worker this.onCompleted = onCompleted; 122*61c4878aSAndroid Build Coastguard Worker this.onError = onError; 123*61c4878aSAndroid Build Coastguard Worker this.callId = rpcs.allocateCallId(); 124*61c4878aSAndroid Build Coastguard Worker } 125*61c4878aSAndroid Build Coastguard Worker 126*61c4878aSAndroid Build Coastguard Worker /* Calls the RPC. This must be called immediately after construction. */ 127*61c4878aSAndroid Build Coastguard Worker invoke(request?: Message, ignoreErrors = false): void { 128*61c4878aSAndroid Build Coastguard Worker const previous = this.rpcs.sendRequest( 129*61c4878aSAndroid Build Coastguard Worker this.rpc, 130*61c4878aSAndroid Build Coastguard Worker this, 131*61c4878aSAndroid Build Coastguard Worker ignoreErrors, 132*61c4878aSAndroid Build Coastguard Worker request, 133*61c4878aSAndroid Build Coastguard Worker ); 134*61c4878aSAndroid Build Coastguard Worker 135*61c4878aSAndroid Build Coastguard Worker if (previous !== undefined && !previous.completed) { 136*61c4878aSAndroid Build Coastguard Worker previous.handleError(Status.CANCELLED); 137*61c4878aSAndroid Build Coastguard Worker } 138*61c4878aSAndroid Build Coastguard Worker } 139*61c4878aSAndroid Build Coastguard Worker 140*61c4878aSAndroid Build Coastguard Worker get completed(): boolean { 141*61c4878aSAndroid Build Coastguard Worker return this.status !== undefined || this.error !== undefined; 142*61c4878aSAndroid Build Coastguard Worker } 143*61c4878aSAndroid Build Coastguard Worker 144*61c4878aSAndroid Build Coastguard Worker // eslint-disable-next-line @typescript-eslint/ban-types 145*61c4878aSAndroid Build Coastguard Worker private invokeCallback(func: () => {}) { 146*61c4878aSAndroid Build Coastguard Worker try { 147*61c4878aSAndroid Build Coastguard Worker func(); 148*61c4878aSAndroid Build Coastguard Worker } catch (err: unknown) { 149*61c4878aSAndroid Build Coastguard Worker if (err instanceof Error) { 150*61c4878aSAndroid Build Coastguard Worker console.error( 151*61c4878aSAndroid Build Coastguard Worker `An exception was raised while invoking a callback: ${err}`, 152*61c4878aSAndroid Build Coastguard Worker ); 153*61c4878aSAndroid Build Coastguard Worker this.callbackException = err; 154*61c4878aSAndroid Build Coastguard Worker } 155*61c4878aSAndroid Build Coastguard Worker console.error(`Unexpected item thrown while invoking callback: ${err}`); 156*61c4878aSAndroid Build Coastguard Worker } 157*61c4878aSAndroid Build Coastguard Worker } 158*61c4878aSAndroid Build Coastguard Worker 159*61c4878aSAndroid Build Coastguard Worker handleResponse(response: Message): void { 160*61c4878aSAndroid Build Coastguard Worker this.responses.push(response); 161*61c4878aSAndroid Build Coastguard Worker this.responseQueue.push(response); 162*61c4878aSAndroid Build Coastguard Worker this.invokeCallback(() => this.onNext(response)); 163*61c4878aSAndroid Build Coastguard Worker } 164*61c4878aSAndroid Build Coastguard Worker 165*61c4878aSAndroid Build Coastguard Worker handleCompletion(status: Status) { 166*61c4878aSAndroid Build Coastguard Worker this.status = status; 167*61c4878aSAndroid Build Coastguard Worker this.responseQueue.push(undefined); 168*61c4878aSAndroid Build Coastguard Worker this.invokeCallback(() => this.onCompleted(status)); 169*61c4878aSAndroid Build Coastguard Worker } 170*61c4878aSAndroid Build Coastguard Worker 171*61c4878aSAndroid Build Coastguard Worker handleError(error: Status): void { 172*61c4878aSAndroid Build Coastguard Worker this.error = error; 173*61c4878aSAndroid Build Coastguard Worker this.responseQueue.push(undefined); 174*61c4878aSAndroid Build Coastguard Worker this.invokeCallback(() => this.onError(error)); 175*61c4878aSAndroid Build Coastguard Worker } 176*61c4878aSAndroid Build Coastguard Worker 177*61c4878aSAndroid Build Coastguard Worker private async queuePopWithTimeout( 178*61c4878aSAndroid Build Coastguard Worker timeoutMs: number, 179*61c4878aSAndroid Build Coastguard Worker ): Promise<Message | undefined> { 180*61c4878aSAndroid Build Coastguard Worker // eslint-disable-next-line no-async-promise-executor 181*61c4878aSAndroid Build Coastguard Worker return new Promise(async (resolve, reject) => { 182*61c4878aSAndroid Build Coastguard Worker let timeoutExpired = false; 183*61c4878aSAndroid Build Coastguard Worker const timeoutWatcher = setTimeout(() => { 184*61c4878aSAndroid Build Coastguard Worker timeoutExpired = true; 185*61c4878aSAndroid Build Coastguard Worker reject(new RpcTimeout(this.rpc, timeoutMs)); 186*61c4878aSAndroid Build Coastguard Worker }, timeoutMs); 187*61c4878aSAndroid Build Coastguard Worker const response = await this.responseQueue.shift(); 188*61c4878aSAndroid Build Coastguard Worker if (timeoutExpired) { 189*61c4878aSAndroid Build Coastguard Worker this.responseQueue.unshift(response); 190*61c4878aSAndroid Build Coastguard Worker return; 191*61c4878aSAndroid Build Coastguard Worker } 192*61c4878aSAndroid Build Coastguard Worker clearTimeout(timeoutWatcher); 193*61c4878aSAndroid Build Coastguard Worker resolve(response); 194*61c4878aSAndroid Build Coastguard Worker }); 195*61c4878aSAndroid Build Coastguard Worker } 196*61c4878aSAndroid Build Coastguard Worker 197*61c4878aSAndroid Build Coastguard Worker /** 198*61c4878aSAndroid Build Coastguard Worker * Yields responses up the specified count as they are added. 199*61c4878aSAndroid Build Coastguard Worker * 200*61c4878aSAndroid Build Coastguard Worker * Throws an error as soon as it is received even if there are still 201*61c4878aSAndroid Build Coastguard Worker * responses in the queue. 202*61c4878aSAndroid Build Coastguard Worker * 203*61c4878aSAndroid Build Coastguard Worker * Usage 204*61c4878aSAndroid Build Coastguard Worker * ``` 205*61c4878aSAndroid Build Coastguard Worker * for await (const response of call.getResponses(5)) { 206*61c4878aSAndroid Build Coastguard Worker * console.log(response); 207*61c4878aSAndroid Build Coastguard Worker * } 208*61c4878aSAndroid Build Coastguard Worker * ``` 209*61c4878aSAndroid Build Coastguard Worker * 210*61c4878aSAndroid Build Coastguard Worker * @param {number} count The number of responses to read before returning. 211*61c4878aSAndroid Build Coastguard Worker * If no value is specified, getResponses will block until the stream 212*61c4878aSAndroid Build Coastguard Worker * either ends or hits an error. 213*61c4878aSAndroid Build Coastguard Worker * @param {number} timeout The number of milliseconds to wait for a response 214*61c4878aSAndroid Build Coastguard Worker * before throwing an error. 215*61c4878aSAndroid Build Coastguard Worker */ 216*61c4878aSAndroid Build Coastguard Worker async *getResponses( 217*61c4878aSAndroid Build Coastguard Worker count?: number, 218*61c4878aSAndroid Build Coastguard Worker timeoutMs?: number, 219*61c4878aSAndroid Build Coastguard Worker ): AsyncGenerator<Message> { 220*61c4878aSAndroid Build Coastguard Worker this.checkErrors(); 221*61c4878aSAndroid Build Coastguard Worker 222*61c4878aSAndroid Build Coastguard Worker if (this.completed && this.responseQueue.length == 0) { 223*61c4878aSAndroid Build Coastguard Worker return; 224*61c4878aSAndroid Build Coastguard Worker } 225*61c4878aSAndroid Build Coastguard Worker 226*61c4878aSAndroid Build Coastguard Worker let remaining = count ?? Number.POSITIVE_INFINITY; 227*61c4878aSAndroid Build Coastguard Worker while (remaining > 0) { 228*61c4878aSAndroid Build Coastguard Worker const response = 229*61c4878aSAndroid Build Coastguard Worker timeoutMs === undefined 230*61c4878aSAndroid Build Coastguard Worker ? await this.responseQueue.shift() 231*61c4878aSAndroid Build Coastguard Worker : await this.queuePopWithTimeout(timeoutMs!); 232*61c4878aSAndroid Build Coastguard Worker this.checkErrors(); 233*61c4878aSAndroid Build Coastguard Worker if (response === undefined) { 234*61c4878aSAndroid Build Coastguard Worker return; 235*61c4878aSAndroid Build Coastguard Worker } 236*61c4878aSAndroid Build Coastguard Worker yield response!; 237*61c4878aSAndroid Build Coastguard Worker remaining -= 1; 238*61c4878aSAndroid Build Coastguard Worker } 239*61c4878aSAndroid Build Coastguard Worker } 240*61c4878aSAndroid Build Coastguard Worker 241*61c4878aSAndroid Build Coastguard Worker cancel(): boolean { 242*61c4878aSAndroid Build Coastguard Worker if (this.completed) { 243*61c4878aSAndroid Build Coastguard Worker return false; 244*61c4878aSAndroid Build Coastguard Worker } 245*61c4878aSAndroid Build Coastguard Worker 246*61c4878aSAndroid Build Coastguard Worker this.error = Status.CANCELLED; 247*61c4878aSAndroid Build Coastguard Worker return this.rpcs.sendCancel(this.rpc, this.callId); 248*61c4878aSAndroid Build Coastguard Worker } 249*61c4878aSAndroid Build Coastguard Worker 250*61c4878aSAndroid Build Coastguard Worker private checkErrors(): void { 251*61c4878aSAndroid Build Coastguard Worker if (this.callbackException !== undefined) { 252*61c4878aSAndroid Build Coastguard Worker throw this.callbackException; 253*61c4878aSAndroid Build Coastguard Worker } 254*61c4878aSAndroid Build Coastguard Worker if (this.error !== undefined) { 255*61c4878aSAndroid Build Coastguard Worker throw new RpcError(this.rpc, this.error); 256*61c4878aSAndroid Build Coastguard Worker } 257*61c4878aSAndroid Build Coastguard Worker } 258*61c4878aSAndroid Build Coastguard Worker 259*61c4878aSAndroid Build Coastguard Worker protected async unaryWait(timeoutMs?: number): Promise<[Status, Message]> { 260*61c4878aSAndroid Build Coastguard Worker for await (const response of this.getResponses(1, timeoutMs)) { 261*61c4878aSAndroid Build Coastguard Worker // Do nothing. 262*61c4878aSAndroid Build Coastguard Worker } 263*61c4878aSAndroid Build Coastguard Worker if (this.status === undefined) { 264*61c4878aSAndroid Build Coastguard Worker throw Error('Unexpected undefined status at end of stream'); 265*61c4878aSAndroid Build Coastguard Worker } 266*61c4878aSAndroid Build Coastguard Worker if (this.responses.length !== 1) { 267*61c4878aSAndroid Build Coastguard Worker throw Error(`Unexpected number of responses: ${this.responses.length}`); 268*61c4878aSAndroid Build Coastguard Worker } 269*61c4878aSAndroid Build Coastguard Worker return [this.status!, this.responses.last()!]; 270*61c4878aSAndroid Build Coastguard Worker } 271*61c4878aSAndroid Build Coastguard Worker 272*61c4878aSAndroid Build Coastguard Worker protected async streamWait(timeoutMs?: number): Promise<[Status, Message[]]> { 273*61c4878aSAndroid Build Coastguard Worker for await (const response of this.getResponses(undefined, timeoutMs)) { 274*61c4878aSAndroid Build Coastguard Worker // Do nothing. 275*61c4878aSAndroid Build Coastguard Worker } 276*61c4878aSAndroid Build Coastguard Worker if (this.status === undefined) { 277*61c4878aSAndroid Build Coastguard Worker throw Error('Unexpected undefined status at end of stream'); 278*61c4878aSAndroid Build Coastguard Worker } 279*61c4878aSAndroid Build Coastguard Worker return [this.status!, this.responses.getAll()]; 280*61c4878aSAndroid Build Coastguard Worker } 281*61c4878aSAndroid Build Coastguard Worker 282*61c4878aSAndroid Build Coastguard Worker protected sendClientStream(request: Message) { 283*61c4878aSAndroid Build Coastguard Worker this.checkErrors(); 284*61c4878aSAndroid Build Coastguard Worker if (this.status !== undefined) { 285*61c4878aSAndroid Build Coastguard Worker throw new RpcError(this.rpc, Status.FAILED_PRECONDITION); 286*61c4878aSAndroid Build Coastguard Worker } 287*61c4878aSAndroid Build Coastguard Worker this.rpcs.sendClientStream(this.rpc, request, this.callId); 288*61c4878aSAndroid Build Coastguard Worker } 289*61c4878aSAndroid Build Coastguard Worker 290*61c4878aSAndroid Build Coastguard Worker protected finishClientStream(requests: Message[]) { 291*61c4878aSAndroid Build Coastguard Worker for (const request of requests) { 292*61c4878aSAndroid Build Coastguard Worker this.sendClientStream(request); 293*61c4878aSAndroid Build Coastguard Worker } 294*61c4878aSAndroid Build Coastguard Worker 295*61c4878aSAndroid Build Coastguard Worker if (!this.completed) { 296*61c4878aSAndroid Build Coastguard Worker this.rpcs.sendClientStreamEnd(this.rpc, this.callId); 297*61c4878aSAndroid Build Coastguard Worker } 298*61c4878aSAndroid Build Coastguard Worker } 299*61c4878aSAndroid Build Coastguard Worker} 300*61c4878aSAndroid Build Coastguard Worker 301*61c4878aSAndroid Build Coastguard Worker/** Tracks the state of a unary RPC call. */ 302*61c4878aSAndroid Build Coastguard Workerexport class UnaryCall extends Call { 303*61c4878aSAndroid Build Coastguard Worker /** Awaits the server response */ 304*61c4878aSAndroid Build Coastguard Worker async complete(timeoutMs?: number): Promise<[Status, Message]> { 305*61c4878aSAndroid Build Coastguard Worker return await this.unaryWait(timeoutMs); 306*61c4878aSAndroid Build Coastguard Worker } 307*61c4878aSAndroid Build Coastguard Worker} 308*61c4878aSAndroid Build Coastguard Worker 309*61c4878aSAndroid Build Coastguard Worker/** Tracks the state of a client streaming RPC call. */ 310*61c4878aSAndroid Build Coastguard Workerexport class ClientStreamingCall extends Call { 311*61c4878aSAndroid Build Coastguard Worker /** Gets the last server message, if it exists */ 312*61c4878aSAndroid Build Coastguard Worker get response(): Message | undefined { 313*61c4878aSAndroid Build Coastguard Worker return this.responses.last(); 314*61c4878aSAndroid Build Coastguard Worker } 315*61c4878aSAndroid Build Coastguard Worker 316*61c4878aSAndroid Build Coastguard Worker /** Sends a message from the client. */ 317*61c4878aSAndroid Build Coastguard Worker send(request: Message) { 318*61c4878aSAndroid Build Coastguard Worker this.sendClientStream(request); 319*61c4878aSAndroid Build Coastguard Worker } 320*61c4878aSAndroid Build Coastguard Worker 321*61c4878aSAndroid Build Coastguard Worker /** Ends the client stream and waits for the RPC to complete. */ 322*61c4878aSAndroid Build Coastguard Worker async finishAndWait( 323*61c4878aSAndroid Build Coastguard Worker requests: Message[] = [], 324*61c4878aSAndroid Build Coastguard Worker timeoutMs?: number, 325*61c4878aSAndroid Build Coastguard Worker ): Promise<[Status, Message]> { 326*61c4878aSAndroid Build Coastguard Worker this.finishClientStream(requests); 327*61c4878aSAndroid Build Coastguard Worker return await this.unaryWait(timeoutMs); 328*61c4878aSAndroid Build Coastguard Worker } 329*61c4878aSAndroid Build Coastguard Worker} 330*61c4878aSAndroid Build Coastguard Worker 331*61c4878aSAndroid Build Coastguard Worker/** Tracks the state of a server streaming RPC call. */ 332*61c4878aSAndroid Build Coastguard Workerexport class ServerStreamingCall extends Call { 333*61c4878aSAndroid Build Coastguard Worker complete(timeoutMs?: number): Promise<[Status, Message[]]> { 334*61c4878aSAndroid Build Coastguard Worker return this.streamWait(timeoutMs); 335*61c4878aSAndroid Build Coastguard Worker } 336*61c4878aSAndroid Build Coastguard Worker} 337*61c4878aSAndroid Build Coastguard Worker 338*61c4878aSAndroid Build Coastguard Worker/** Tracks the state of a bidirectional streaming RPC call. */ 339*61c4878aSAndroid Build Coastguard Workerexport class BidirectionalStreamingCall extends Call { 340*61c4878aSAndroid Build Coastguard Worker /** Sends a message from the client. */ 341*61c4878aSAndroid Build Coastguard Worker send(request: Message) { 342*61c4878aSAndroid Build Coastguard Worker this.sendClientStream(request); 343*61c4878aSAndroid Build Coastguard Worker } 344*61c4878aSAndroid Build Coastguard Worker 345*61c4878aSAndroid Build Coastguard Worker /** Ends the client stream and waits for the RPC to complete. */ 346*61c4878aSAndroid Build Coastguard Worker async finishAndWait( 347*61c4878aSAndroid Build Coastguard Worker requests: Array<Message> = [], 348*61c4878aSAndroid Build Coastguard Worker timeoutMs?: number, 349*61c4878aSAndroid Build Coastguard Worker ): Promise<[Status, Array<Message>]> { 350*61c4878aSAndroid Build Coastguard Worker this.finishClientStream(requests); 351*61c4878aSAndroid Build Coastguard Worker return await this.streamWait(timeoutMs); 352*61c4878aSAndroid Build Coastguard Worker } 353*61c4878aSAndroid Build Coastguard Worker} 354