1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/pwpb/server_reader_writer.h"
16
17 #include <optional>
18
19 #include "pw_rpc/pwpb/fake_channel_output.h"
20 #include "pw_rpc/pwpb/test_method_context.h"
21 #include "pw_rpc/service.h"
22 #include "pw_rpc_test_protos/test.rpc.pwpb.h"
23 #include "pw_unit_test/framework.h"
24
25 namespace pw::rpc {
26 namespace {
27
28 namespace TestRequest = ::pw::rpc::test::pwpb::TestRequest;
29 namespace TestResponse = ::pw::rpc::test::pwpb::TestResponse;
30 namespace TestStreamResponse = ::pw::rpc::test::pwpb::TestStreamResponse;
31
32 class TestServiceImpl final
33 : public test::pw_rpc::pwpb::TestService::Service<TestServiceImpl> {
34 public:
TestUnaryRpc(const TestRequest::Message &,TestResponse::Message &)35 Status TestUnaryRpc(const TestRequest::Message&, TestResponse::Message&) {
36 return OkStatus();
37 }
38
TestAnotherUnaryRpc(const TestRequest::Message &,PwpbUnaryResponder<TestResponse::Message> &)39 void TestAnotherUnaryRpc(const TestRequest::Message&,
40 PwpbUnaryResponder<TestResponse::Message>&) {}
41
TestServerStreamRpc(const TestRequest::Message &,PwpbServerWriter<TestStreamResponse::Message> &)42 void TestServerStreamRpc(const TestRequest::Message&,
43 PwpbServerWriter<TestStreamResponse::Message>&) {}
44
TestClientStreamRpc(PwpbServerReader<TestRequest::Message,TestStreamResponse::Message> &)45 void TestClientStreamRpc(
46 PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>&) {}
47
TestBidirectionalStreamRpc(PwpbServerReaderWriter<TestRequest::Message,TestStreamResponse::Message> &)48 void TestBidirectionalStreamRpc(
49 PwpbServerReaderWriter<TestRequest::Message,
50 TestStreamResponse::Message>&) {}
51 };
52
53 template <auto kMethod>
54 struct ReaderWriterTestContext {
55 using Info = internal::MethodInfo<kMethod>;
56
57 static constexpr uint32_t kChannelId = 1;
58
ReaderWriterTestContextpw::rpc::__anon08919c550111::ReaderWriterTestContext59 ReaderWriterTestContext()
60 : channel(Channel::Create<kChannelId>(&output)),
61 server(span(&channel, 1)) {}
62
63 TestServiceImpl service;
64 PwpbFakeChannelOutput<4> output;
65 Channel channel;
66 Server server;
67 };
68
69 using test::pw_rpc::pwpb::TestService;
70
TEST(PwpbUnaryResponder,DefaultConstructed)71 TEST(PwpbUnaryResponder, DefaultConstructed) {
72 PwpbUnaryResponder<TestResponse::Message> call;
73
74 ASSERT_FALSE(call.active());
75 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
76
77 EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
78
79 call.set_on_error([](Status) {});
80 }
81
TEST(PwpbServerWriter,DefaultConstructed)82 TEST(PwpbServerWriter, DefaultConstructed) {
83 PwpbServerWriter<TestStreamResponse::Message> call;
84
85 ASSERT_FALSE(call.active());
86 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
87
88 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
89 EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));
90
91 call.set_on_error([](Status) {});
92 }
93
TEST(PwpbServerReader,DefaultConstructed)94 TEST(PwpbServerReader, DefaultConstructed) {
95 PwpbServerReader<TestRequest::Message, TestStreamResponse::Message> call;
96
97 ASSERT_FALSE(call.active());
98 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
99
100 EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
101
102 call.set_on_next([](const TestRequest::Message&) {});
103 call.set_on_error([](Status) {});
104 }
105
TEST(PwpbServerReaderWriter,DefaultConstructed)106 TEST(PwpbServerReaderWriter, DefaultConstructed) {
107 PwpbServerReaderWriter<TestRequest::Message, TestStreamResponse::Message>
108 call;
109
110 ASSERT_FALSE(call.active());
111 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
112
113 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
114 EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));
115
116 call.set_on_next([](const TestRequest::Message&) {});
117 call.set_on_error([](Status) {});
118 }
119
TEST(PwpbUnaryResponder,Closed)120 TEST(PwpbUnaryResponder, Closed) {
121 ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
122 PwpbUnaryResponder call = PwpbUnaryResponder<TestResponse::Message>::Open<
123 TestService::TestUnaryRpc>(ctx.server, ctx.channel.id(), ctx.service);
124 ASSERT_EQ(OkStatus(), call.Finish({}, OkStatus()));
125
126 ASSERT_FALSE(call.active());
127 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
128
129 EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
130
131 call.set_on_error([](Status) {});
132 }
133
TEST(PwpbUnaryResponder,TryClosedFailed)134 TEST(PwpbUnaryResponder, TryClosedFailed) {
135 ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
136 PwpbUnaryResponder call = PwpbUnaryResponder<TestResponse::Message>::Open<
137 TestService::TestUnaryRpc>(ctx.server, ctx.channel.id(), ctx.service);
138 // Sets ChannelOutput to always return false.
139 ctx.output.set_send_status(Status::Unknown());
140 ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));
141
142 // Call should be still alive.
143 ASSERT_TRUE(call.active());
144 }
145
TEST(PwpbUnaryResponder,TryCloseSuccessful)146 TEST(PwpbUnaryResponder, TryCloseSuccessful) {
147 ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
148 PwpbUnaryResponder call = PwpbUnaryResponder<TestResponse::Message>::Open<
149 TestService::TestUnaryRpc>(ctx.server, ctx.channel.id(), ctx.service);
150 // Sets ChannelOutput to always return false.
151 ctx.output.set_send_status(Status::Unknown());
152 ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));
153
154 // Call should be still alive.
155 ASSERT_TRUE(call.active());
156
157 // Tries to close the call again, with ChannelOutput set to return ok.
158 ctx.output.set_send_status(OkStatus());
159 ASSERT_EQ(OkStatus(), call.TryFinish({}, OkStatus()));
160 // Call should be closed.
161 ASSERT_FALSE(call.active());
162 }
163
TEST(PwpbServerWriter,Closed)164 TEST(PwpbServerWriter, Closed) {
165 ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
166 PwpbServerWriter call = PwpbServerWriter<TestStreamResponse::Message>::Open<
167 TestService::TestServerStreamRpc>(
168 ctx.server, ctx.channel.id(), ctx.service);
169 ASSERT_EQ(OkStatus(), call.Finish(OkStatus()));
170
171 ASSERT_FALSE(call.active());
172 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
173
174 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
175 EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));
176
177 call.set_on_error([](Status) {});
178 }
179
TEST(PwpbServerWriter,TryClosedFailed)180 TEST(PwpbServerWriter, TryClosedFailed) {
181 ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
182 PwpbServerWriter call = PwpbServerWriter<TestStreamResponse::Message>::Open<
183 TestService::TestServerStreamRpc>(
184 ctx.server, ctx.channel.id(), ctx.service);
185 // Sets ChannelOutput to always return false.
186 ctx.output.set_send_status(Status::Unknown());
187 ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
188
189 // Call should be still alive.
190 ASSERT_TRUE(call.active());
191 }
192
TEST(PwpbServerWriter,TryCloseSuccessful)193 TEST(PwpbServerWriter, TryCloseSuccessful) {
194 ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
195 PwpbServerWriter call = PwpbServerWriter<TestStreamResponse::Message>::Open<
196 TestService::TestServerStreamRpc>(
197 ctx.server, ctx.channel.id(), ctx.service);
198 // Sets ChannelOutput to always return false.
199 ctx.output.set_send_status(Status::Unknown());
200 ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
201
202 // Call should be still alive.
203 ASSERT_TRUE(call.active());
204
205 // Tries to close the call again, with ChannelOutput set to return ok.
206 ctx.output.set_send_status(OkStatus());
207 ASSERT_EQ(OkStatus(), call.TryFinish(OkStatus()));
208 // Call should be closed.
209 ASSERT_FALSE(call.active());
210 }
211
TEST(PwpbServerReader,Closed)212 TEST(PwpbServerReader, Closed) {
213 ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
214 PwpbServerReader call =
215 PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>::Open<
216 TestService::TestClientStreamRpc>(
217 ctx.server, ctx.channel.id(), ctx.service);
218 ASSERT_EQ(OkStatus(), call.Finish({}, OkStatus()));
219
220 ASSERT_FALSE(call.active());
221 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
222
223 EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
224
225 call.set_on_next([](const TestRequest::Message&) {});
226 call.set_on_error([](Status) {});
227 }
228
TEST(PwpbServerReader,TryClosedFailed)229 TEST(PwpbServerReader, TryClosedFailed) {
230 ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
231 PwpbServerReader call =
232 PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>::Open<
233 TestService::TestClientStreamRpc>(
234 ctx.server, ctx.channel.id(), ctx.service);
235 // Sets ChannelOutput to always return false.
236 ctx.output.set_send_status(Status::Unknown());
237 ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));
238
239 // Call should be still alive.
240 ASSERT_TRUE(call.active());
241 }
242
TEST(PwpbServerReader,TryCloseSuccessful)243 TEST(PwpbServerReader, TryCloseSuccessful) {
244 ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
245 PwpbServerReader call =
246 PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>::Open<
247 TestService::TestClientStreamRpc>(
248 ctx.server, ctx.channel.id(), ctx.service);
249 // Sets ChannelOutput to always return false.
250 ctx.output.set_send_status(Status::Unknown());
251 ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));
252
253 // Call should be still alive.
254 ASSERT_TRUE(call.active());
255
256 // Tries to close the call again, with ChannelOutput set to return ok.
257 ctx.output.set_send_status(OkStatus());
258 ASSERT_EQ(OkStatus(), call.TryFinish({}, OkStatus()));
259 // Call should be closed.
260 ASSERT_FALSE(call.active());
261 }
262
TEST(PwpbServerReaderWriter,Closed)263 TEST(PwpbServerReaderWriter, Closed) {
264 ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
265 PwpbServerReaderWriter call =
266 PwpbServerReaderWriter<TestRequest::Message,
267 TestStreamResponse::Message>::
268 Open<TestService::TestBidirectionalStreamRpc>(
269 ctx.server, ctx.channel.id(), ctx.service);
270 ASSERT_EQ(OkStatus(), call.Finish(OkStatus()));
271
272 ASSERT_FALSE(call.active());
273 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
274
275 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
276 EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));
277
278 call.set_on_next([](const TestRequest::Message&) {});
279 call.set_on_error([](Status) {});
280 }
281
TEST(PwpbServerReaderWriter,TryClosedFailed)282 TEST(PwpbServerReaderWriter, TryClosedFailed) {
283 ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
284 PwpbServerReaderWriter call =
285 PwpbServerReaderWriter<TestRequest::Message,
286 TestStreamResponse::Message>::
287 Open<TestService::TestBidirectionalStreamRpc>(
288 ctx.server, ctx.channel.id(), ctx.service);
289 // Sets ChannelOutput to always return false.
290 ctx.output.set_send_status(Status::Unknown());
291 ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
292
293 // Call should be still alive.
294 ASSERT_TRUE(call.active());
295 }
296
TEST(PwpbServerReaderWriter,TryCloseSuccessful)297 TEST(PwpbServerReaderWriter, TryCloseSuccessful) {
298 ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
299 PwpbServerReaderWriter call =
300 PwpbServerReaderWriter<TestRequest::Message,
301 TestStreamResponse::Message>::
302 Open<TestService::TestBidirectionalStreamRpc>(
303 ctx.server, ctx.channel.id(), ctx.service);
304 // Sets ChannelOutput to always return false.
305 ctx.output.set_send_status(Status::Unknown());
306 ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
307
308 // Call should be still alive.
309 ASSERT_TRUE(call.active());
310
311 // Tries to close the call again, with ChannelOutput set to return ok.
312 ctx.output.set_send_status(OkStatus());
313 ASSERT_EQ(OkStatus(), call.TryFinish(OkStatus()));
314 // Call should be closed.
315 ASSERT_FALSE(call.active());
316 }
317
TEST(PwpbUnaryResponder,Open_ReturnsUsableResponder)318 TEST(PwpbUnaryResponder, Open_ReturnsUsableResponder) {
319 ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
320 PwpbUnaryResponder responder =
321 PwpbUnaryResponder<TestResponse::Message>::Open<
322 TestService::TestUnaryRpc>(ctx.server, ctx.channel.id(), ctx.service);
323
324 ASSERT_EQ(OkStatus(),
325 responder.Finish({.value = 4321, .repeated_field = {}}));
326
327 EXPECT_EQ(ctx.output.last_response<TestService::TestUnaryRpc>().value, 4321);
328 EXPECT_EQ(ctx.output.last_status(), OkStatus());
329 }
330
TEST(PwpbServerWriter,Open_ReturnsUsableWriter)331 TEST(PwpbServerWriter, Open_ReturnsUsableWriter) {
332 ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
333 PwpbServerWriter responder =
334 PwpbServerWriter<TestStreamResponse::Message>::Open<
335 TestService::TestServerStreamRpc>(
336 ctx.server, ctx.channel.id(), ctx.service);
337
338 ASSERT_EQ(OkStatus(), responder.Write({.chunk = {}, .number = 321}));
339 ASSERT_EQ(OkStatus(), responder.Finish());
340
341 EXPECT_EQ(ctx.output.last_response<TestService::TestServerStreamRpc>().number,
342 321u);
343 EXPECT_EQ(ctx.output.last_status(), OkStatus());
344 }
345
TEST(PwpbServerReader,Open_ReturnsUsableReader)346 TEST(PwpbServerReader, Open_ReturnsUsableReader) {
347 ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
348 PwpbServerReader responder =
349 PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>::Open<
350 TestService::TestClientStreamRpc>(
351 ctx.server, ctx.channel.id(), ctx.service);
352
353 ASSERT_EQ(OkStatus(), responder.Finish({.chunk = {}, .number = 321}));
354
355 EXPECT_EQ(ctx.output.last_response<TestService::TestClientStreamRpc>().number,
356 321u);
357 }
358
TEST(PwpbServerReaderWriter,Open_ReturnsUsableReaderWriter)359 TEST(PwpbServerReaderWriter, Open_ReturnsUsableReaderWriter) {
360 ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
361 PwpbServerReaderWriter responder =
362 PwpbServerReaderWriter<TestRequest::Message,
363 TestStreamResponse::Message>::
364 Open<TestService::TestBidirectionalStreamRpc>(
365 ctx.server, ctx.channel.id(), ctx.service);
366
367 ASSERT_EQ(OkStatus(), responder.Write({.chunk = {}, .number = 321}));
368 ASSERT_EQ(OkStatus(), responder.Finish(Status::NotFound()));
369
370 EXPECT_EQ(ctx.output.last_response<TestService::TestBidirectionalStreamRpc>()
371 .number,
372 321u);
373 EXPECT_EQ(ctx.output.last_status(), Status::NotFound());
374 }
375
TEST(RawServerReaderWriter,Open_UnknownChannel)376 TEST(RawServerReaderWriter, Open_UnknownChannel) {
377 ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
378 ASSERT_EQ(OkStatus(), ctx.server.CloseChannel(ctx.kChannelId));
379
380 PwpbServerReaderWriter call =
381 PwpbServerReaderWriter<TestRequest::Message,
382 TestStreamResponse::Message>::
383 Open<TestService::TestBidirectionalStreamRpc>(
384 ctx.server, ctx.kChannelId, ctx.service);
385
386 EXPECT_TRUE(call.active());
387 EXPECT_EQ(call.channel_id(), ctx.kChannelId);
388 EXPECT_EQ(Status::Unavailable(), call.Write({}));
389
390 ASSERT_EQ(OkStatus(), ctx.server.OpenChannel(ctx.kChannelId, ctx.output));
391
392 EXPECT_EQ(OkStatus(), call.Write({}));
393 EXPECT_TRUE(call.active());
394
395 EXPECT_EQ(OkStatus(), call.Finish());
396 EXPECT_FALSE(call.active());
397 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
398 }
399
TEST(RawServerReaderWriter,Open_MultipleTimes_CancelsPrevious)400 TEST(RawServerReaderWriter, Open_MultipleTimes_CancelsPrevious) {
401 ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
402
403 PwpbServerReaderWriter one =
404 PwpbServerReaderWriter<TestRequest::Message,
405 TestStreamResponse::Message>::
406 Open<TestService::TestBidirectionalStreamRpc>(
407 ctx.server, ctx.kChannelId, ctx.service);
408
409 std::optional<Status> error;
410 one.set_on_error([&error](Status status) { error = status; });
411
412 ASSERT_TRUE(one.active());
413
414 PwpbServerReaderWriter two =
415 PwpbServerReaderWriter<TestRequest::Message,
416 TestStreamResponse::Message>::
417 Open<TestService::TestBidirectionalStreamRpc>(
418 ctx.server, ctx.kChannelId, ctx.service);
419
420 EXPECT_FALSE(one.active());
421 EXPECT_TRUE(two.active());
422
423 EXPECT_EQ(Status::Cancelled(), error);
424 }
425
TEST(PwpbServerReader,CallbacksMoveCorrectly)426 TEST(PwpbServerReader, CallbacksMoveCorrectly) {
427 PW_PWPB_TEST_METHOD_CONTEXT(TestServiceImpl, TestClientStreamRpc) ctx;
428
429 PwpbServerReader call_1 = ctx.reader();
430
431 ASSERT_TRUE(call_1.active());
432
433 TestRequest::Message received_request = {.integer = 12345678,
434 .status_code = 1};
435
436 call_1.set_on_next([&received_request](const TestRequest::Message& value) {
437 received_request = value;
438 });
439
440 PwpbServerReader<TestRequest::Message, TestStreamResponse::Message> call_2;
441 call_2 = std::move(call_1);
442
443 constexpr TestRequest::Message request{.integer = 600613, .status_code = 2};
444 ctx.SendClientStream(request);
445 EXPECT_EQ(request.integer, received_request.integer);
446 EXPECT_EQ(request.status_code, received_request.status_code);
447 }
448
449 } // namespace
450 } // namespace pw::rpc
451