// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

import { Status } from 'pigweedjs/pw_status';
import { Chunk } from 'pigweedjs/protos/pw_transfer/transfer_pb';

/** Immutable snapshot of how far a transfer has progressed. */
export class ProgressStats {
  /**
   * @param bytesSent Number of bytes sent so far.
   * @param bytesConfirmedReceived Number of bytes the receiving side has
   *     confirmed.
   * @param totalSizeBytes Total size of the transfer, if known.
   */
  constructor(
    readonly bytesSent: number,
    readonly bytesConfirmedReceived: number,
    readonly totalSizeBytes?: number,
  ) {}

  /**
   * Percentage of the total size that has been confirmed received.
   *
   * Returns NaN when the total size is unknown. The value is a plain
   * floating-point division, so a total size of 0 yields NaN or Infinity.
   */
  get percentReceived(): number {
    if (this.totalSizeBytes === undefined) {
      return NaN;
    }
    return (this.bytesConfirmedReceived / this.totalSizeBytes) * 100;
  }

  /** Formats the stats as a single human-readable line. */
  toString(): string {
    const total =
      this.totalSizeBytes === undefined
        ? 'undefined'
        : this.totalSizeBytes.toString();
    const percent = this.percentReceived.toFixed(2);
    return (
      `${percent}% (${this.bytesSent} B sent, ` +
      `${this.bytesConfirmedReceived} B received of ${total} B)`
    );
  }
}

/** Callback invoked with a fresh ProgressStats whenever progress is reported. */
export type ProgressCallback = (stats: ProgressStats) => void;

/** A Timer which invokes a callback after a certain timeout. */
class Timer {
  private task?: ReturnType<typeof setTimeout>;

  /**
   * @param timeoutS Timeout in seconds (converted to milliseconds in start()).
   * @param callback Function invoked when the timeout elapses.
   */
  constructor(
    readonly timeoutS: number,
    private readonly callback: () => void,
  ) {}

  /**
   * Starts a new timer.
   *
   * If a timer is already running, it is stopped and a new timer started.
   * This can be used to implement watchdog-like behavior, where a callback
   * is invoked after some time without a kick.
   */
  start(): void {
    this.stop();
    this.task = setTimeout(this.callback, this.timeoutS * 1000);
  }

  /** Terminates a running timer. Does nothing if no timer is running. */
  stop(): void {
    if (this.task !== undefined) {
      clearTimeout(this.task);
      this.task = undefined;
    }
  }
}

/**
 * A client-side data transfer through a Manager.
 *
 * Subclasses are responsible for implementing all of the logic for their type
 * of transfer, receiving messages from the server and sending the appropriate
 * messages in response.
 */
export abstract class Transfer {
  /** Final status of the transfer; set by finish(). */
  status: Status = Status.OK;
  /** Resolves with the final status once finish() has been called. */
  done: Promise<Status>;
  /** Data received so far (read) or the data to send (write). */
  data: Uint8Array = new Uint8Array(0);

  private retries = 0;
  private responseTimer?: Timer;
  private resolve?: (value: Status | PromiseLike<Status>) => void;

  /**
   * @param id Transfer ID placed on every chunk sent by this transfer.
   * @param sendChunk Function used to send a chunk to the server.
   * @param responseTimeoutS Seconds to wait for a chunk before retrying.
   * @param maxRetries Number of consecutive timeouts tolerated before the
   *     transfer finishes with DEADLINE_EXCEEDED.
   * @param progressCallback Optional callback invoked on progress updates.
   */
  constructor(
    public id: number,
    protected sendChunk: (chunk: Chunk) => void,
    responseTimeoutS: number,
    private maxRetries: number,
    private progressCallback?: ProgressCallback,
  ) {
    this.responseTimer = new Timer(responseTimeoutS, this.onTimeout);
    this.done = new Promise<Status>((resolve) => {
      this.resolve = resolve;
    });
  }

  /** Returns the initial chunk to notify the server of the transfer. */
  protected abstract get initialChunk(): Chunk;

  /** Handles a chunk that contains or requests data. */
  protected abstract handleDataChunk(chunk: Chunk): void;

