xref: /aosp_15_r20/external/pigweed/pw_rpc/pwpb/server_reader_writer_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc/pwpb/server_reader_writer.h"
16 
17 #include <optional>
18 
19 #include "pw_rpc/pwpb/fake_channel_output.h"
20 #include "pw_rpc/pwpb/test_method_context.h"
21 #include "pw_rpc/service.h"
22 #include "pw_rpc_test_protos/test.rpc.pwpb.h"
23 #include "pw_unit_test/framework.h"
24 
25 namespace pw::rpc {
26 namespace {
27 
28 namespace TestRequest = ::pw::rpc::test::pwpb::TestRequest;
29 namespace TestResponse = ::pw::rpc::test::pwpb::TestResponse;
30 namespace TestStreamResponse = ::pw::rpc::test::pwpb::TestStreamResponse;
31 
32 class TestServiceImpl final
33     : public test::pw_rpc::pwpb::TestService::Service<TestServiceImpl> {
34  public:
TestUnaryRpc(const TestRequest::Message &,TestResponse::Message &)35   Status TestUnaryRpc(const TestRequest::Message&, TestResponse::Message&) {
36     return OkStatus();
37   }
38 
TestAnotherUnaryRpc(const TestRequest::Message &,PwpbUnaryResponder<TestResponse::Message> &)39   void TestAnotherUnaryRpc(const TestRequest::Message&,
40                            PwpbUnaryResponder<TestResponse::Message>&) {}
41 
TestServerStreamRpc(const TestRequest::Message &,PwpbServerWriter<TestStreamResponse::Message> &)42   void TestServerStreamRpc(const TestRequest::Message&,
43                            PwpbServerWriter<TestStreamResponse::Message>&) {}
44 
TestClientStreamRpc(PwpbServerReader<TestRequest::Message,TestStreamResponse::Message> &)45   void TestClientStreamRpc(
46       PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>&) {}
47 
TestBidirectionalStreamRpc(PwpbServerReaderWriter<TestRequest::Message,TestStreamResponse::Message> &)48   void TestBidirectionalStreamRpc(
49       PwpbServerReaderWriter<TestRequest::Message,
50                              TestStreamResponse::Message>&) {}
51 };
52 
53 template <auto kMethod>
54 struct ReaderWriterTestContext {
55   using Info = internal::MethodInfo<kMethod>;
56 
57   static constexpr uint32_t kChannelId = 1;
58 
ReaderWriterTestContextpw::rpc::__anon08919c550111::ReaderWriterTestContext59   ReaderWriterTestContext()
60       : channel(Channel::Create<kChannelId>(&output)),
61         server(span(&channel, 1)) {}
62 
63   TestServiceImpl service;
64   PwpbFakeChannelOutput<4> output;
65   Channel channel;
66   Server server;
67 };
68 
69 using test::pw_rpc::pwpb::TestService;
70 
// A default-constructed unary responder must be inactive, have no channel
// assigned, and reject Finish() with FAILED_PRECONDITION.
TEST(PwpbUnaryResponder, DefaultConstructed) {
  using Responder = PwpbUnaryResponder<TestResponse::Message>;
  Responder call;

  ASSERT_FALSE(call.active());
  EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));

  // Installing a callback on the inactive call is exercised as well.
  call.set_on_error([](Status) {});
}
81 
// A default-constructed server writer must be inactive, have no channel
// assigned, and reject both Write() and Finish() with FAILED_PRECONDITION.
TEST(PwpbServerWriter, DefaultConstructed) {
  using Writer = PwpbServerWriter<TestStreamResponse::Message>;
  Writer call;

  ASSERT_FALSE(call.active());
  EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));

  // Installing a callback on the inactive call is exercised as well.
  call.set_on_error([](Status) {});
}
93 
// A default-constructed server reader must be inactive, have no channel
// assigned, and reject Finish() with FAILED_PRECONDITION.
TEST(PwpbServerReader, DefaultConstructed) {
  using Reader =
      PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>;
  Reader call;

  ASSERT_FALSE(call.active());
  EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));

  // Installing callbacks on the inactive call is exercised as well.
  call.set_on_next([](const TestRequest::Message&) {});
  call.set_on_error([](Status) {});
}
105 
// A default-constructed reader/writer must be inactive, have no channel
// assigned, and reject both Write() and Finish() with FAILED_PRECONDITION.
TEST(PwpbServerReaderWriter, DefaultConstructed) {
  using ReaderWriter = PwpbServerReaderWriter<TestRequest::Message,
                                              TestStreamResponse::Message>;
  ReaderWriter call;

  ASSERT_FALSE(call.active());
  EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);

  EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));

  // Installing callbacks on the inactive call is exercised as well.
  call.set_on_next([](const TestRequest::Message&) {});
  call.set_on_error([](Status) {});
}
119 
// After a successful Finish() the responder must look like a default
// constructed one: inactive, unassigned channel, further Finish() rejected.
TEST(PwpbUnaryResponder, Closed) {
  using Responder = PwpbUnaryResponder<TestResponse::Message>;

  ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
  auto call = Responder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First Finish() closes the call.
  ASSERT_EQ(OkStatus(), call.Finish({}, OkStatus()));

  ASSERT_FALSE(call.active());
  EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);

  // A second Finish() on the closed call is refused.
  EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));

  call.set_on_error([](Status) {});
}
133 
// TryFinish() must report the send failure and leave the call open.
TEST(PwpbUnaryResponder, TryClosedFailed) {
  using Responder = PwpbUnaryResponder<TestResponse::Message>;

  ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
  auto call = Responder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output's send status to UNKNOWN.
  ctx.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(call.active());
}
145 
// A TryFinish() that fails to send can be retried; the retry closes the call
// once the channel output accepts the packet.
TEST(PwpbUnaryResponder, TryCloseSuccessful) {
  using Responder = PwpbUnaryResponder<TestResponse::Message>;

  ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
  auto call = Responder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First attempt: the fake channel output's send status is UNKNOWN.
  ctx.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(call.active());

  // Second attempt: the send status is switched back to OK.
  ctx.output.set_send_status(OkStatus());
  ASSERT_EQ(OkStatus(), call.TryFinish({}, OkStatus()));

  // This time the call is closed.
  ASSERT_FALSE(call.active());
}
163 
// After a successful Finish() the writer must be inactive, have no channel
// assigned, and refuse further Write() and Finish() calls.
TEST(PwpbServerWriter, Closed) {
  using Writer = PwpbServerWriter<TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
  auto call = Writer::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First Finish() closes the call.
  ASSERT_EQ(OkStatus(), call.Finish(OkStatus()));

  ASSERT_FALSE(call.active());
  EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);

  // Operations on the closed call are refused.
  EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));

  call.set_on_error([](Status) {});
}
179 
// TryFinish() must report the send failure and leave the call open.
TEST(PwpbServerWriter, TryClosedFailed) {
  using Writer = PwpbServerWriter<TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
  auto call = Writer::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output's send status to UNKNOWN.
  ctx.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(call.active());
}
192 
// A TryFinish() that fails to send can be retried; the retry closes the call
// once the channel output accepts the packet.
TEST(PwpbServerWriter, TryCloseSuccessful) {
  using Writer = PwpbServerWriter<TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
  auto call = Writer::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First attempt: the fake channel output's send status is UNKNOWN.
  ctx.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(call.active());

  // Second attempt: the send status is switched back to OK.
  ctx.output.set_send_status(OkStatus());
  ASSERT_EQ(OkStatus(), call.TryFinish(OkStatus()));

  // This time the call is closed.
  ASSERT_FALSE(call.active());
}
211 
// After a successful Finish() the reader must be inactive, have no channel
// assigned, and refuse a further Finish().
TEST(PwpbServerReader, Closed) {
  using Reader =
      PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  auto call = Reader::Open<TestService::TestClientStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First Finish() closes the call.
  ASSERT_EQ(OkStatus(), call.Finish({}, OkStatus()));

  ASSERT_FALSE(call.active());
  EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);

  // A second Finish() on the closed call is refused.
  EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));

  call.set_on_next([](const TestRequest::Message&) {});
  call.set_on_error([](Status) {});
}
228 
// TryFinish() must report the send failure and leave the call open.
TEST(PwpbServerReader, TryClosedFailed) {
  using Reader =
      PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  auto call = Reader::Open<TestService::TestClientStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output's send status to UNKNOWN.
  ctx.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(call.active());
}
242 
// A TryFinish() that fails to send can be retried; the retry closes the call
// once the channel output accepts the packet.
TEST(PwpbServerReader, TryCloseSuccessful) {
  using Reader =
      PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  auto call = Reader::Open<TestService::TestClientStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First attempt: the fake channel output's send status is UNKNOWN.
  ctx.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(call.active());

  // Second attempt: the send status is switched back to OK.
  ctx.output.set_send_status(OkStatus());
  ASSERT_EQ(OkStatus(), call.TryFinish({}, OkStatus()));

  // This time the call is closed.
  ASSERT_FALSE(call.active());
}
262 
// After a successful Finish() the reader/writer must be inactive, have no
// channel assigned, and refuse further Write() and Finish() calls.
TEST(PwpbServerReaderWriter, Closed) {
  using ReaderWriter = PwpbServerReaderWriter<TestRequest::Message,
                                              TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  auto call = ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First Finish() closes the call.
  ASSERT_EQ(OkStatus(), call.Finish(OkStatus()));

  ASSERT_FALSE(call.active());
  EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);

  // Operations on the closed call are refused.
  EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));

  call.set_on_next([](const TestRequest::Message&) {});
  call.set_on_error([](Status) {});
}
281 
// TryFinish() must report the send failure and leave the call open.
TEST(PwpbServerReaderWriter, TryClosedFailed) {
  using ReaderWriter = PwpbServerReaderWriter<TestRequest::Message,
                                              TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  auto call = ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output's send status to UNKNOWN.
  ctx.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(call.active());
}
296 
// A TryFinish() that fails to send can be retried; the retry closes the call
// once the channel output accepts the packet.
TEST(PwpbServerReaderWriter, TryCloseSuccessful) {
  using ReaderWriter = PwpbServerReaderWriter<TestRequest::Message,
                                              TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  auto call = ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First attempt: the fake channel output's send status is UNKNOWN.
  ctx.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(call.active());

  // Second attempt: the send status is switched back to OK.
  ctx.output.set_send_status(OkStatus());
  ASSERT_EQ(OkStatus(), call.TryFinish(OkStatus()));

  // This time the call is closed.
  ASSERT_FALSE(call.active());
}
317 
// A responder obtained from Open() can send a response: the value passed to
// Finish() and an OK status must show up on the fake channel output.
TEST(PwpbUnaryResponder, Open_ReturnsUsableResponder) {
  using Responder = PwpbUnaryResponder<TestResponse::Message>;

  ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
  auto responder = Responder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  ASSERT_EQ(OkStatus(),
            responder.Finish({.value = 4321, .repeated_field = {}}));

  // Inspect what reached the channel output.
  EXPECT_EQ(ctx.output.last_response<TestService::TestUnaryRpc>().value, 4321);
  EXPECT_EQ(ctx.output.last_status(), OkStatus());
}
330 
// A writer obtained from Open() can stream a response and finish: the written
// number and an OK status must show up on the fake channel output.
TEST(PwpbServerWriter, Open_ReturnsUsableWriter) {
  using Writer = PwpbServerWriter<TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
  auto responder = Writer::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  ASSERT_EQ(OkStatus(), responder.Write({.chunk = {}, .number = 321}));
  ASSERT_EQ(OkStatus(), responder.Finish());

  // Inspect what reached the channel output.
  EXPECT_EQ(ctx.output.last_response<TestService::TestServerStreamRpc>().number,
            321u);
  EXPECT_EQ(ctx.output.last_status(), OkStatus());
}
345 
// A reader obtained from Open() can send its response: the number passed to
// Finish() must show up on the fake channel output, together with the status
// of the finished call.
TEST(PwpbServerReader, Open_ReturnsUsableReader) {
  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  PwpbServerReader responder =
      PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>::Open<
          TestService::TestClientStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  ASSERT_EQ(OkStatus(), responder.Finish({.chunk = {}, .number = 321}));

  EXPECT_EQ(ctx.output.last_response<TestService::TestClientStreamRpc>().number,
            321u);
  // Finish() was called without an explicit status. As in the sibling
  // Open_ReturnsUsable* tests, verify the status recorded by the channel
  // output so a wrong status cannot slip through unnoticed.
  EXPECT_EQ(ctx.output.last_status(), OkStatus());
}
358 
// A reader/writer obtained from Open() can stream a response and finish with
// a non-OK status: the written number and NOT_FOUND must show up on the fake
// channel output.
TEST(PwpbServerReaderWriter, Open_ReturnsUsableReaderWriter) {
  using ReaderWriter = PwpbServerReaderWriter<TestRequest::Message,
                                              TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  auto responder = ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  ASSERT_EQ(OkStatus(), responder.Write({.chunk = {}, .number = 321}));
  ASSERT_EQ(OkStatus(), responder.Finish(Status::NotFound()));

  // Inspect what reached the channel output.
  EXPECT_EQ(ctx.output.last_response<TestService::TestBidirectionalStreamRpc>()
                .number,
            321u);
  EXPECT_EQ(ctx.output.last_status(), Status::NotFound());
}
375 
// Opening a call on a channel the server has closed still yields an active
// call bound to that channel ID. Writes report UNAVAILABLE until the channel
// is opened again, after which the call works normally.
//
// NOTE(review): the suite is named RawServerReaderWriter although the test
// exercises PwpbServerReaderWriter; presumably carried over from the raw
// variant of this test. Left unchanged because renaming alters test filters.
TEST(RawServerReaderWriter, Open_UnknownChannel) {
  using ReaderWriter = PwpbServerReaderWriter<TestRequest::Message,
                                              TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;

  // Remove the only channel before opening the call.
  ASSERT_EQ(OkStatus(), ctx.server.CloseChannel(ctx.kChannelId));

  auto call = ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
      ctx.server, ctx.kChannelId, ctx.service);

  EXPECT_TRUE(call.active());
  EXPECT_EQ(call.channel_id(), ctx.kChannelId);
  EXPECT_EQ(Status::Unavailable(), call.Write({}));

  // Bring the channel back; the same call can now write.
  ASSERT_EQ(OkStatus(), ctx.server.OpenChannel(ctx.kChannelId, ctx.output));

  EXPECT_EQ(OkStatus(), call.Write({}));
  EXPECT_TRUE(call.active());

  // Finishing closes the call and releases the channel ID.
  EXPECT_EQ(OkStatus(), call.Finish());
  EXPECT_FALSE(call.active());
  EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
}
399 
// Opening the same method on the same channel a second time must deactivate
// the first call and invoke its on_error callback with CANCELLED.
//
// NOTE(review): the suite is named RawServerReaderWriter although the test
// exercises PwpbServerReaderWriter; presumably carried over from the raw
// variant of this test. Left unchanged because renaming alters test filters.
TEST(RawServerReaderWriter, Open_MultipleTimes_CancelsPrevious) {
  using ReaderWriter = PwpbServerReaderWriter<TestRequest::Message,
                                              TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;

  auto one = ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
      ctx.server, ctx.kChannelId, ctx.service);

  // Stays empty unless the first call's error callback runs.
  std::optional<Status> error;
  one.set_on_error([&error](Status status) { error = status; });

  ASSERT_TRUE(one.active());

  // Second Open() for the same server, channel and method.
  auto two = ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
      ctx.server, ctx.kChannelId, ctx.service);

  EXPECT_FALSE(one.active());
  EXPECT_TRUE(two.active());

  EXPECT_EQ(Status::Cancelled(), error);
}
425 
// The on_next callback set on a reader must travel with the call when it is
// move-assigned: a client stream message sent afterwards has to reach the
// callback that was registered on the original object.
TEST(PwpbServerReader, CallbacksMoveCorrectly) {
  using Reader =
      PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>;

  PW_PWPB_TEST_METHOD_CONTEXT(TestServiceImpl, TestClientStreamRpc) ctx;

  PwpbServerReader call_1 = ctx.reader();

  ASSERT_TRUE(call_1.active());

  // Starts out with values that differ from the request sent below, so the
  // final expectations only pass if the callback actually ran.
  TestRequest::Message received_request = {.integer = 12345678,
                                           .status_code = 1};

  call_1.set_on_next([&received_request](const TestRequest::Message& incoming) {
    received_request = incoming;
  });

  // Move the call, callback included, into a default-constructed reader.
  Reader call_2;
  call_2 = std::move(call_1);

  constexpr TestRequest::Message request{.integer = 600613, .status_code = 2};
  ctx.SendClientStream(request);

  EXPECT_EQ(request.integer, received_request.integer);
  EXPECT_EQ(request.status_code, received_request.status_code);
}
448 
449 }  // namespace
450 }  // namespace pw::rpc
451