// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

/** Client for the pw_transfer service, which transmits data over pw_rpc. */

import {
  BidirectionalStreamingCall,
  BidirectionalStreamingMethodStub,
  ServiceClient,
} from 'pigweedjs/pw_rpc';
import { Status } from 'pigweedjs/pw_status';
import { Chunk } from 'pigweedjs/protos/pw_transfer/transfer_pb';

import {
  ReadTransfer,
  ProgressCallback,
  Transfer,
  WriteTransfer,
} from './transfer';

/** Active transfers, keyed by transfer ID. */
type TransferDict = {
  [key: number]: Transfer;
};

const DEFAULT_MAX_RETRIES = 3;
const DEFAULT_RESPONSE_TIMEOUT_S = 2;
const DEFAULT_INITIAL_RESPONSE_TIMEOUT = 4;

/**
 * A manager for transmitting data through an RPC TransferService.
 *
 * This should be initialized with a ServiceClient for the transfer service
 * over an RPC channel. Only one instance of this class should exist for a
 * configured RPC TransferService -- the Manager supports multiple simultaneous
 * transfers.
 *
 * The Manager does not open any RPC stream when it is constructed. The read
 * and write streams are opened lazily when the first transfer of each type is
 * started, and incoming chunks and stream errors are handled in the callbacks
 * registered on those streams.
 */
export class Manager {
  // Ongoing transfers in the service by ID
  readTransfers: TransferDict = {};
  writeTransfers: TransferDict = {};

  // RPC streams for read and write transfers. These are shareable by
  // multiple transfers of the same type.
  private readStream?: BidirectionalStreamingCall;
  private writeStream?: BidirectionalStreamingCall;

  /**
   * Initializes a Manager on top of a TransferService.
   *
   * @param {ServiceClient} service the pw_rpc transfer service client
   * @param {number} defaultResponseTimeoutS max time to wait between receiving
   *   packets
   * @param {number} initialResponseTimeoutS timeout for the first packet; may
   *   be longer to account for transfer handler initialization
   * @param {number} maxRetries number of times to retry after a timeout
   */
  constructor(
    private service: ServiceClient,
    private defaultResponseTimeoutS = DEFAULT_RESPONSE_TIMEOUT_S,
    private initialResponseTimeoutS = DEFAULT_INITIAL_RESPONSE_TIMEOUT,
    private maxRetries = DEFAULT_MAX_RETRIES,
  ) {}

  /**
   * Receives ("downloads") data from the server.
   *
   * @param {number} resourceId ID of the resource to read.
   * @param {ProgressCallback} progressCallback optional callback handed to the
   *   underlying ReadTransfer.
   * @returns The data collected by the transfer.
   * @throws Throws an Error if a read transfer for `resourceId` is already
   *   registered, and a TransferError when the transfer fails to complete.
   */
  async read(
    resourceId: number,
    progressCallback?: ProgressCallback,
  ): Promise<Uint8Array> {
    if (resourceId in this.readTransfers) {
      throw new Error(
        `Read transfer for resource ${resourceId} already exists`,
      );
    }
    // NOTE(review): unlike WriteTransfer below, ReadTransfer is not given
    // initialResponseTimeoutS -- confirm against the ReadTransfer constructor.
    const transfer = new ReadTransfer(
      resourceId,
      this.sendReadChunkCallback,
      this.defaultResponseTimeoutS,
      this.maxRetries,
      progressCallback,
    );

    try {
      this.startReadTransfer(transfer);

      const status = await transfer.done;
      if (status !== Status.OK) {
        throw new TransferError(transfer.id, transfer.status);
      }
      return transfer.data;
    } finally {
      // Always unregister the transfer, including when starting it throws or
      // the completion promise rejects. Otherwise a stale entry would make
      // every later read of this resource fail with "already exists".
      delete this.readTransfers[transfer.id];
    }
  }

  /** Begins a new read transfer, opening the stream if it isn't. */
  startReadTransfer(transfer: Transfer): void {
    this.readTransfers[transfer.id] = transfer;

    if (this.readStream === undefined) {
      this.openReadStream();
    }
    console.debug(`Starting new read transfer ${transfer.id}`);
    transfer.begin();
  }

  /**
   * Transmits ("uploads") data to the server.
   *
   * @param {number} resourceId ID of the resource to which to write.
   * @param {Uint8Array} data Data to send to the server.
   * @param {ProgressCallback} progressCallback optional callback handed to the
   *   underlying WriteTransfer.
   * @throws Throws an Error if a write transfer for `resourceId` is already
   *   registered, and a TransferError when the transfer fails to complete.
   */
  async write(
    resourceId: number,
    data: Uint8Array,
    progressCallback?: ProgressCallback,
  ): Promise<void> {
    // Mirror the guard in read(): registering a second transfer under the same
    // ID would overwrite the first one's entry, so the first transfer would
    // stop receiving chunks.
    if (resourceId in this.writeTransfers) {
      throw new Error(
        `Write transfer for resource ${resourceId} already exists`,
      );
    }
    const transfer = new WriteTransfer(
      resourceId,
      data,
      this.sendWriteChunkCallback,
      this.defaultResponseTimeoutS,
      this.initialResponseTimeoutS,
      this.maxRetries,
      progressCallback,
    );

    try {
      this.startWriteTransfer(transfer);

      await transfer.done;
      if (transfer.status !== Status.OK) {
        throw new TransferError(transfer.id, transfer.status);
      }
    } finally {
      // Always unregister the transfer, even if starting it threw.
      delete this.writeTransfers[transfer.id];
    }
  }

