xref: /aosp_15_r20/external/pigweed/pw_transfer/ts/client.ts (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1*61c4878aSAndroid Build Coastguard Worker// Copyright 2022 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker//
3*61c4878aSAndroid Build Coastguard Worker// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker// use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker// the License at
6*61c4878aSAndroid Build Coastguard Worker//
7*61c4878aSAndroid Build Coastguard Worker//     https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker//
9*61c4878aSAndroid Build Coastguard Worker// Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker// License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker// the License.
14*61c4878aSAndroid Build Coastguard Worker
15*61c4878aSAndroid Build Coastguard Worker/** Client for the pw_transfer service, which transmits data over pw_rpc. */
16*61c4878aSAndroid Build Coastguard Worker
17*61c4878aSAndroid Build Coastguard Workerimport {
18*61c4878aSAndroid Build Coastguard Worker  BidirectionalStreamingCall,
19*61c4878aSAndroid Build Coastguard Worker  BidirectionalStreamingMethodStub,
20*61c4878aSAndroid Build Coastguard Worker  ServiceClient,
21*61c4878aSAndroid Build Coastguard Worker} from 'pigweedjs/pw_rpc';
22*61c4878aSAndroid Build Coastguard Workerimport { Status } from 'pigweedjs/pw_status';
23*61c4878aSAndroid Build Coastguard Workerimport { Chunk } from 'pigweedjs/protos/pw_transfer/transfer_pb';
24*61c4878aSAndroid Build Coastguard Worker
25*61c4878aSAndroid Build Coastguard Workerimport {
26*61c4878aSAndroid Build Coastguard Worker  ReadTransfer,
27*61c4878aSAndroid Build Coastguard Worker  ProgressCallback,
28*61c4878aSAndroid Build Coastguard Worker  Transfer,
29*61c4878aSAndroid Build Coastguard Worker  WriteTransfer,
30*61c4878aSAndroid Build Coastguard Worker} from './transfer';
31*61c4878aSAndroid Build Coastguard Worker
32*61c4878aSAndroid Build Coastguard Workertype TransferDict = {
33*61c4878aSAndroid Build Coastguard Worker  [key: number]: Transfer;
34*61c4878aSAndroid Build Coastguard Worker};
35*61c4878aSAndroid Build Coastguard Worker
36*61c4878aSAndroid Build Coastguard Workerconst DEFAULT_MAX_RETRIES = 3;
37*61c4878aSAndroid Build Coastguard Workerconst DEFAULT_RESPONSE_TIMEOUT_S = 2;
38*61c4878aSAndroid Build Coastguard Workerconst DEFAULT_INITIAL_RESPONSE_TIMEOUT = 4;
39*61c4878aSAndroid Build Coastguard Worker
40*61c4878aSAndroid Build Coastguard Worker/**
41*61c4878aSAndroid Build Coastguard Worker *  A manager for transmitting data through an RPC TransferService.
42*61c4878aSAndroid Build Coastguard Worker *
43*61c4878aSAndroid Build Coastguard Worker *  This should be initialized with an active Manager over an RPC channel. Only
44*61c4878aSAndroid Build Coastguard Worker *  one instance of this class should exist for a configured RPC TransferService
45*61c4878aSAndroid Build Coastguard Worker *  -- the Manager supports multiple simultaneous transfers.
46*61c4878aSAndroid Build Coastguard Worker *
47*61c4878aSAndroid Build Coastguard Worker *  When created, a Manager starts a separate thread in which transfer
48*61c4878aSAndroid Build Coastguard Worker *  communications and events are handled.
49*61c4878aSAndroid Build Coastguard Worker */
50*61c4878aSAndroid Build Coastguard Workerexport class Manager {
51*61c4878aSAndroid Build Coastguard Worker  // Ongoing transfers in the service by ID
52*61c4878aSAndroid Build Coastguard Worker  readTransfers: TransferDict = {};
53*61c4878aSAndroid Build Coastguard Worker  writeTransfers: TransferDict = {};
54*61c4878aSAndroid Build Coastguard Worker
55*61c4878aSAndroid Build Coastguard Worker  // RPC streams for read and write transfers. These are shareable by
56*61c4878aSAndroid Build Coastguard Worker  // multiple transfers of the same type.
57*61c4878aSAndroid Build Coastguard Worker  private readStream?: BidirectionalStreamingCall;
58*61c4878aSAndroid Build Coastguard Worker  private writeStream?: BidirectionalStreamingCall;
59*61c4878aSAndroid Build Coastguard Worker
60*61c4878aSAndroid Build Coastguard Worker  /**
61*61c4878aSAndroid Build Coastguard Worker   * Initializes a Manager on top of a TransferService.
62*61c4878aSAndroid Build Coastguard Worker   *
63*61c4878aSAndroid Build Coastguard Worker   * Args:
64*61c4878aSAndroid Build Coastguard Worker   * @param{ServiceClient} service: the pw_rpc transfer service
65*61c4878aSAndroid Build Coastguard Worker   * client
66*61c4878aSAndroid Build Coastguard Worker   * @param{number} defaultResponseTimeoutS: max time to wait between receiving
67*61c4878aSAndroid Build Coastguard Worker   * packets
68*61c4878aSAndroid Build Coastguard Worker   * @param{number} initialResponseTimeoutS: timeout for the first packet; may
69*61c4878aSAndroid Build Coastguard Worker   * be longer to account for transfer handler initialization
70*61c4878aSAndroid Build Coastguard Worker   * @param{number} maxRetries: number of times to retry after a timeout
71*61c4878aSAndroid Build Coastguard Worker   */
72*61c4878aSAndroid Build Coastguard Worker  constructor(
73*61c4878aSAndroid Build Coastguard Worker    private service: ServiceClient,
74*61c4878aSAndroid Build Coastguard Worker    private defaultResponseTimeoutS = DEFAULT_RESPONSE_TIMEOUT_S,
75*61c4878aSAndroid Build Coastguard Worker    private initialResponseTimeoutS = DEFAULT_INITIAL_RESPONSE_TIMEOUT,
76*61c4878aSAndroid Build Coastguard Worker    private maxRetries = DEFAULT_MAX_RETRIES,
77*61c4878aSAndroid Build Coastguard Worker  ) {}
78*61c4878aSAndroid Build Coastguard Worker
79*61c4878aSAndroid Build Coastguard Worker  /**
80*61c4878aSAndroid Build Coastguard Worker   * Receives ("downloads") data from the server.
81*61c4878aSAndroid Build Coastguard Worker   *
82*61c4878aSAndroid Build Coastguard Worker   * @throws Throws an error when the transfer fails to complete.
83*61c4878aSAndroid Build Coastguard Worker   */
84*61c4878aSAndroid Build Coastguard Worker  async read(
85*61c4878aSAndroid Build Coastguard Worker    resourceId: number,
86*61c4878aSAndroid Build Coastguard Worker    progressCallback?: ProgressCallback,
87*61c4878aSAndroid Build Coastguard Worker  ): Promise<Uint8Array> {
88*61c4878aSAndroid Build Coastguard Worker    if (resourceId in this.readTransfers) {
89*61c4878aSAndroid Build Coastguard Worker      throw new Error(
90*61c4878aSAndroid Build Coastguard Worker        `Read transfer for resource ${resourceId} already exists`,
91*61c4878aSAndroid Build Coastguard Worker      );
92*61c4878aSAndroid Build Coastguard Worker    }
93*61c4878aSAndroid Build Coastguard Worker    const transfer = new ReadTransfer(
94*61c4878aSAndroid Build Coastguard Worker      resourceId,
95*61c4878aSAndroid Build Coastguard Worker      this.sendReadChunkCallback,
96*61c4878aSAndroid Build Coastguard Worker      this.defaultResponseTimeoutS,
97*61c4878aSAndroid Build Coastguard Worker      this.maxRetries,
98*61c4878aSAndroid Build Coastguard Worker      progressCallback,
99*61c4878aSAndroid Build Coastguard Worker    );
100*61c4878aSAndroid Build Coastguard Worker
101*61c4878aSAndroid Build Coastguard Worker    this.startReadTransfer(transfer);
102*61c4878aSAndroid Build Coastguard Worker
103*61c4878aSAndroid Build Coastguard Worker    const status = await transfer.done;
104*61c4878aSAndroid Build Coastguard Worker
105*61c4878aSAndroid Build Coastguard Worker    delete this.readTransfers[transfer.id];
106*61c4878aSAndroid Build Coastguard Worker    if (status !== Status.OK) {
107*61c4878aSAndroid Build Coastguard Worker      throw new TransferError(transfer.id, transfer.status);
108*61c4878aSAndroid Build Coastguard Worker    }
109*61c4878aSAndroid Build Coastguard Worker    return transfer.data;
110*61c4878aSAndroid Build Coastguard Worker  }
111*61c4878aSAndroid Build Coastguard Worker
112*61c4878aSAndroid Build Coastguard Worker  /** Begins a new read transfer, opening the stream if it isn't. */
113*61c4878aSAndroid Build Coastguard Worker  startReadTransfer(transfer: Transfer): void {
114*61c4878aSAndroid Build Coastguard Worker    this.readTransfers[transfer.id] = transfer;
115*61c4878aSAndroid Build Coastguard Worker
116*61c4878aSAndroid Build Coastguard Worker    if (this.readStream === undefined) {
117*61c4878aSAndroid Build Coastguard Worker      this.openReadStream();
118*61c4878aSAndroid Build Coastguard Worker    }
119*61c4878aSAndroid Build Coastguard Worker    console.debug(`Starting new read transfer ${transfer.id}`);
120*61c4878aSAndroid Build Coastguard Worker    transfer.begin();
121*61c4878aSAndroid Build Coastguard Worker  }
122*61c4878aSAndroid Build Coastguard Worker
123*61c4878aSAndroid Build Coastguard Worker  /**
124*61c4878aSAndroid Build Coastguard Worker  Transmits (uploads) data to the server.
125*61c4878aSAndroid Build Coastguard Worker   *
126*61c4878aSAndroid Build Coastguard Worker   * @param{number} resourceId: ID of the resource to which to write.
127*61c4878aSAndroid Build Coastguard Worker   * @param{Uint8Array} data: Data to send to the server.
128*61c4878aSAndroid Build Coastguard Worker   */
129*61c4878aSAndroid Build Coastguard Worker  async write(
130*61c4878aSAndroid Build Coastguard Worker    resourceId: number,
131*61c4878aSAndroid Build Coastguard Worker    data: Uint8Array,
132*61c4878aSAndroid Build Coastguard Worker    progressCallback?: ProgressCallback,
133*61c4878aSAndroid Build Coastguard Worker  ): Promise<void> {
134*61c4878aSAndroid Build Coastguard Worker    const transfer = new WriteTransfer(
135*61c4878aSAndroid Build Coastguard Worker      resourceId,
136*61c4878aSAndroid Build Coastguard Worker      data,
137*61c4878aSAndroid Build Coastguard Worker      this.sendWriteChunkCallback,
138*61c4878aSAndroid Build Coastguard Worker      this.defaultResponseTimeoutS,
139*61c4878aSAndroid Build Coastguard Worker      this.initialResponseTimeoutS,
140*61c4878aSAndroid Build Coastguard Worker      this.maxRetries,
141*61c4878aSAndroid Build Coastguard Worker      progressCallback,
142*61c4878aSAndroid Build Coastguard Worker    );
143*61c4878aSAndroid Build Coastguard Worker    this.startWriteTransfer(transfer);
144*61c4878aSAndroid Build Coastguard Worker
145*61c4878aSAndroid Build Coastguard Worker    const status = await transfer.done;
146*61c4878aSAndroid Build Coastguard Worker
147*61c4878aSAndroid Build Coastguard Worker    delete this.writeTransfers[transfer.id];
148*61c4878aSAndroid Build Coastguard Worker    if (transfer.status !== Status.OK) {
149*61c4878aSAndroid Build Coastguard Worker      throw new TransferError(transfer.id, transfer.status);
150*61c4878aSAndroid Build Coastguard Worker    }
151*61c4878aSAndroid Build Coastguard Worker  }
152*61c4878aSAndroid Build Coastguard Worker
153*61c4878aSAndroid Build Coastguard Worker  sendReadChunkCallback = (chunk: Chunk) => {
154*61c4878aSAndroid Build Coastguard Worker    this.readStream!.send(chunk);
155*61c4878aSAndroid Build Coastguard Worker  };
156*61c4878aSAndroid Build Coastguard Worker
157*61c4878aSAndroid Build Coastguard Worker  sendWriteChunkCallback = (chunk: Chunk) => {
158*61c4878aSAndroid Build Coastguard Worker    this.writeStream!.send(chunk);
159*61c4878aSAndroid Build Coastguard Worker  };
160*61c4878aSAndroid Build Coastguard Worker
161*61c4878aSAndroid Build Coastguard Worker  /** Begins a new write transfer, opening the stream if it isn't */
162*61c4878aSAndroid Build Coastguard Worker  startWriteTransfer(transfer: Transfer): void {
163*61c4878aSAndroid Build Coastguard Worker    this.writeTransfers[transfer.id] = transfer;
164*61c4878aSAndroid Build Coastguard Worker
165*61c4878aSAndroid Build Coastguard Worker    if (!this.writeStream) {
166*61c4878aSAndroid Build Coastguard Worker      this.openWriteStream();
167*61c4878aSAndroid Build Coastguard Worker    }
168*61c4878aSAndroid Build Coastguard Worker
169*61c4878aSAndroid Build Coastguard Worker    console.debug(`Starting new write transfer ${transfer.id}`);
170*61c4878aSAndroid Build Coastguard Worker    transfer.begin();
171*61c4878aSAndroid Build Coastguard Worker  }
172*61c4878aSAndroid Build Coastguard Worker
173*61c4878aSAndroid Build Coastguard Worker  private openReadStream(): void {
174*61c4878aSAndroid Build Coastguard Worker    const readRpc = this.service.method(
175*61c4878aSAndroid Build Coastguard Worker      'Read',
176*61c4878aSAndroid Build Coastguard Worker    )! as BidirectionalStreamingMethodStub;
177*61c4878aSAndroid Build Coastguard Worker    this.readStream = readRpc.invoke(
178*61c4878aSAndroid Build Coastguard Worker      (chunk: Chunk) => {
179*61c4878aSAndroid Build Coastguard Worker        this.handleChunk(this.readTransfers, chunk);
180*61c4878aSAndroid Build Coastguard Worker      },
181*61c4878aSAndroid Build Coastguard Worker      () => {
182*61c4878aSAndroid Build Coastguard Worker        // Do nothing.
183*61c4878aSAndroid Build Coastguard Worker      },
184*61c4878aSAndroid Build Coastguard Worker      this.onReadError,
185*61c4878aSAndroid Build Coastguard Worker    );
186*61c4878aSAndroid Build Coastguard Worker  }
187*61c4878aSAndroid Build Coastguard Worker
188*61c4878aSAndroid Build Coastguard Worker  private openWriteStream(): void {
189*61c4878aSAndroid Build Coastguard Worker    const writeRpc = this.service.method(
190*61c4878aSAndroid Build Coastguard Worker      'Write',
191*61c4878aSAndroid Build Coastguard Worker    )! as BidirectionalStreamingMethodStub;
192*61c4878aSAndroid Build Coastguard Worker    this.writeStream = writeRpc.invoke(
193*61c4878aSAndroid Build Coastguard Worker      (chunk: Chunk) => {
194*61c4878aSAndroid Build Coastguard Worker        this.handleChunk(this.writeTransfers, chunk);
195*61c4878aSAndroid Build Coastguard Worker      },
196*61c4878aSAndroid Build Coastguard Worker      () => {
197*61c4878aSAndroid Build Coastguard Worker        // Do nothing.
198*61c4878aSAndroid Build Coastguard Worker      },
199*61c4878aSAndroid Build Coastguard Worker      this.onWriteError,
200*61c4878aSAndroid Build Coastguard Worker    );
201*61c4878aSAndroid Build Coastguard Worker  }
202*61c4878aSAndroid Build Coastguard Worker
203*61c4878aSAndroid Build Coastguard Worker  /**
204*61c4878aSAndroid Build Coastguard Worker   * Callback for an RPC error in the read stream.
205*61c4878aSAndroid Build Coastguard Worker   */
206*61c4878aSAndroid Build Coastguard Worker  private onReadError = (status: Status) => {
207*61c4878aSAndroid Build Coastguard Worker    if (status === Status.FAILED_PRECONDITION) {
208*61c4878aSAndroid Build Coastguard Worker      // FAILED_PRECONDITION indicates that the stream packet was not
209*61c4878aSAndroid Build Coastguard Worker      // recognized as the stream is not open. This could occur if the
210*61c4878aSAndroid Build Coastguard Worker      // server resets during an active transfer. Re-open the stream to
211*61c4878aSAndroid Build Coastguard Worker      // allow pending transfers to continue.
212*61c4878aSAndroid Build Coastguard Worker      this.openReadStream();
213*61c4878aSAndroid Build Coastguard Worker      return;
214*61c4878aSAndroid Build Coastguard Worker    }
215*61c4878aSAndroid Build Coastguard Worker
216*61c4878aSAndroid Build Coastguard Worker    // Other errors are unrecoverable. Clear the stream and cancel any
217*61c4878aSAndroid Build Coastguard Worker    // pending transfers with an INTERNAL status as this is a system
218*61c4878aSAndroid Build Coastguard Worker    // error.
219*61c4878aSAndroid Build Coastguard Worker    this.readStream = undefined;
220*61c4878aSAndroid Build Coastguard Worker
221*61c4878aSAndroid Build Coastguard Worker    for (const key in this.readTransfers) {
222*61c4878aSAndroid Build Coastguard Worker      const transfer = this.readTransfers[key];
223*61c4878aSAndroid Build Coastguard Worker      transfer.abort(Status.INTERNAL);
224*61c4878aSAndroid Build Coastguard Worker    }
225*61c4878aSAndroid Build Coastguard Worker    this.readTransfers = {};
226*61c4878aSAndroid Build Coastguard Worker    console.error(`Read stream shut down ${Status[status]}`);
227*61c4878aSAndroid Build Coastguard Worker  };
228*61c4878aSAndroid Build Coastguard Worker
229*61c4878aSAndroid Build Coastguard Worker  private onWriteError = (status: Status) => {
230*61c4878aSAndroid Build Coastguard Worker    if (status === Status.FAILED_PRECONDITION) {
231*61c4878aSAndroid Build Coastguard Worker      // FAILED_PRECONDITION indicates that the stream packet was not
232*61c4878aSAndroid Build Coastguard Worker      // recognized as the stream is not open. This could occur if the
233*61c4878aSAndroid Build Coastguard Worker      // server resets during an active transfer. Re-open the stream to
234*61c4878aSAndroid Build Coastguard Worker      // allow pending transfers to continue.
235*61c4878aSAndroid Build Coastguard Worker      this.openWriteStream();
236*61c4878aSAndroid Build Coastguard Worker    } else {
237*61c4878aSAndroid Build Coastguard Worker      // Other errors are unrecoverable. Clear the stream and cancel any
238*61c4878aSAndroid Build Coastguard Worker      // pending transfers with an INTERNAL status as this is a system
239*61c4878aSAndroid Build Coastguard Worker      // error.
240*61c4878aSAndroid Build Coastguard Worker      this.writeStream = undefined;
241*61c4878aSAndroid Build Coastguard Worker
242*61c4878aSAndroid Build Coastguard Worker      for (const key in this.writeTransfers) {
243*61c4878aSAndroid Build Coastguard Worker        const transfer = this.writeTransfers[key];
244*61c4878aSAndroid Build Coastguard Worker        transfer.abort(Status.INTERNAL);
245*61c4878aSAndroid Build Coastguard Worker      }
246*61c4878aSAndroid Build Coastguard Worker      this.writeTransfers = {};
247*61c4878aSAndroid Build Coastguard Worker      console.error(`Write stream shut down: ${Status[status]}`);
248*61c4878aSAndroid Build Coastguard Worker    }
249*61c4878aSAndroid Build Coastguard Worker  };
250*61c4878aSAndroid Build Coastguard Worker
251*61c4878aSAndroid Build Coastguard Worker  /**
252*61c4878aSAndroid Build Coastguard Worker   * Processes an incoming chunk from a stream.
253*61c4878aSAndroid Build Coastguard Worker   *
254*61c4878aSAndroid Build Coastguard Worker   * The chunk is dispatched to an active transfer based on its ID. If the
255*61c4878aSAndroid Build Coastguard Worker   * transfer indicates that it is complete, the provided completion callback
256*61c4878aSAndroid Build Coastguard Worker   * is invoked.
257*61c4878aSAndroid Build Coastguard Worker   */
258*61c4878aSAndroid Build Coastguard Worker  private async handleChunk(transfers: TransferDict, chunk: Chunk) {
259*61c4878aSAndroid Build Coastguard Worker    const transfer = transfers[chunk.getTransferId()];
260*61c4878aSAndroid Build Coastguard Worker    if (transfer === undefined) {
261*61c4878aSAndroid Build Coastguard Worker      console.error(
262*61c4878aSAndroid Build Coastguard Worker        `TransferManager received chunk for unknown transfer ${chunk.getTransferId()}`,
263*61c4878aSAndroid Build Coastguard Worker      );
264*61c4878aSAndroid Build Coastguard Worker      return;
265*61c4878aSAndroid Build Coastguard Worker    }
266*61c4878aSAndroid Build Coastguard Worker    transfer.handleChunk(chunk);
267*61c4878aSAndroid Build Coastguard Worker  }
268*61c4878aSAndroid Build Coastguard Worker}
269*61c4878aSAndroid Build Coastguard Worker
270*61c4878aSAndroid Build Coastguard Worker/**
271*61c4878aSAndroid Build Coastguard Worker * Exception raised when a transfer fails.
272*61c4878aSAndroid Build Coastguard Worker *
273*61c4878aSAndroid Build Coastguard Worker * Stores the ID of the failed transfer and the error that occured.
274*61c4878aSAndroid Build Coastguard Worker */
275*61c4878aSAndroid Build Coastguard Workerclass TransferError extends Error {
276*61c4878aSAndroid Build Coastguard Worker  id: number;
277*61c4878aSAndroid Build Coastguard Worker  status: Status;
278*61c4878aSAndroid Build Coastguard Worker
279*61c4878aSAndroid Build Coastguard Worker  constructor(id: number, status: Status) {
280*61c4878aSAndroid Build Coastguard Worker    super(`Transfer ${id} failed with status ${Status[status]}`);
281*61c4878aSAndroid Build Coastguard Worker    this.status = status;
282*61c4878aSAndroid Build Coastguard Worker    this.id = id;
283*61c4878aSAndroid Build Coastguard Worker  }
284*61c4878aSAndroid Build Coastguard Worker}
285