  /** Retries after a timeout occurs. */
  protected abstract retryAfterTimeout(): void;

  /**
   * Handles a timeout while waiting for a chunk.
   *
   * Declared as an arrow-function property so that `this` stays bound when the
   * function is handed to the Timer as a bare callback.
   */
  private onTimeout = (): void => {
    this.retries += 1;
    if (this.retries > this.maxRetries) {
      this.finish(Status.DEADLINE_EXCEEDED);
      return;
    }

    console.debug(
      `Received no responses for ${this.responseTimer?.timeoutS}; retrying ${this.retries}/${this.maxRetries}`,
    );

    this.retryAfterTimeout();
    this.responseTimer?.start();
  };

  /** Sends the initial chunk of the transfer and starts the response timer. */
  begin(): void {
    this.sendChunk(this.initialChunk);
    this.responseTimer?.start();
  }

  /**
   * Ends the transfer with the specified status.
   *
   * Stops and discards the response timer, reports 100% progress when the
   * status is OK, and resolves the `done` promise.
   */
  protected finish(status: Status): void {
    this.responseTimer?.stop();
    this.responseTimer = undefined;
    this.status = status;

    if (status === Status.OK) {
      const totalSize = this.data.length;
      this.updateProgress(totalSize, totalSize, totalSize);
    }

    this.resolve?.(this.status);
  }

  /** Ends the transfer without sending a completion chunk. */
  abort(status: Status): void {
    this.finish(status);
  }

  /** Ends the transfer and sends a completion chunk. */
  terminate(status: Status): void {
    const chunk = new Chunk();
    chunk.setStatus(status);
    chunk.setTransferId(this.id);
    chunk.setType(Chunk.Type.COMPLETION);
    this.sendChunk(chunk);
    this.abort(status);
  }

  /** Invokes the provided progress callback, if any, with the progress. */
  updateProgress(
    bytesSent: number,
    bytesConfirmedReceived: number,
    totalSizeBytes?: number,
  ): void {
    const stats = new ProgressStats(
      bytesSent,
      bytesConfirmedReceived,
      totalSizeBytes,
    );
    console.debug(`Transfer ${this.id} progress: ${stats}`);

    if (this.progressCallback !== undefined) {
      this.progressCallback(stats);
    }
  }

  /**
   * Processes an incoming chunk from the server.
   *
   * Handles terminating chunks (i.e. those with a status) and forwards
   * non-terminating chunks to handleDataChunk.
   */
  handleChunk(chunk: Chunk): void {
    this.responseTimer?.stop();
    this.retries = 0; // Received data from service, so reset the retries.

    console.debug(`Received chunk:(${chunk})`);

    // Status chunks are only used to terminate a transfer. They do not
    // contain any data that requires processing.
    if (chunk.hasStatus()) {
      this.finish(chunk.getStatus());
      return;
    }

    this.handleDataChunk(chunk);

    // Start the timeout for the server to send a chunk in response.
    this.responseTimer?.start();
  }
}

/**
 * A client <= server read transfer.
 *
 * Although typescript can effectively handle an unlimited transfer window, this
 * client sets a conservative window and chunk size to avoid overloading the
 * device. These are configurable in the constructor.
 */
export class ReadTransfer extends Transfer {
  private maxBytesToReceive: number;
  private maxChunkSize: number;
  private chunkDelayMicroS?: number; // Microseconds
  private remainingTransferSize?: number;
  private offset = 0;
  private pendingBytes: number;
  private windowEndOffset: number;

  // The fractional position within a window at which a receive transfer should
  // extend its window size to minimize the amount of time the transmitter
  // spends blocked.
  //
  // For example, a divisor of 2 will extend the window when half of the
  // requested data has been received, a divisor of three will extend at a third
  // of the window, and so on.
  private static readonly EXTEND_WINDOW_DIVISOR = 2;

