1// Copyright 2022 The Pigweed Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may not 4// use this file except in compliance with the License. You may obtain a copy of 5// the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12// License for the specific language governing permissions and limitations under 13// the License. 14 15/** Client for the pw_transfer service, which transmits data over pw_rpc. */ 16 17import { 18 BidirectionalStreamingCall, 19 BidirectionalStreamingMethodStub, 20 ServiceClient, 21} from 'pigweedjs/pw_rpc'; 22import { Status } from 'pigweedjs/pw_status'; 23import { Chunk } from 'pigweedjs/protos/pw_transfer/transfer_pb'; 24 25import { 26 ReadTransfer, 27 ProgressCallback, 28 Transfer, 29 WriteTransfer, 30} from './transfer'; 31 32type TransferDict = { 33 [key: number]: Transfer; 34}; 35 36const DEFAULT_MAX_RETRIES = 3; 37const DEFAULT_RESPONSE_TIMEOUT_S = 2; 38const DEFAULT_INITIAL_RESPONSE_TIMEOUT = 4; 39 40/** 41 * A manager for transmitting data through an RPC TransferService. 42 * 43 * This should be initialized with an active Manager over an RPC channel. Only 44 * one instance of this class should exist for a configured RPC TransferService 45 * -- the Manager supports multiple simultaneous transfers. 46 * 47 * When created, a Manager starts a separate thread in which transfer 48 * communications and events are handled. 49 */ 50export class Manager { 51 // Ongoing transfers in the service by ID 52 readTransfers: TransferDict = {}; 53 writeTransfers: TransferDict = {}; 54 55 // RPC streams for read and write transfers. These are shareable by 56 // multiple transfers of the same type. 57 private readStream?: BidirectionalStreamingCall; 58 private writeStream?: BidirectionalStreamingCall; 59 60 /** 61 * Initializes a Manager on top of a TransferService. 62 * 63 * Args: 64 * @param{ServiceClient} service: the pw_rpc transfer service 65 * client 66 * @param{number} defaultResponseTimeoutS: max time to wait between receiving 67 * packets 68 * @param{number} initialResponseTimeoutS: timeout for the first packet; may 69 * be longer to account for transfer handler initialization 70 * @param{number} maxRetries: number of times to retry after a timeout 71 */ 72 constructor( 73 private service: ServiceClient, 74 private defaultResponseTimeoutS = DEFAULT_RESPONSE_TIMEOUT_S, 75 private initialResponseTimeoutS = DEFAULT_INITIAL_RESPONSE_TIMEOUT, 76 private maxRetries = DEFAULT_MAX_RETRIES, 77 ) {} 78 79 /** 80 * Receives ("downloads") data from the server. 81 * 82 * @throws Throws an error when the transfer fails to complete. 83 */ 84 async read( 85 resourceId: number, 86 progressCallback?: ProgressCallback, 87 ): Promise<Uint8Array> { 88 if (resourceId in this.readTransfers) { 89 throw new Error( 90 `Read transfer for resource ${resourceId} already exists`, 91 ); 92 } 93 const transfer = new ReadTransfer( 94 resourceId, 95 this.sendReadChunkCallback, 96 this.defaultResponseTimeoutS, 97 this.maxRetries, 98 progressCallback, 99 ); 100 101 this.startReadTransfer(transfer); 102 103 const status = await transfer.done; 104 105 delete this.readTransfers[transfer.id]; 106 if (status !== Status.OK) { 107 throw new TransferError(transfer.id, transfer.status); 108 } 109 return transfer.data; 110 } 111 112 /** Begins a new read transfer, opening the stream if it isn't. */ 113 startReadTransfer(transfer: Transfer): void { 114 this.readTransfers[transfer.id] = transfer; 115 116 if (this.readStream === undefined) { 117 this.openReadStream(); 118 } 119 console.debug(`Starting new read transfer ${transfer.id}`); 120 transfer.begin(); 121 } 122 123 /** 124 Transmits (uploads) data to the server. 125 * 126 * @param{number} resourceId: ID of the resource to which to write. 127 * @param{Uint8Array} data: Data to send to the server. 128 */ 129 async write( 130 resourceId: number, 131 data: Uint8Array, 132 progressCallback?: ProgressCallback, 133 ): Promise<void> { 134 const transfer = new WriteTransfer( 135 resourceId, 136 data, 137 this.sendWriteChunkCallback, 138 this.defaultResponseTimeoutS, 139 this.initialResponseTimeoutS, 140 this.maxRetries, 141 progressCallback, 142 ); 143 this.startWriteTransfer(transfer); 144 145 const status = await transfer.done; 146 147 delete this.writeTransfers[transfer.id]; 148 if (transfer.status !== Status.OK) { 149 throw new TransferError(transfer.id, transfer.status); 150 } 151 } 152 153 sendReadChunkCallback = (chunk: Chunk) => { 154 this.readStream!.send(chunk); 155 }; 156 157 sendWriteChunkCallback = (chunk: Chunk) => { 158 this.writeStream!.send(chunk); 159 }; 160 161 /** Begins a new write transfer, opening the stream if it isn't */ 162 startWriteTransfer(transfer: Transfer): void { 163 this.writeTransfers[transfer.id] = transfer; 164 165 if (!this.writeStream) { 166 this.openWriteStream(); 167 } 168 169 console.debug(`Starting new write transfer ${transfer.id}`); 170 transfer.begin(); 171 } 172 173 private openReadStream(): void { 174 const readRpc = this.service.method( 175 'Read', 176 )! as BidirectionalStreamingMethodStub; 177 this.readStream = readRpc.invoke( 178 (chunk: Chunk) => { 179 this.handleChunk(this.readTransfers, chunk); 180 }, 181 () => { 182 // Do nothing. 183 }, 184 this.onReadError, 185 ); 186 } 187 188 private openWriteStream(): void { 189 const writeRpc = this.service.method( 190 'Write', 191 )! as BidirectionalStreamingMethodStub; 192 this.writeStream = writeRpc.invoke( 193 (chunk: Chunk) => { 194 this.handleChunk(this.writeTransfers, chunk); 195 }, 196 () => { 197 // Do nothing. 198 }, 199 this.onWriteError, 200 ); 201 } 202 203 /** 204 * Callback for an RPC error in the read stream. 205 */ 206 private onReadError = (status: Status) => { 207 if (status === Status.FAILED_PRECONDITION) { 208 // FAILED_PRECONDITION indicates that the stream packet was not 209 // recognized as the stream is not open. This could occur if the 210 // server resets during an active transfer. Re-open the stream to 211 // allow pending transfers to continue. 212 this.openReadStream(); 213 return; 214 } 215 216 // Other errors are unrecoverable. Clear the stream and cancel any 217 // pending transfers with an INTERNAL status as this is a system 218 // error. 219 this.readStream = undefined; 220 221 for (const key in this.readTransfers) { 222 const transfer = this.readTransfers[key]; 223 transfer.abort(Status.INTERNAL); 224 } 225 this.readTransfers = {}; 226 console.error(`Read stream shut down ${Status[status]}`); 227 }; 228 229 private onWriteError = (status: Status) => { 230 if (status === Status.FAILED_PRECONDITION) { 231 // FAILED_PRECONDITION indicates that the stream packet was not 232 // recognized as the stream is not open. This could occur if the 233 // server resets during an active transfer. Re-open the stream to 234 // allow pending transfers to continue. 235 this.openWriteStream(); 236 } else { 237 // Other errors are unrecoverable. Clear the stream and cancel any 238 // pending transfers with an INTERNAL status as this is a system 239 // error. 240 this.writeStream = undefined; 241 242 for (const key in this.writeTransfers) { 243 const transfer = this.writeTransfers[key]; 244 transfer.abort(Status.INTERNAL); 245 } 246 this.writeTransfers = {}; 247 console.error(`Write stream shut down: ${Status[status]}`); 248 } 249 }; 250 251 /** 252 * Processes an incoming chunk from a stream. 253 * 254 * The chunk is dispatched to an active transfer based on its ID. If the 255 * transfer indicates that it is complete, the provided completion callback 256 * is invoked. 257 */ 258 private async handleChunk(transfers: TransferDict, chunk: Chunk) { 259 const transfer = transfers[chunk.getTransferId()]; 260 if (transfer === undefined) { 261 console.error( 262 `TransferManager received chunk for unknown transfer ${chunk.getTransferId()}`, 263 ); 264 return; 265 } 266 transfer.handleChunk(chunk); 267 } 268} 269 270/** 271 * Exception raised when a transfer fails. 272 * 273 * Stores the ID of the failed transfer and the error that occured. 274 */ 275class TransferError extends Error { 276 id: number; 277 status: Status; 278 279 constructor(id: number, status: Status) { 280 super(`Transfer ${id} failed with status ${Status[status]}`); 281 this.status = status; 282 this.id = id; 283 } 284} 285