// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

/* eslint-env browser */
import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
import DeviceTransport from './device_transport';
import type {
  SerialPort,
  Serial,
  SerialOptions,
  Navigator,
  SerialPortFilter,
} from '../types/serial';

/**
 * Options passed to `SerialPort.open()` when the caller does not supply any.
 * Both spellings of the baud-rate key are set because different Chrome
 * builds read different ones (see the per-key comments).
 */
const DEFAULT_SERIAL_OPTIONS: SerialOptions & { baudRate: number } = {
  // Some versions of chrome use `baudrate` (linux)
  baudrate: 115200,
  // Some versions use `baudRate` (chromebook)
  baudRate: 115200,
  databits: 8,
  parity: 'none',
  stopbits: 1,
};

/** The receive side of an open port: incoming data plus read errors. */
interface PortReadConnection {
  /** Emits every chunk read from the port. */
  chunks: Observable<Uint8Array>;
  /** Emits errors raised while reading from the port. */
  errors: Observable<Error>;
}

/** A fully opened port: the receive side plus a way to write to it. */
interface PortConnection extends PortReadConnection {
  /** Writes one chunk to the port. */
  sendChunk: (chunk: Uint8Array) => Promise<void>;
}

/**
 * Raised when the port no longer has a readable stream, i.e. the device is
 * gone.
 */
