// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

/* eslint-env browser */

import { SomeMessage } from 'pigweedjs/protos/pw_rpc/ts/test2_pb';
import { Status } from 'pigweedjs/pw_status';

import { Call, ServerStreamingCall } from './call';
import { Channel, Method, Service } from './descriptors';
import { PendingCalls, Rpc } from './rpc_classes';

/**
 * Minimal stand-in for an RPC descriptor used to construct calls in tests.
 *
 * The channel, service and method are left undefined; only the ID helpers
 * return fixed values (IDs 1, 2, 3 followed by the supplied call ID).
 */
class FakeRpc {
  readonly channel: any = undefined;
  readonly service: any = undefined;
  readonly method: any = undefined;

  /** Returns the fixed ID tuple `[1, 2, 3, callId]`. */
  getIdSet(callId: number): [number, number, number, number] {
    return [1, 2, 3, callId];
  }

  /** Returns the fixed ID string `'1.2.3.<callId>'`. */
  getIdString(callId: number): string {
    return '1.2.3.' + callId;
  }
}

describe('Call', () => {
  let call: Call;

  // Each test gets a fresh Call wired to no-op callbacks.
  beforeEach(() => {
    const noop = () => {
      // Do nothing.
    };
    const pendingCalls = new PendingCalls();
    const rpc = new FakeRpc();
    call = new Call(pendingCalls, rpc, noop, noop, noop, 4);
  });

  /** Builds a SomeMessage whose magic number is `magicNumber`. */
  function newMessage(magicNumber = 1): SomeMessage {
    const message = new SomeMessage();
    message.setMagicNumber(magicNumber);
    return message;
  }

  it('getResponse returns all responses.', async () => {
    const message1 = newMessage(1);
    const message2 = newMessage(2);
    const message3 = newMessage(3);

    // Queue three responses
    call.handleResponse(message1);
    call.handleResponse(message2);
    call.handleResponse(message3);

    let responses = call.getResponses(2);
    expect((await responses.next()).value).toEqual(message1);
    expect((await responses.next()).value).toEqual(message2);
    expect((await responses.next()).done).toEqual(true);

    responses = call.getResponses(1);
    expect((await responses.next()).value).toEqual(message3);
    expect((await responses.next()).done).toEqual(true);
  });

  it('getResponse early returns on stream end.', async () => {
    const message = newMessage();
    const responses = call.getResponses(2);

    // Queue one response and an early completion.
    call.handleResponse(message);
    call.handleCompletion(0);

    expect((await responses.next()).value).toEqual(message);
    expect((await responses.next()).done).toEqual(true);
  });

  it('getResponse promise is rejected on stream error.', async () => {
    expect.assertions(2);
    const message = newMessage();
    const responses = call.getResponses(3);

    call.handleResponse(message);
    expect((await responses.next()).value).toEqual(message);

    call.handleResponse(message);
    call.handleError(1);

    // Promise is rejected as soon as an error is received, even if there is a
    // response in the queue.
    //
    // The chain is awaited so the assertion inside the handler is guaranteed
    // to run before the test function returns; otherwise the second assertion
    // counted by expect.assertions(2) would depend on microtask timing.
    await responses.next().catch((e: Error) => {
      expect(e.name).toEqual('TypeError');
    });
  });

  it('getResponse waits if queue is empty', async () => {
    const message1 = newMessage(1);
    const message2 = newMessage(2);
    const responses = call.getResponses(2);

    // Queue two responses after a small delay
    setTimeout(() => {
      call.handleResponse(message1);
      call.handleResponse(message2);
      call.handleCompletion(0);
      expect(call.completed).toEqual(true);
    }, 200);

    expect(call.completed).toEqual(false);
    expect((await responses.next()).value).toEqual(message1);
    expect((await responses.next()).value).toEqual(message2);
    expect((await responses.next()).done).toEqual(true);
  });

  it('getResponse without count fetches all results', async () => {
    const message1 = newMessage(1);
    const message2 = newMessage(2);
    const responses = call.getResponses();

    call.handleResponse(message1);
    expect((await responses.next()).value).toEqual(message1);

    // Deliver the second response and the completion after a small delay.
    setTimeout(() => {
      call.handleResponse(message2);
      call.handleCompletion(0);
      expect(call.completed).toEqual(true);
    }, 200);

    expect(call.completed).toEqual(false);
    expect((await responses.next()).value).toEqual(message2);
    expect((await responses.next()).done).toEqual(true);
  });

  it('getResponses limits to maximum number of responses', async () => {
    const allResponses = [];
    const noop = () => {
      // Do nothing.
    };
    const pendingCalls = new PendingCalls();
    const rpc = new FakeRpc();
    // The on-next callback records every response it is handed; the final
    // constructor argument (4) is the limit this test exercises.
    const streamCall = new ServerStreamingCall(
      pendingCalls,
      rpc,
      (res) => allResponses.push(res),
      noop,
      noop,
      4,
    );

    const message1 = newMessage(1);
    const message2 = newMessage(2);
    const message3 = newMessage(3);
    const message4 = newMessage(4);
    const message5 = newMessage(5);
    const message6 = newMessage(6);

    setTimeout(() => {
      streamCall.handleResponse(message1);
      streamCall.handleResponse(message2);
      streamCall.handleResponse(message3);
      streamCall.handleResponse(message4);
      streamCall.handleResponse(message5);
      streamCall.handleResponse(message6);
      streamCall.handleCompletion(Status.OK);
    }, 200);

    // All 6 responses are received, but only the most recent 4 are stored.
    const [status, responses] = await streamCall.complete();
    expect(status).toEqual(Status.OK);
    expect(allResponses).toEqual([
      message1,
      message2,
      message3,
      message4,
      message5,
      message6,
    ]);
    expect(responses).toEqual([message3, message4, message5, message6]);
  });
});