  /** Sends a chunk from a read transfer over the shared read stream. */
  sendReadChunkCallback = (chunk: Chunk) => {
    this.readStream!.send(chunk);
  };

  /** Sends a chunk from a write transfer over the shared write stream. */
  sendWriteChunkCallback = (chunk: Chunk) => {
    this.writeStream!.send(chunk);
  };

  /** Begins a new write transfer, opening the stream if it isn't. */
  startWriteTransfer(transfer: Transfer): void {
    this.writeTransfers[transfer.id] = transfer;

    if (!this.writeStream) {
      this.openWriteStream();
    }

    console.debug(`Starting new write transfer ${transfer.id}`);
    transfer.begin();
  }

  /**
   * Invokes the service's 'Read' method and stores the resulting stream.
   * Incoming chunks are dispatched to the registered read transfers.
   */
  private openReadStream(): void {
    const readRpc = this.service.method(
      'Read',
    )! as BidirectionalStreamingMethodStub;
    this.readStream = readRpc.invoke(
      (chunk: Chunk) => {
        this.handleChunk(this.readTransfers, chunk);
      },
      () => {
        // Do nothing.
      },
      this.onReadError,
    );
  }

  /**
   * Invokes the service's 'Write' method and stores the resulting stream.
   * Incoming chunks are dispatched to the registered write transfers.
   */
  private openWriteStream(): void {
    const writeRpc = this.service.method(
      'Write',
    )! as BidirectionalStreamingMethodStub;
    this.writeStream = writeRpc.invoke(
      (chunk: Chunk) => {
        this.handleChunk(this.writeTransfers, chunk);
      },
      () => {
        // Do nothing.
      },
      this.onWriteError,
    );
  }

  /**
   * Callback for an RPC error in the read stream.
   */
  private onReadError = (status: Status) => {
    if (status === Status.FAILED_PRECONDITION) {
      // FAILED_PRECONDITION indicates that the stream packet was not
      // recognized as the stream is not open. This could occur if the
      // server resets during an active transfer. Re-open the stream to
      // allow pending transfers to continue.
      this.openReadStream();
      return;
    }

    // Other errors are unrecoverable. Clear the stream and cancel any
    // pending transfers with an INTERNAL status as this is a system
    // error.
    this.readStream = undefined;

    for (const key in this.readTransfers) {
      const transfer = this.readTransfers[key];
      transfer.abort(Status.INTERNAL);
    }
    this.readTransfers = {};
    console.error(`Read stream shut down ${Status[status]}`);
  };

  /**
   * Callback for an RPC error in the write stream.
   */
  private onWriteError = (status: Status) => {
    if (status === Status.FAILED_PRECONDITION) {
      // FAILED_PRECONDITION indicates that the stream packet was not
      // recognized as the stream is not open. This could occur if the
      // server resets during an active transfer. Re-open the stream to
      // allow pending transfers to continue.
      this.openWriteStream();
    } else {
      // Other errors are unrecoverable. Clear the stream and cancel any
      // pending transfers with an INTERNAL status as this is a system
      // error.
      this.writeStream = undefined;

      for (const key in this.writeTransfers) {
        const transfer = this.writeTransfers[key];
        transfer.abort(Status.INTERNAL);
      }
      this.writeTransfers = {};
      console.error(`Write stream shut down: ${Status[status]}`);
    }
  };

  /**
   * Processes an incoming chunk from a stream.
   *
   * The chunk is dispatched to the active transfer whose ID matches the
   * chunk's transfer ID. A chunk for an ID with no registered transfer is
   * logged and dropped.
   */
  private async handleChunk(transfers: TransferDict, chunk: Chunk) {
    const transfer = transfers[chunk.getTransferId()];
    if (transfer === undefined) {
      console.error(
        `TransferManager received chunk for unknown transfer ${chunk.getTransferId()}`,
      );
      return;
    }
    transfer.handleChunk(chunk);
  }
}

/**
 * Exception raised when a transfer fails.
 *
 * Stores the ID of the failed transfer and the error that occurred.
 */
class TransferError extends Error {
  id: number;
  status: Status;

  constructor(id: number, status: Status) {
    super(`Transfer ${id} failed with status ${Status[status]}`);
    this.status = status;
    this.id = id;
  }
}