1// Copyright 2022 The Pigweed Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may not 4// use this file except in compliance with the License. You may obtain a copy of 5// the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12// License for the specific language governing permissions and limitations under 13// the License. 14 15import { setPathOnObject } from './object_set'; 16import { Decoder, Encoder } from 'pigweedjs/pw_hdlc'; 17import { 18 Client, 19 Channel, 20 ServiceClient, 21 UnaryMethodStub, 22 MethodStub, 23 ServerStreamingMethodStub, 24 Call, 25} from 'pigweedjs/pw_rpc'; 26import { WebSerialTransport } from '../transport/web_serial_transport'; 27import { ProtoCollection } from 'pigweedjs/pw_protobuf_compiler'; 28import { Message } from 'google-protobuf'; 29 30function protoFieldToMethodName(fieldName: string) { 31 return fieldName.split('_').map(titleCase).join(''); 32} 33function titleCase(title: string) { 34 return title.charAt(0).toUpperCase() + title.slice(1); 35} 36 37interface RPCUnderlyingSource extends UnderlyingSource { 38 call?: Call; 39} 40 41export class RPCReadableStream<R = any> extends ReadableStream<R> { 42 constructor(private underlyingSource: RPCUnderlyingSource) { 43 super(underlyingSource); 44 } 45 46 get call(): Call { 47 return this.underlyingSource.call!; 48 } 49 50 override cancel(): Promise<void> { 51 this.call.cancel(); 52 return Promise.resolve(); 53 } 54} 55 56export class Device { 57 private protoCollection: ProtoCollection; 58 private transport: WebSerialTransport; 59 private decoder: Decoder; 60 private encoder: Encoder; 61 private rpcAddress: number; 62 private nameToMethodArgumentsMap: any; 63 client: Client; 64 rpcs: any; 65 66 constructor( 67 protoCollection: ProtoCollection, 68 transport: WebSerialTransport = new WebSerialTransport(), 69 channel = 1, 70 rpcAddress = 82, 71 ) { 72 this.transport = transport; 73 this.rpcAddress = rpcAddress; 74 this.protoCollection = protoCollection; 75 this.decoder = new Decoder(); 76 this.encoder = new Encoder(); 77 this.nameToMethodArgumentsMap = {}; 78 const channels = [ 79 new Channel(channel, (bytes) => { 80 const hdlcBytes = this.encoder.uiFrame(this.rpcAddress, bytes); 81 this.transport.sendChunk(hdlcBytes); 82 }), 83 ]; 84 this.client = Client.fromProtoSet(channels, this.protoCollection); 85 86 this.setupRpcs(); 87 } 88 89 async connect() { 90 await this.transport.connect(); 91 this.transport.chunks.subscribe((item) => { 92 const decoded = this.decoder.process(item); 93 for (const frame of decoded) { 94 if (frame.address === this.rpcAddress) { 95 this.client.processPacket(frame.data); 96 } 97 } 98 }); 99 } 100 101 getMethodArguments(fullPath: string) { 102 return this.nameToMethodArgumentsMap[fullPath]; 103 } 104 105 private setupRpcs() { 106 const rpcMap = {}; 107 const channel = this.client.channel()!; 108 const servicesKeys = Array.from(channel.services.keys()); 109 servicesKeys.forEach((serviceKey) => { 110 setPathOnObject( 111 rpcMap, 112 serviceKey, 113 this.mapServiceMethods(channel.services.get(serviceKey)!), 114 ); 115 }); 116 this.rpcs = rpcMap; 117 } 118 119 private mapServiceMethods(service: ServiceClient) { 120 const methodMap: { [index: string]: any } = {}; 121 const methodKeys = Array.from(service.methodsByName.keys()); 122 methodKeys 123 .filter( 124 (method: any) => 125 service.methodsByName.get(method) instanceof UnaryMethodStub || 126 service.methodsByName.get(method) instanceof 127 ServerStreamingMethodStub, 128 ) 129 .forEach((key) => { 130 const fn = this.createMethodWrapper(service.methodsByName.get(key)!); 131 methodMap[key] = fn; 132 }); 133 return methodMap; 134 } 135 136 private createMethodWrapper(realMethod: MethodStub) { 137 if (realMethod instanceof UnaryMethodStub) { 138 return this.createUnaryMethodWrapper(realMethod); 139 } else if (realMethod instanceof ServerStreamingMethodStub) { 140 return this.createServerStreamingMethodWrapper(realMethod); 141 } 142 throw new Error(`Unknown method: ${realMethod}`); 143 } 144 145 private createUnaryMethodWrapper(realMethod: UnaryMethodStub) { 146 const call = async (request: Message, timeout?: number) => { 147 return await realMethod.call(request, timeout); 148 }; 149 const createRequest = () => { 150 return new realMethod.method.requestType(); 151 }; 152 return { call, createRequest }; 153 } 154 155 private createServerStreamingMethodWrapper( 156 realMethod: ServerStreamingMethodStub, 157 ) { 158 const call = (request: Message) => { 159 const source: RPCUnderlyingSource = { 160 start(controller: ReadableStreamDefaultController) { 161 this.call = realMethod.invoke( 162 request, 163 (msg) => { 164 controller.enqueue(msg); 165 }, 166 () => { 167 controller.close(); 168 }, 169 ); 170 }, 171 cancel() { 172 this.call!.cancel(); 173 }, 174 }; 175 return new RPCReadableStream<Message>(source); 176 }; 177 const createRequest = () => { 178 return new realMethod.method.requestType(); 179 }; 180 return { call, createRequest }; 181 } 182} 183