export class DeviceLostError extends Error {
  constructor() {
    // Pass the message to the Error constructor instead of assigning it from
    // a field initializer: the initializer runs after `super()`, so the
    // stack header captured at construction would not contain the message.
    super('The device has been lost');
    this.name = 'DeviceLostError';
    // Keeps `instanceof DeviceLostError` working when compiled to ES5.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

/**
 * Raised when the port's readable stream is already locked by another
 * reader.
 */
export class DeviceLockedError extends Error {
  constructor() {
    // See DeviceLostError for why the message goes through `super()`.
    super(
      "The device's port is locked. Try unplugging it" +
        ' and plugging it back in.',
    );
    this.name = 'DeviceLockedError';
    // Keeps `instanceof DeviceLockedError` working when compiled to ES5.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}
/**
 * WebSerialTransport sends and receives UInt8Arrays to and
 * from a serial device connected over USB.
 *
 * Incoming data is published on `chunks`, failures on `errors`, and the
 * connection state on `connected`. None of these subjects is ever completed
 * by this class, so they stay usable across reconnects.
 */
export class WebSerialTransport implements DeviceTransport {
  /** Every chunk read from a connected port is forwarded here. */
  chunks = new Subject<Uint8Array>();
  /** Read errors and close errors are forwarded here instead of thrown. */
  errors = new Subject<Error>();
  /** Current connection state; starts out `false`. */
  connected = new BehaviorSubject<boolean>(false);
  /** Already-opened ports, so reconnecting does not call `open()` again. */
  private portConnections: Map<SerialPort, PortConnection> = new Map();
  /** The connection `sendChunk` writes to. */
  private activePortConnectionConnection: PortConnection | undefined;
  /** One composite subscription (chunks + errors) per forwarded port. */
  private rxSubscriptions: Map<SerialPort, Subscription> = new Map();
  /** Ports whose physical disconnect should flip `connected` to false. */
  private watchedPorts = new WeakSet<object>();
  /** Whether the shared 'disconnect' listener has been added to `serial`. */
  private disconnectListenerInstalled = false;
  // NOTE(review): only the most recently opened port's writer and abort
  // controller are tracked; `disconnect()` tears down just that one.
  private writer: WritableStreamDefaultWriter<Uint8Array> | undefined;
  private abortController: AbortController | undefined;

  /**
   * @param serial The Web Serial entry point; defaults to `navigator.serial`.
   * @param filters Filters handed to `requestPort()` in `connect()`.
   * @param serialOptions Options handed to `SerialPort.open()`.
   */
  constructor(
    private serial: Serial = (navigator as unknown as Navigator).serial,
    private filters: SerialPortFilter[] = [],
    private serialOptions = DEFAULT_SERIAL_OPTIONS,
  ) {}

  /**
   * Send a UInt8Array chunk of data to the connected device.
   * @param chunk The chunk to send
   * @throws Error if no port connection is currently active.
   */
  async sendChunk(chunk: Uint8Array): Promise<void> {
    if (this.activePortConnectionConnection) {
      return this.activePortConnectionConnection.sendChunk(chunk);
    }
    throw new Error('Device not connected');
  }

  /**
   * Attempt to open a connection to a device. This includes
   * asking the user to select a serial port and should only
   * be called in response to user interaction.
   */
  async connect(): Promise<void> {
    const port = await this.serial.requestPort({ filters: this.filters });
    await this.connectPort(port);
  }

  /**
   * Stop forwarding data, abort the pending read, close the writer and
   * publish `connected = false`. A failure to close the writer is reported
   * on `errors` rather than thrown.
   *
   * NOTE(review): the port itself is never closed here, so a later
   * `connectPort()` on the same port will call `open()` on it again —
   * confirm that is acceptable for the ports in use.
   */
  async disconnect(): Promise<void> {
    for (const subscription of this.rxSubscriptions.values()) {
      subscription.unsubscribe();
    }
    this.rxSubscriptions.clear();

    this.activePortConnectionConnection = undefined;
    this.portConnections.clear();

    // Take ownership of the handles before using them so that a second
    // disconnect() does not try to abort/close the same objects again.
    const abortController = this.abortController;
    this.abortController = undefined;
    abortController?.abort();

    const writer = this.writer;
    this.writer = undefined;
    try {
      await writer?.close();
    } catch (err) {
      this.errors.next(WebSerialTransport.toError(err));
    }
    this.connected.next(false);
  }

  /**
   * Connect to a given SerialPort. This involves no user interaction
   * and can be called whenever a port is available.
   *
   * Calling this again for a port that is already being forwarded replaces
   * the previous forwarding subscription instead of stacking a second one,
   * so chunks and errors are never delivered twice.
   */
  async connectPort(port: SerialPort): Promise<void> {
    this.watchForDisconnect(port);

    const connection =
      this.portConnections.get(port) ?? (await this.connectNewPort(port));
    this.activePortConnectionConnection = connection;

    this.connected.next(true);

    // Drop any earlier forwarding for this port before subscribing again.
    this.rxSubscriptions.get(port)?.unsubscribe();

    const forwarding = new Subscription();
    forwarding.add(
      connection.chunks.subscribe({
        next: (chunk) => {
          this.chunks.next(chunk);
        },
        error: (err: unknown) => {
          // Report on the errors stream; throwing from inside a subscriber
          // callback cannot be caught by any caller.
          this.errors.next(
            new Error(
              `Chunks observable had an unexpected error ${String(err)}`,
            ),
          );
        },
        complete: () => {
          this.connected.next(false);
          this.portConnections.delete(port);
          // Don't complete the chunks observable because then it would not
          // be able to forward any future chunks.
        },
      }),
    );
    forwarding.add(
      connection.errors.subscribe((error) => {
        this.errors.next(error);
        if (error instanceof DeviceLostError) {
          // The device has been lost
          this.connected.next(false);
        }
      }),
    );
    this.rxSubscriptions.set(port, forwarding);
  }

  /**
   * Marks `port` as watched and makes sure exactly one 'disconnect'
   * listener is installed on `serial`, regardless of how many times
   * `connectPort()` is called.
   */
  private watchForDisconnect(port: SerialPort): void {
    this.watchedPorts.add(port);
    if (this.disconnectListenerInstalled) {
      return;
    }
    this.disconnectListenerInstalled = true;
    this.serial.addEventListener('disconnect', (event: Event) => {
      const target = event.target;
      if (target !== null && this.watchedPorts.has(target)) {
        this.connected.next(false);
      }
    });
  }

  /**
   * Opens `port`, takes its writer, starts reading from it and caches the
   * resulting connection in `portConnections`.
   */
  private async connectNewPort(port: SerialPort): Promise<PortConnection> {
    await port.open(this.serialOptions);
    const writer = port.writable.getWriter();
    this.writer = writer;

    const sendChunk = async (chunk: Uint8Array): Promise<void> => {
      // Wait until the writer can accept more data before writing.
      await writer.ready;
      await writer.write(chunk);
    };

    const { chunks, errors } = this.getChunks(port);

    const connection: PortConnection = { sendChunk, chunks, errors };
    this.portConnections.set(port, connection);
    return connection;
  }

  /**
   * Starts piping the port's readable stream into a fresh `chunks` subject.
   * Read failures are pushed to the returned `errors` subject; both subjects
   * complete when the readable stream closes.
   */
  private getChunks(port: SerialPort): PortReadConnection {
    const chunks = new Subject<Uint8Array>();
    const errors = new Subject<Error>();
    const abortController = new AbortController();
    this.abortController = abortController;

    const read = async (): Promise<void> => {
      if (!port.readable) {
        throw new DeviceLostError();
      }
      if (port.readable.locked) {
        throw new DeviceLockedError();
      }
      await port.readable.pipeTo(
        new WritableStream({
          write: (chunk) => {
            chunks.next(chunk);
          },
          close: () => {
            chunks.complete();
            errors.complete();
          },
        }),
        // Lets disconnect() stop the pipe.
        { signal: abortController.signal },
      );
    };

    read().catch((err: unknown) => {
      // Don't error the chunks observable since that stops it from
      // reading any more packets, and we often want to continue
      // despite an error. Instead, push errors to the 'errors'
      // observable.
      errors.next(WebSerialTransport.toError(err));
    });

    return { chunks, errors };
  }

  /** Returns `value` unchanged if it is an Error, otherwise wraps it. */
  private static toError(value: unknown): Error {
    return value instanceof Error ? value : new Error(String(value));
  }
}