xref: /aosp_15_r20/external/pigweed/pw_rpc/ts/rpc_classes.ts (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1// Copyright 2021 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15import { Message } from 'google-protobuf';
16import { Status } from 'pigweedjs/pw_status';
17
18import { Call } from './call';
19import { Channel, Method, Service } from './descriptors';
20import * as packets from './packets';
21
22/** Max number that can fit into a 2-byte varint */
23const MAX_CALL_ID = 1 << 14;
24/** Calls with ID of `kOpenCallId` were unrequested, and are updated to have the
25    call ID of the first matching request. */
26const LEGACY_OPEN_CALL_ID = 0;
27const OPEN_CALL_ID = 2 ** 32 - 1;
28
29/** Data class for a pending RPC call. */
30export class Rpc {
31  readonly channel: Channel;
32  readonly service: Service;
33  readonly method: Method;
34
35  constructor(channel: Channel, service: Service, method: Method) {
36    this.channel = channel;
37    this.service = service;
38    this.method = method;
39  }
40
41  /** Returns channel service method callId tuple */
42  getIdSet(callId: number): [number, number, number, number] {
43    return [this.channel.id, this.service.id, this.method.id, callId];
44  }
45
46  /**
47   * Returns a string sequence to uniquely identify channel, service, method
48   * and call ID. This can be used to hash the Rpc.
49   *
50   * For example: "12346789.23452345.12341234.34"
51   */
52  getIdString(callId: number): string {
53    return `${this.channel.id}.${this.service.id}.${this.method.id}.${callId}`;
54  }
55
56  toString(): string {
57    return (
58      `${this.service.name}.${this.method.name} on channel ` +
59      `${this.channel.id}`
60    );
61  }
62}
63
64/** Tracks pending RPCs and encodes outgoing RPC packets. */
65export class PendingCalls {
66  pending: Map<string, Call> = new Map();
67  // We skip callId zero to avoid LEGACY_OPEN_CALL_ID.
68  nextCallId = 1;
69
70  /** Starts the provided RPC and returns the encoded packet to send. */
71  request(rpc: Rpc, request: Message, call: Call): Uint8Array {
72    this.open(rpc, call);
73    console.log(`Starting ${rpc}`);
74    return packets.encodeRequest(rpc.getIdSet(call.callId), request);
75  }
76
77  allocateCallId(): number {
78    const callId = this.nextCallId;
79    this.nextCallId = (this.nextCallId + 1) % MAX_CALL_ID;
80    // We skip callId zero to avoid LEGACY_OPEN_CALL_ID.
81    if (this.nextCallId == 0) {
82      this.nextCallId = 1;
83    }
84    return callId;
85  }
86
87  /** Calls request and sends the resulting packet to the channel. */
88  sendRequest(
89    rpc: Rpc,
90    call: Call,
91    ignoreError: boolean,
92    request?: Message,
93  ): Call | undefined {
94    const previous = this.open(rpc, call);
95    const packet = packets.encodeRequest(
96      rpc.getIdSet(call.callId),
97      request,
98      rpc.method.customRequestSerializer,
99    );
100    try {
101      rpc.channel.send(packet);
102    } catch (error) {
103      if (!ignoreError) {
104        throw error;
105      }
106    }
107    return previous;
108  }
109
110  /**
111   * Creates a call for an RPC, but does not invoke it.
112   *
113   * open() can be used to receive streaming responses to an RPC that was not
114   * invoked by this client. For example, a server may stream logs with a
115   * server streaming RPC prior to any clients invoking it.
116   */
117  open(rpc: Rpc, call: Call): Call | undefined {
118    console.debug(`Starting ${rpc}`);
119    const previous = this.pending.get(rpc.getIdString(call.callId));
120    this.pending.set(rpc.getIdString(call.callId), call);
121    return previous;
122  }
123
124  sendClientStream(rpc: Rpc, message: Message, callId: number) {
125    if (this.getPending(rpc, callId) === undefined) {
126      throw new Error(`Attempt to send client stream for inactive RPC: ${rpc}`);
127    }
128    rpc.channel.send(
129      packets.encodeClientStream(
130        rpc.getIdSet(callId),
131        message,
132        rpc.method.customRequestSerializer,
133      ),
134    );
135  }
136
137  sendClientStreamEnd(rpc: Rpc, callId: number) {
138    if (this.getPending(rpc, callId) === undefined) {
139      throw new Error(`Attempt to send client stream for inactive RPC: ${rpc}`);
140    }
141    rpc.channel.send(packets.encodeClientStreamEnd(rpc.getIdSet(callId)));
142  }
143
144  /** Cancels the RPC. Returns the CLIENT_ERROR packet to send. */
145  cancel(rpc: Rpc, callId: number): Uint8Array {
146    console.debug(`Cancelling ${rpc}`);
147    this.pending.delete(rpc.getIdString(callId));
148    return packets.encodeCancel(rpc.getIdSet(callId));
149  }
150
151  /** Calls cancel and sends the cancel packet, if any, to the channel. */
152  sendCancel(rpc: Rpc, callId: number): boolean {
153    let packet: Uint8Array | undefined;
154    try {
155      packet = this.cancel(rpc, callId);
156    } catch (err) {
157      return false;
158    }
159
160    if (packet !== undefined) {
161      rpc.channel.send(packet);
162    }
163    return true;
164  }
165
166  /** Gets the pending RPC's call. If status is set, clears the RPC. */
167  getPending(rpc: Rpc, callId: number, status?: Status): Call | undefined {
168    let call: Call | undefined = this.pending.get(rpc.getIdString(callId));
169    if (callId === LEGACY_OPEN_CALL_ID || callId === OPEN_CALL_ID) {
170      // Calls with ID `OPEN_CALL_ID` were unrequested, and are updated to
171      // have the call ID of the first matching request.
172      const allPendingCalls = Array.from(this.pending.values());
173      for (const pending in allPendingCalls) {
174        const curCall = allPendingCalls[pending];
175        if (curCall.rpc.getIdString(0) === rpc.getIdString(0)) {
176          call = curCall;
177          break;
178        }
179      }
180    }
181    if (status === undefined) {
182      return call;
183    }
184
185    this.pending.delete(rpc.getIdString(callId));
186    return call;
187  }
188}
189