  /**
   * @param id Transfer ID.
   * @param sendChunk Function used to send a chunk to the server.
   * @param responseTimeoutS Seconds to wait for a chunk before retrying.
   * @param maxRetries Number of consecutive timeouts tolerated.
   * @param progressCallback Optional callback invoked on progress updates.
   * @param maxBytesToReceive Size of the receive window requested from the
   *     server.
   * @param maxChunkSize Maximum chunk size requested from the server.
   * @param chunkDelayMicroS Optional minimum delay between chunks, in
   *     microseconds, requested from the server.
   */
  constructor(
    id: number,
    sendChunk: (chunk: Chunk) => void,
    responseTimeoutS: number,
    maxRetries: number,
    progressCallback?: ProgressCallback,
    maxBytesToReceive = 8192,
    maxChunkSize = 1024,
    chunkDelayMicroS?: number,
  ) {
    super(id, sendChunk, responseTimeoutS, maxRetries, progressCallback);
    this.maxBytesToReceive = maxBytesToReceive;
    this.maxChunkSize = maxChunkSize;
    this.chunkDelayMicroS = chunkDelayMicroS;
    this.pendingBytes = maxBytesToReceive;
    this.windowEndOffset = maxBytesToReceive;
  }

  /** The START chunk carrying the initial transfer parameters. */
  protected get initialChunk(): Chunk {
    return this.transferParameters(Chunk.Type.START);
  }

  /**
   * Builds an updated transfer parameters chunk to send the server.
   *
   * @param type Chunk type to set on the chunk (a `Chunk.Type` value; kept
   *     loosely typed as in the original signature).
   * @param update When true, the pending byte count and window end offset are
   *     reset to a full window starting at the current offset before the chunk
   *     is built. When false, the current values are sent unchanged.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private transferParameters(type: any, update = true): Chunk {
    if (update) {
      this.pendingBytes = this.maxBytesToReceive;
      this.windowEndOffset = this.offset + this.maxBytesToReceive;
    }

    const chunk = new Chunk();
    chunk.setTransferId(this.id);
    chunk.setPendingBytes(this.pendingBytes);
    chunk.setMaxChunkSizeBytes(this.maxChunkSize);
    chunk.setOffset(this.offset);
    chunk.setWindowEndOffset(this.windowEndOffset);
    chunk.setType(type);

    // Only request a delay when one was actually configured. The delay is
    // optional, so it must be checked for undefined as well as zero.
    if (this.chunkDelayMicroS !== undefined && this.chunkDelayMicroS !== 0) {
      chunk.setMinDelayMicroseconds(this.chunkDelayMicroS);
    }
    return chunk;
  }

  /**
   * Processes an incoming chunk from the server.
   *
   * In a read transfer, the client receives data chunks from the server.
   * Once all pending data is received, the transfer parameters are updated.
   */
  protected handleDataChunk(chunk: Chunk): void {
    const chunkData = chunk.getData() as Uint8Array;
    const chunkOffset = chunk.getOffset();

    if (chunkOffset !== this.offset) {
      if (chunkOffset + chunkData.length <= this.offset) {
        // If the chunk's data has already been received, don't go through a full
        // recovery cycle to avoid shrinking the window size and potentially
        // thrashing. The expected data may already be in-flight, so just allow
        // the transmitter to keep going with a CONTINUE parameters chunk.
        this.sendChunk(
          this.transferParameters(
            Chunk.Type.PARAMETERS_CONTINUE,
            /*update=*/ false,
          ),
        );
      } else {
        // Initially, the transfer service only supports in-order transfers.
        // If data is received out of order, request that the server
        // retransmit from the previous offset.
        this.sendChunk(
          this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT),
        );
      }
      return;
    }

    // Append the new data to everything received so far.
    const oldData = this.data;
    this.data = new Uint8Array(chunkData.length + oldData.length);
    this.data.set(oldData);
    this.data.set(chunkData, oldData.length);

    this.pendingBytes -= chunkData.length;
    this.offset += chunkData.length;

    if (chunk.hasRemainingBytes()) {
      if (chunk.getRemainingBytes() === 0) {
        // No more data to read. Acknowledge receipt and finish.
        this.terminate(Status.OK);
        return;
      }

      this.remainingTransferSize = chunk.getRemainingBytes();
    } else if (this.remainingTransferSize !== undefined) {
      // Update the remaining transfer size, if it is known.
      this.remainingTransferSize -= chunkData.length;

      if (this.remainingTransferSize <= 0) {
        this.remainingTransferSize = undefined;
      }
    }

