xref: /aosp_15_r20/external/pigweed/pw_transfer/ts/transfer.ts (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1// Copyright 2022 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15import { Status } from 'pigweedjs/pw_status';
16import { Chunk } from 'pigweedjs/protos/pw_transfer/transfer_pb';
17
18export class ProgressStats {
19  constructor(
20    readonly bytesSent: number,
21    readonly bytesConfirmedReceived: number,
22    readonly totalSizeBytes?: number,
23  ) {}
24
25  get percentReceived(): number {
26    if (this.totalSizeBytes === undefined) {
27      return NaN;
28    }
29    return (this.bytesConfirmedReceived / this.totalSizeBytes) * 100;
30  }
31
32  toString(): string {
33    const total =
34      this.totalSizeBytes === undefined
35        ? 'undefined'
36        : this.totalSizeBytes.toString();
37    const percent = this.percentReceived.toFixed(2);
38    return (
39      `${percent}% (${this.bytesSent} B sent, ` +
40      `${this.bytesConfirmedReceived} B received of ${total} B)`
41    );
42  }
43}
44
45export type ProgressCallback = (stats: ProgressStats) => void;
46
47/** A Timer which invokes a callback after a certain timeout. */
48class Timer {
49  private task?: ReturnType<typeof setTimeout>;
50
51  constructor(
52    readonly timeoutS: number,
53    private readonly callback: () => any,
54  ) {}
55
56  /**
57   * Starts a new timer.
58   *
59   * If a timer is already running, it is stopped and a new timer started.
60   * This can be used to implement watchdog-like behavior, where a callback
61   * is invoked after some time without a kick.
62   */
63  start() {
64    this.stop();
65    this.task = setTimeout(this.callback, this.timeoutS * 1000);
66  }
67
68  /** Terminates a running timer. */
69  stop() {
70    if (this.task !== undefined) {
71      clearTimeout(this.task);
72      this.task = undefined;
73    }
74  }
75}
76
77/**
78 * A client-side data transfer through a Manager.
79 *
80 * Subclasses are responsible for implementing all of the logic for their type
81 * of transfer, receiving messages from the server and sending the appropriate
82 * messages in response.
83 */
84export abstract class Transfer {
85  status: Status = Status.OK;
86  done: Promise<Status>;
87  data = new Uint8Array(0);
88
89  private retries = 0;
90  private responseTimer?: Timer;
91  private resolve?: (value: Status | PromiseLike<Status>) => void;
92
93  constructor(
94    public id: number,
95    protected sendChunk: (chunk: Chunk) => void,
96    responseTimeoutS: number,
97    private maxRetries: number,
98    private progressCallback?: ProgressCallback,
99  ) {
100    this.responseTimer = new Timer(responseTimeoutS, this.onTimeout);
101    this.done = new Promise<Status>((resolve) => {
102      this.resolve = resolve!;
103    });
104  }
105
106  /** Returns the initial chunk to notify the server of the transfer. */
107  protected abstract get initialChunk(): Chunk;
108
109  /** Handles a chunk that contains or requests data. */
110  protected abstract handleDataChunk(chunk: Chunk): void;
111
112  /** Retries after a timeout occurs. */
113  protected abstract retryAfterTimeout(): void;
114
115  /** Handles a timeout while waiting for a chunk. */
116  private onTimeout = () => {
117    this.retries += 1;
118    if (this.retries > this.maxRetries) {
119      this.finish(Status.DEADLINE_EXCEEDED);
120      return;
121    }
122
123    console.debug(
124      `Received no responses for ${this.responseTimer?.timeoutS}; retrying ${this.retries}/${this.maxRetries}`,
125    );
126
127    this.retryAfterTimeout();
128    this.responseTimer?.start();
129  };
130
131  /** Sends the initial chunk of the transfer. */
132  begin(): void {
133    this.sendChunk(this.initialChunk as any);
134    this.responseTimer?.start();
135  }
136
137  /** Ends the transfer with the specified status */
138  protected finish(status: Status): void {
139    this.responseTimer?.stop();
140    this.responseTimer = undefined;
141    this.status = status;
142
143    if (status === Status.OK) {
144      const totalSize = this.data.length;
145      this.updateProgress(totalSize, totalSize, totalSize);
146    }
147
148    this.resolve!(this.status);
149  }
150
151  /** Ends the transfer without sending a completion chunk */
152  abort(status: Status): void {
153    this.finish(status);
154  }
155
156  /** Ends the transfer and sends a completion chunk */
157  terminate(status: Status): void {
158    const chunk = new Chunk();
159    chunk.setStatus(status);
160    chunk.setTransferId(this.id);
161    chunk.setType(Chunk.Type.COMPLETION);
162    this.sendChunk(chunk);
163    this.abort(status);
164  }
165
166  /** Invokes the provided progress callback, if any, with the progress */
167  updateProgress(
168    bytesSent: number,
169    bytesConfirmedReceived: number,
170    totalSizeBytes?: number,
171  ): void {
172    const stats = new ProgressStats(
173      bytesSent,
174      bytesConfirmedReceived,
175      totalSizeBytes,
176    );
177    console.debug(`Transfer ${this.id} progress: ${stats}`);
178
179    if (this.progressCallback !== undefined) {
180      this.progressCallback(stats);
181    }
182  }
183
184  /**
185   *  Processes an incoming chunk from the server.
186   *
187   *  Handles terminating chunks (i.e. those with a status) and forwards
188   *  non-terminating chunks to handle_data_chunk.
189   */
190  handleChunk(chunk: Chunk): void {
191    this.responseTimer?.stop();
192    this.retries = 0; // Received data from service, so reset the retries.
193
194    console.debug(`Received chunk:(${chunk})`);
195
196    // Status chunks are only used to terminate a transfer. They do not
197    // contain any data that requires processing.
198    if (chunk.hasStatus()) {
199      this.finish(chunk.getStatus());
200      return;
201    }
202
203    this.handleDataChunk(chunk);
204
205    // Start the timeout for the server to send a chunk in response.
206    this.responseTimer?.start();
207  }
208}
209
210/**
211 * A client <= server read transfer.
212 *
213 * Although typescript can effectively handle an unlimited transfer window, this
214 * client sets a conservative window and chunk size to avoid overloading the
215 * device. These are configurable in the constructor.
216 */
217export class ReadTransfer extends Transfer {
218  private maxBytesToReceive: number;
219  private maxChunkSize: number;
220  private chunkDelayMicroS?: number; // Microseconds
221  private remainingTransferSize?: number;
222  private offset = 0;
223  private pendingBytes: number;
224  private windowEndOffset: number;
225
226  // The fractional position within a window at which a receive transfer should
227  // extend its window size to minimize the amount of time the transmitter
228  // spends blocked.
229  //
230  // For example, a divisor of 2 will extend the window when half of the
231  // requested data has been received, a divisor of three will extend at a third
232  // of the window, and so on.
233  private static EXTEND_WINDOW_DIVISOR = 2;
234
235  constructor(
236    id: number,
237    sendChunk: (chunk: Chunk) => void,
238    responseTimeoutS: number,
239    maxRetries: number,
240    progressCallback?: ProgressCallback,
241    maxBytesToReceive = 8192,
242    maxChunkSize = 1024,
243    chunkDelayMicroS?: number,
244  ) {
245    super(id, sendChunk, responseTimeoutS, maxRetries, progressCallback);
246    this.maxBytesToReceive = maxBytesToReceive;
247    this.maxChunkSize = maxChunkSize;
248    this.chunkDelayMicroS = chunkDelayMicroS;
249    this.pendingBytes = maxBytesToReceive;
250    this.windowEndOffset = maxBytesToReceive;
251  }
252
253  protected get initialChunk(): any {
254    return this.transferParameters(Chunk.Type.START);
255  }
256
257  /** Builds an updated transfer parameters chunk to send the server. */
258  private transferParameters(type: any, update = true): Chunk {
259    if (update) {
260      this.pendingBytes = this.maxBytesToReceive;
261      this.windowEndOffset = this.offset + this.maxBytesToReceive;
262    }
263
264    const chunk = new Chunk();
265    chunk.setTransferId(this.id);
266    chunk.setPendingBytes(this.pendingBytes);
267    chunk.setMaxChunkSizeBytes(this.maxChunkSize);
268    chunk.setOffset(this.offset);
269    chunk.setWindowEndOffset(this.windowEndOffset);
270    chunk.setType(type);
271
272    if (this.chunkDelayMicroS !== 0) {
273      chunk.setMinDelayMicroseconds(this.chunkDelayMicroS!);
274    }
275    return chunk;
276  }
277
278  /**
279   * Processes an incoming chunk from the server.
280   *
281   * In a read transfer, the client receives data chunks from the server.
282   * Once all pending data is received, the transfer parameters are updated.
283   */
284  protected handleDataChunk(chunk: Chunk): void {
285    const chunkData = chunk.getData() as Uint8Array;
286
287    if (chunk.getOffset() != this.offset) {
288      if (chunk.getOffset() + chunkData.length <= this.offset) {
289        // If the chunk's data has already been received, don't go through a full
290        // recovery cycle to avoid shrinking the window size and potentially
291        // thrashing. The expected data may already be in-flight, so just allow
292        // the transmitter to keep going with a CONTINUE parameters chunk.
293        this.sendChunk(
294          this.transferParameters(
295            Chunk.Type.PARAMETERS_CONTINUE,
296            /*update=*/ false,
297          ),
298        );
299      } else {
300        // Initially, the transfer service only supports in-order transfers.
301        // If data is received out of order, request that the server
302        // retransmit from the previous offset.
303        this.sendChunk(
304          this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT),
305        );
306      }
307      return;
308    }
309
310    const oldData = this.data;
311    this.data = new Uint8Array(chunkData.length + oldData.length);
312    this.data.set(oldData);
313    this.data.set(chunkData, oldData.length);
314
315    this.pendingBytes -= chunk.getData().length;
316    this.offset += chunk.getData().length;
317
318    if (chunk.hasRemainingBytes()) {
319      if (chunk.getRemainingBytes() === 0) {
320        // No more data to read. Acknowledge receipt and finish.
321        this.terminate(Status.OK);
322        return;
323      }
324
325      this.remainingTransferSize = chunk.getRemainingBytes();
326    } else if (this.remainingTransferSize !== undefined) {
327      // Update the remaining transfer size, if it is known.
328      this.remainingTransferSize -= chunk.getData().length;
329
330      if (this.remainingTransferSize <= 0) {
331        this.remainingTransferSize = undefined;
332      }
333    }
334
335    if (chunk.getWindowEndOffset() !== 0) {
336      if (chunk.getWindowEndOffset() < this.offset) {
337        console.error(
338          `Transfer ${
339            this.id
340          }: transmitter sent invalid earlier end offset ${chunk.getWindowEndOffset()} (receiver offset ${
341            this.offset
342          })`,
343        );
344        this.terminate(Status.INTERNAL);
345        return;
346      }
347
348      if (chunk.getWindowEndOffset() < this.offset) {
349        console.error(
350          `Transfer ${
351            this.id
352          }: transmitter sent invalid later end offset ${chunk.getWindowEndOffset()} (receiver end offset ${
353            this.windowEndOffset
354          })`,
355        );
356        this.terminate(Status.INTERNAL);
357        return;
358      }
359
360      this.windowEndOffset = chunk.getWindowEndOffset();
361      this.pendingBytes -= chunk.getWindowEndOffset() - this.offset;
362    }
363
364    const remainingWindowSize = this.windowEndOffset - this.offset;
365    const extendWindow =
366      remainingWindowSize <=
367      this.maxBytesToReceive / ReadTransfer.EXTEND_WINDOW_DIVISOR;
368
369    const totalSize =
370      this.remainingTransferSize === undefined
371        ? undefined
372        : this.remainingTransferSize + this.offset;
373    this.updateProgress(this.offset, this.offset, totalSize);
374
375    if (this.pendingBytes === 0) {
376      // All pending data was received. Send out a new parameters chunk
377      // for the next block.
378      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
379    } else if (extendWindow) {
380      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_CONTINUE));
381    }
382  }
383
384  protected retryAfterTimeout(): void {
385    this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
386  }
387}
388
389/**
390 * A client => server write transfer.
391 */
392export class WriteTransfer extends Transfer {
393  private windowId = 0;
394  offset = 0;
395  maxChunkSize = 0;
396  chunkDelayMicroS?: number;
397  windowEndOffset = 0;
398  lastChunk: Chunk;
399
400  constructor(
401    id: number,
402    data: Uint8Array,
403    sendChunk: (chunk: Chunk) => void,
404    responseTimeoutS: number,
405    initialResponseTimeoutS: number,
406    maxRetries: number,
407    progressCallback?: ProgressCallback,
408  ) {
409    super(id, sendChunk, responseTimeoutS, maxRetries, progressCallback);
410    this.data = data;
411    this.lastChunk = this.initialChunk;
412  }
413
414  protected get initialChunk(): any {
415    // TODO(frolv): The session ID should not be set here but assigned by the
416    // server during an initial handshake.
417    const chunk = new Chunk();
418    chunk.setTransferId(this.id);
419    chunk.setResourceId(this.id);
420    chunk.setType(Chunk.Type.START);
421    return chunk;
422  }
423
424  /**
425   * Processes an incoming chunk from the server.
426   *
427   * In a write transfer, the server only sends transfer parameter updates
428   * to the client. When a message is received, update local parameters and
429   * send data accordingly.
430   */
431  protected handleDataChunk(chunk: Chunk): void {
432    this.windowId += 1;
433    const initialWindowId = this.windowId;
434
435    if (!this.handleParametersUpdate(chunk)) {
436      return;
437    }
438
439    const bytesAknowledged = chunk.getOffset();
440
441    let writeChunk: Chunk;
442    // eslint-disable-next-line no-constant-condition
443    while (true) {
444      writeChunk = this.nextChunk();
445      this.offset += writeChunk.getData().length;
446      const sentRequestedBytes = this.offset === this.windowEndOffset;
447
448      this.updateProgress(this.offset, bytesAknowledged, this.data.length);
449      this.sendChunk(writeChunk);
450
451      if (sentRequestedBytes) {
452        break;
453      }
454    }
455
456    this.lastChunk = writeChunk;
457  }
458
459  /** Updates transfer state base on a transfer parameters update. */
460  private handleParametersUpdate(chunk: Chunk): boolean {
461    let retransmit = true;
462    if (chunk.hasType()) {
463      retransmit = chunk.getType() === Chunk.Type.PARAMETERS_RETRANSMIT;
464    }
465
466    if (chunk.getOffset() > this.data.length) {
467      // Bad offset; terminate the transfer.
468      console.error(
469        `Transfer ${
470          this.id
471        }: server requested invalid offset ${chunk.getOffset()} (size ${
472          this.data.length
473        })`,
474      );
475
476      this.terminate(Status.OUT_OF_RANGE);
477      return false;
478    }
479
480    if (chunk.getPendingBytes() === 0) {
481      console.error(
482        `Transfer ${this.id}: service requested 0 bytes (invalid); aborting`,
483      );
484      this.terminate(Status.INTERNAL);
485      return false;
486    }
487
488    if (retransmit) {
489      // Check whether the client has sent a previous data offset, which
490      // indicates that some chunks were lost in transmission.
491      if (chunk.getOffset() < this.offset) {
492        console.debug(
493          `Write transfer ${
494            this.id
495          } rolling back to offset ${chunk.getOffset()} from ${this.offset}`,
496        );
497      }
498
499      this.offset = chunk.getOffset();
500
501      // Retransmit is the default behavior for older versions of the
502      // transfer protocol. The window_end_offset field is not guaranteed
503      // to be set in these version, so it must be calculated.
504      const maxBytesToSend = Math.min(
505        chunk.getPendingBytes(),
506        this.data.length - this.offset,
507      );
508      this.windowEndOffset = this.offset + maxBytesToSend;
509    } else {
510      // Extend the window to the new end offset specified by the server.
511      this.windowEndOffset = Math.min(
512        chunk.getWindowEndOffset(),
513        this.data.length,
514      );
515    }
516
517    if (chunk.hasMaxChunkSizeBytes()) {
518      this.maxChunkSize = chunk.getMaxChunkSizeBytes();
519    }
520
521    if (chunk.hasMinDelayMicroseconds()) {
522      this.chunkDelayMicroS = chunk.getMinDelayMicroseconds();
523    }
524    return true;
525  }
526
527  /** Returns the next Chunk message to send in the data transfer. */
528  private nextChunk(): Chunk {
529    const chunk = new Chunk();
530    chunk.setTransferId(this.id);
531    chunk.setOffset(this.offset);
532    chunk.setType(Chunk.Type.DATA);
533
534    const maxBytesInChunk = Math.min(
535      this.maxChunkSize,
536      this.windowEndOffset - this.offset,
537    );
538
539    chunk.setData(this.data.slice(this.offset, this.offset + maxBytesInChunk));
540
541    // Mark the final chunk of the transfer.
542    if (this.data.length - this.offset <= maxBytesInChunk) {
543      chunk.setRemainingBytes(0);
544    }
545    return chunk;
546  }
547
548  protected retryAfterTimeout(): void {
549    this.sendChunk(this.lastChunk);
550  }
551}
552