1// Copyright 2021 The Pigweed Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may not 4// use this file except in compliance with the License. You may obtain a copy of 5// the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12// License for the specific language governing permissions and limitations under 13// the License. 14 15import { Message } from 'google-protobuf'; 16import { Status } from 'pigweedjs/pw_status'; 17 18import { Call } from './call'; 19import { Channel, Method, Service } from './descriptors'; 20import * as packets from './packets'; 21 22/** Max number that can fit into a 2-byte varint */ 23const MAX_CALL_ID = 1 << 14; 24/** Calls with ID of `kOpenCallId` were unrequested, and are updated to have the 25 call ID of the first matching request. */ 26const LEGACY_OPEN_CALL_ID = 0; 27const OPEN_CALL_ID = 2 ** 32 - 1; 28 29/** Data class for a pending RPC call. */ 30export class Rpc { 31 readonly channel: Channel; 32 readonly service: Service; 33 readonly method: Method; 34 35 constructor(channel: Channel, service: Service, method: Method) { 36 this.channel = channel; 37 this.service = service; 38 this.method = method; 39 } 40 41 /** Returns channel service method callId tuple */ 42 getIdSet(callId: number): [number, number, number, number] { 43 return [this.channel.id, this.service.id, this.method.id, callId]; 44 } 45 46 /** 47 * Returns a string sequence to uniquely identify channel, service, method 48 * and call ID. This can be used to hash the Rpc. 49 * 50 * For example: "12346789.23452345.12341234.34" 51 */ 52 getIdString(callId: number): string { 53 return `${this.channel.id}.${this.service.id}.${this.method.id}.${callId}`; 54 } 55 56 toString(): string { 57 return ( 58 `${this.service.name}.${this.method.name} on channel ` + 59 `${this.channel.id}` 60 ); 61 } 62} 63 64/** Tracks pending RPCs and encodes outgoing RPC packets. */ 65export class PendingCalls { 66 pending: Map<string, Call> = new Map(); 67 // We skip callId zero to avoid LEGACY_OPEN_CALL_ID. 68 nextCallId = 1; 69 70 /** Starts the provided RPC and returns the encoded packet to send. */ 71 request(rpc: Rpc, request: Message, call: Call): Uint8Array { 72 this.open(rpc, call); 73 console.log(`Starting ${rpc}`); 74 return packets.encodeRequest(rpc.getIdSet(call.callId), request); 75 } 76 77 allocateCallId(): number { 78 const callId = this.nextCallId; 79 this.nextCallId = (this.nextCallId + 1) % MAX_CALL_ID; 80 // We skip callId zero to avoid LEGACY_OPEN_CALL_ID. 81 if (this.nextCallId == 0) { 82 this.nextCallId = 1; 83 } 84 return callId; 85 } 86 87 /** Calls request and sends the resulting packet to the channel. */ 88 sendRequest( 89 rpc: Rpc, 90 call: Call, 91 ignoreError: boolean, 92 request?: Message, 93 ): Call | undefined { 94 const previous = this.open(rpc, call); 95 const packet = packets.encodeRequest( 96 rpc.getIdSet(call.callId), 97 request, 98 rpc.method.customRequestSerializer, 99 ); 100 try { 101 rpc.channel.send(packet); 102 } catch (error) { 103 if (!ignoreError) { 104 throw error; 105 } 106 } 107 return previous; 108 } 109 110 /** 111 * Creates a call for an RPC, but does not invoke it. 112 * 113 * open() can be used to receive streaming responses to an RPC that was not 114 * invoked by this client. For example, a server may stream logs with a 115 * server streaming RPC prior to any clients invoking it. 116 */ 117 open(rpc: Rpc, call: Call): Call | undefined { 118 console.debug(`Starting ${rpc}`); 119 const previous = this.pending.get(rpc.getIdString(call.callId)); 120 this.pending.set(rpc.getIdString(call.callId), call); 121 return previous; 122 } 123 124 sendClientStream(rpc: Rpc, message: Message, callId: number) { 125 if (this.getPending(rpc, callId) === undefined) { 126 throw new Error(`Attempt to send client stream for inactive RPC: ${rpc}`); 127 } 128 rpc.channel.send( 129 packets.encodeClientStream( 130 rpc.getIdSet(callId), 131 message, 132 rpc.method.customRequestSerializer, 133 ), 134 ); 135 } 136 137 sendClientStreamEnd(rpc: Rpc, callId: number) { 138 if (this.getPending(rpc, callId) === undefined) { 139 throw new Error(`Attempt to send client stream for inactive RPC: ${rpc}`); 140 } 141 rpc.channel.send(packets.encodeClientStreamEnd(rpc.getIdSet(callId))); 142 } 143 144 /** Cancels the RPC. Returns the CLIENT_ERROR packet to send. */ 145 cancel(rpc: Rpc, callId: number): Uint8Array { 146 console.debug(`Cancelling ${rpc}`); 147 this.pending.delete(rpc.getIdString(callId)); 148 return packets.encodeCancel(rpc.getIdSet(callId)); 149 } 150 151 /** Calls cancel and sends the cancel packet, if any, to the channel. */ 152 sendCancel(rpc: Rpc, callId: number): boolean { 153 let packet: Uint8Array | undefined; 154 try { 155 packet = this.cancel(rpc, callId); 156 } catch (err) { 157 return false; 158 } 159 160 if (packet !== undefined) { 161 rpc.channel.send(packet); 162 } 163 return true; 164 } 165 166 /** Gets the pending RPC's call. If status is set, clears the RPC. */ 167 getPending(rpc: Rpc, callId: number, status?: Status): Call | undefined { 168 let call: Call | undefined = this.pending.get(rpc.getIdString(callId)); 169 if (callId === LEGACY_OPEN_CALL_ID || callId === OPEN_CALL_ID) { 170 // Calls with ID `OPEN_CALL_ID` were unrequested, and are updated to 171 // have the call ID of the first matching request. 172 const allPendingCalls = Array.from(this.pending.values()); 173 for (const pending in allPendingCalls) { 174 const curCall = allPendingCalls[pending]; 175 if (curCall.rpc.getIdString(0) === rpc.getIdString(0)) { 176 call = curCall; 177 break; 178 } 179 } 180 } 181 if (status === undefined) { 182 return call; 183 } 184 185 this.pending.delete(rpc.getIdString(callId)); 186 return call; 187 } 188} 189