    const chunkWindowEndOffset = chunk.getWindowEndOffset();
    if (chunkWindowEndOffset !== 0) {
      if (chunkWindowEndOffset < this.offset) {
        console.error(
          `Transfer ${this.id}: transmitter sent invalid earlier end offset ${chunkWindowEndOffset} (receiver offset ${this.offset})`,
        );
        this.terminate(Status.INTERNAL);
        return;
      }

      // The transmitter may not move the window end past what the receiver
      // granted. (This check previously repeated the "earlier" condition above
      // and therefore could never fire.)
      if (chunkWindowEndOffset > this.windowEndOffset) {
        console.error(
          `Transfer ${this.id}: transmitter sent invalid later end offset ${chunkWindowEndOffset} (receiver end offset ${this.windowEndOffset})`,
        );
        this.terminate(Status.INTERNAL);
        return;
      }

      this.windowEndOffset = chunkWindowEndOffset;
      this.pendingBytes -= chunkWindowEndOffset - this.offset;
    }

    const remainingWindowSize = this.windowEndOffset - this.offset;
    const extendWindow =
      remainingWindowSize <=
      this.maxBytesToReceive / ReadTransfer.EXTEND_WINDOW_DIVISOR;

    const totalSize =
      this.remainingTransferSize === undefined
        ? undefined
        : this.remainingTransferSize + this.offset;
    this.updateProgress(this.offset, this.offset, totalSize);

    if (this.pendingBytes === 0) {
      // All pending data was received. Send out a new parameters chunk
      // for the next block.
      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
    } else if (extendWindow) {
      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_CONTINUE));
    }
  }

  /** Re-requests data from the current offset with a fresh full window. */
  protected retryAfterTimeout(): void {
    this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
  }
}

/**
 * A client => server write transfer.
 */
export class WriteTransfer extends Transfer {
  // Incremented for every parameters chunk handled; not read anywhere in this
  // file.
  private windowId = 0;
  /** Offset of the next byte of `data` to send. */
  offset = 0;
  /**
   * Maximum chunk size requested by the server. Stays 0 until the server sends
   * one; 0 is treated as "no limit" when building chunks.
   */
  maxChunkSize = 0;
  /** Minimum inter-chunk delay reported by the server; stored but not used here. */
  chunkDelayMicroS?: number;
  /** Offset up to which the server has allowed data to be sent. */
  windowEndOffset = 0;
  /** Most recently sent chunk; resent when a response times out. */
  lastChunk: Chunk;

  /**
   * @param id Transfer ID (also used as the resource ID of the START chunk).
   * @param data The bytes to write to the server.
   * @param sendChunk Function used to send a chunk to the server.
   * @param responseTimeoutS Seconds to wait for a chunk before retrying.
   * @param initialResponseTimeoutS Accepted for interface compatibility; not
   *     used by this implementation.
   * @param maxRetries Number of consecutive timeouts tolerated.
   * @param progressCallback Optional callback invoked on progress updates.
   */
  constructor(
    id: number,
    data: Uint8Array,
    sendChunk: (chunk: Chunk) => void,
    responseTimeoutS: number,
    initialResponseTimeoutS: number,
    maxRetries: number,
    progressCallback?: ProgressCallback,
  ) {
    super(id, sendChunk, responseTimeoutS, maxRetries, progressCallback);
    this.data = data;
    this.lastChunk = this.initialChunk;
  }

  /** The START chunk announcing this write transfer to the server. */
  protected get initialChunk(): Chunk {
    // TODO(frolv): The session ID should not be set here but assigned by the
    // server during an initial handshake.
    const chunk = new Chunk();
    chunk.setTransferId(this.id);
    chunk.setResourceId(this.id);
    chunk.setType(Chunk.Type.START);
    return chunk;
  }

