xref: /aosp_15_r20/external/pigweed/ts/transport/web_serial_transport.ts (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1// Copyright 2022 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15/* eslint-env browser */
16import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
17import DeviceTransport from './device_transport';
18import type {
19  SerialPort,
20  Serial,
21  SerialOptions,
22  Navigator,
23  SerialPortFilter,
24} from '../types/serial';
25
26const DEFAULT_SERIAL_OPTIONS: SerialOptions & { baudRate: number } = {
27  // Some versions of chrome use `baudrate` (linux)
28  baudrate: 115200,
29  // Some versions use `baudRate` (chromebook)
30  baudRate: 115200,
31  databits: 8,
32  parity: 'none',
33  stopbits: 1,
34};
35
36interface PortReadConnection {
37  chunks: Observable<Uint8Array>;
38  errors: Observable<Error>;
39}
40
41interface PortConnection extends PortReadConnection {
42  sendChunk: (chunk: Uint8Array) => Promise<void>;
43}
44
45export class DeviceLostError extends Error {
46  override message = 'The device has been lost';
47}
48
49export class DeviceLockedError extends Error {
50  override message =
51    "The device's port is locked. Try unplugging it" +
52    ' and plugging it back in.';
53}
54
55/**
56 * WebSerialTransport sends and receives UInt8Arrays to and
57 * from a serial device connected over USB.
58 */
59export class WebSerialTransport implements DeviceTransport {
60  chunks = new Subject<Uint8Array>();
61  errors = new Subject<Error>();
62  connected = new BehaviorSubject<boolean>(false);
63  private portConnections: Map<SerialPort, PortConnection> = new Map();
64  private activePortConnectionConnection: PortConnection | undefined;
65  private rxSubscriptions: Subscription[] = [];
66  private writer: WritableStreamDefaultWriter<Uint8Array> | undefined;
67  private abortController: AbortController | undefined;
68
69  constructor(
70    private serial: Serial = (navigator as unknown as Navigator).serial,
71    private filters: SerialPortFilter[] = [],
72    private serialOptions = DEFAULT_SERIAL_OPTIONS,
73  ) {}
74
75  /**
76   * Send a UInt8Array chunk of data to the connected device.
77   * @param {Uint8Array} chunk The chunk to send
78   */
79  async sendChunk(chunk: Uint8Array): Promise<void> {
80    if (this.activePortConnectionConnection) {
81      return this.activePortConnectionConnection.sendChunk(chunk);
82    }
83    throw new Error('Device not connected');
84  }
85
86  /**
87   * Attempt to open a connection to a device. This includes
88   * asking the user to select a serial port and should only
89   * be called in response to user interaction.
90   */
91  async connect(): Promise<void> {
92    const port = await this.serial.requestPort({ filters: this.filters });
93    await this.connectPort(port);
94  }
95
96  async disconnect() {
97    for (const subscription of this.rxSubscriptions) {
98      subscription.unsubscribe();
99    }
100    this.rxSubscriptions = [];
101
102    this.activePortConnectionConnection = undefined;
103    this.portConnections.clear();
104    this.abortController?.abort();
105
106    try {
107      await this.writer?.close();
108    } catch (err) {
109      this.errors.next(err as Error);
110    }
111    this.connected.next(false);
112  }
113
114  /**
115   * Connect to a given SerialPort. This involves no user interaction.
116   * and can be called whenever a port is available.
117   */
118  async connectPort(port: SerialPort): Promise<void> {
119    this.serial.addEventListener('disconnect', (e: any) => {
120      if (e.target === port) {
121        this.connected.next(false);
122      }
123    });
124    this.activePortConnectionConnection =
125      this.portConnections.get(port) ?? (await this.connectNewPort(port));
126
127    this.connected.next(true);
128
129    this.rxSubscriptions.push(
130      this.activePortConnectionConnection.chunks.subscribe(
131        (chunk: any) => {
132          this.chunks.next(chunk);
133        },
134        (err: any) => {
135          throw new Error(`Chunks observable had an unexpected error ${err}`);
136        },
137        () => {
138          this.connected.next(false);
139          this.portConnections.delete(port);
140          // Don't complete the chunks observable because then it would not
141          // be able to forward any future chunks.
142        },
143      ),
144    );
145
146    this.rxSubscriptions.push(
147      this.activePortConnectionConnection.errors.subscribe((error: any) => {
148        this.errors.next(error);
149        if (error instanceof DeviceLostError) {
150          // The device has been lost
151          this.connected.next(false);
152        }
153      }),
154    );
155  }
156
157  private async connectNewPort(port: SerialPort): Promise<PortConnection> {
158    await port.open(this.serialOptions);
159    const writer = port.writable.getWriter();
160    this.writer = writer;
161
162    async function sendChunk(chunk: Uint8Array) {
163      await writer.ready;
164      await writer.write(chunk);
165    }
166
167    const { chunks, errors } = this.getChunks(port);
168
169    const connection: PortConnection = { sendChunk, chunks, errors };
170    this.portConnections.set(port, connection);
171    return connection;
172  }
173
174  private getChunks(port: SerialPort): PortReadConnection {
175    const chunks = new Subject<Uint8Array>();
176    const errors = new Subject<Error>();
177    const abortController = new AbortController();
178    this.abortController = abortController;
179
180    async function read() {
181      if (!port.readable) {
182        throw new DeviceLostError();
183      }
184      if (port.readable.locked) {
185        throw new DeviceLockedError();
186      }
187      await port.readable.pipeTo(
188        new WritableStream({
189          write: (chunk) => {
190            chunks.next(chunk);
191          },
192          close: () => {
193            chunks.complete();
194            errors.complete();
195          },
196        }),
197        { signal: abortController.signal },
198      );
199    }
200
201    function connect() {
202      read().catch((err) => {
203        // Don't error the chunks observable since that stops it from
204        // reading any more packets, and we often want to continue
205        // despite an error. Instead, push errors to the 'errors'
206        // observable.
207        errors.next(err);
208      });
209    }
210
211    connect();
212
213    return { chunks, errors };
214  }
215}
216