  /**
   * Processes an incoming chunk from the server.
   *
   * In a write transfer, the server only sends transfer parameter updates
   * to the client. When a message is received, update local parameters and
   * send data accordingly.
   */
  protected handleDataChunk(chunk: Chunk): void {
    this.windowId += 1;

    if (!this.handleParametersUpdate(chunk)) {
      return;
    }

    const bytesAcknowledged = chunk.getOffset();

    // Send chunks until the window granted by the server is filled. At least
    // one chunk is always sent, even if the window is already full.
    let writeChunk: Chunk;
    do {
      writeChunk = this.nextChunk();
      const bytesInChunk = writeChunk.getData().length;
      this.offset += bytesInChunk;

      this.updateProgress(this.offset, bytesAcknowledged, this.data.length);
      this.sendChunk(writeChunk);

      if (bytesInChunk === 0) {
        // No forward progress is possible (e.g. the window ends at or before
        // the current offset); stop rather than spin forever.
        break;
      }
    } while (this.offset < this.windowEndOffset);

    this.lastChunk = writeChunk;
  }

  /**
   * Updates transfer state based on a transfer parameters update.
   *
   * @returns false if the update was invalid and the transfer was terminated,
   *     true otherwise.
   */
  private handleParametersUpdate(chunk: Chunk): boolean {
    // Chunks without a type are treated as retransmit requests.
    let retransmit = true;
    if (chunk.hasType()) {
      retransmit = chunk.getType() === Chunk.Type.PARAMETERS_RETRANSMIT;
    }

    if (chunk.getOffset() > this.data.length) {
      // Bad offset; terminate the transfer.
      console.error(
        `Transfer ${this.id}: server requested invalid offset ${chunk.getOffset()} (size ${this.data.length})`,
      );

      this.terminate(Status.OUT_OF_RANGE);
      return false;
    }

    if (chunk.getPendingBytes() === 0) {
      console.error(
        `Transfer ${this.id}: service requested 0 bytes (invalid); aborting`,
      );
      this.terminate(Status.INTERNAL);
      return false;
    }

    if (retransmit) {
      // Check whether the client has sent a previous data offset, which
      // indicates that some chunks were lost in transmission.
      if (chunk.getOffset() < this.offset) {
        console.debug(
          `Write transfer ${this.id} rolling back to offset ${chunk.getOffset()} from ${this.offset}`,
        );
      }

      this.offset = chunk.getOffset();

      // Retransmit is the default behavior for older versions of the
      // transfer protocol. The window_end_offset field is not guaranteed
      // to be set in these versions, so it must be calculated.
      const maxBytesToSend = Math.min(
        chunk.getPendingBytes(),
        this.data.length - this.offset,
      );
      this.windowEndOffset = this.offset + maxBytesToSend;
    } else {
      // Extend the window to the new end offset specified by the server.
      this.windowEndOffset = Math.min(
        chunk.getWindowEndOffset(),
        this.data.length,
      );
    }

    if (chunk.hasMaxChunkSizeBytes()) {
      this.maxChunkSize = chunk.getMaxChunkSizeBytes();
    }

    if (chunk.hasMinDelayMicroseconds()) {
      this.chunkDelayMicroS = chunk.getMinDelayMicroseconds();
    }
    return true;
  }

  /** Returns the next Chunk message to send in the data transfer. */
  private nextChunk(): Chunk {
    const chunk = new Chunk();
    chunk.setTransferId(this.id);
    chunk.setOffset(this.offset);
    chunk.setType(Chunk.Type.DATA);

    // Never compute a negative size if the window ends before the offset.
    const remainingInWindow = Math.max(this.windowEndOffset - this.offset, 0);
    // A maximum chunk size of 0 means the server never specified one; in that
    // case the only limit is the window itself. Treating 0 as a real limit
    // would produce empty chunks forever.
    const maxBytesInChunk =
      this.maxChunkSize > 0
        ? Math.min(this.maxChunkSize, remainingInWindow)
        : remainingInWindow;

    chunk.setData(this.data.slice(this.offset, this.offset + maxBytesInChunk));

    // Mark the final chunk of the transfer.
    if (this.data.length - this.offset <= maxBytesInChunk) {
      chunk.setRemainingBytes(0);
    }
    return chunk;
  }

  /** Resends the most recently sent chunk. */
  protected retryAfterTimeout(): void {
    this.sendChunk(this.lastChunk);
  }
}