xref: /aosp_15_r20/external/pigweed/pw_rpc/raw/client_reader_writer_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc/raw/client_reader_writer.h"
16 
17 #include <optional>
18 
19 #include "pw_bytes/array.h"
20 #include "pw_rpc/raw/client_testing.h"
21 #include "pw_rpc/writer.h"
22 #include "pw_rpc_test_protos/test.raw_rpc.pb.h"
23 #include "pw_status/status_with_size.h"
24 #include "pw_unit_test/framework.h"
25 
26 namespace pw::rpc {
27 namespace {
28 
29 using test::pw_rpc::raw::TestService;
30 
FailIfCalled(Status)31 void FailIfCalled(Status) { FAIL(); }
FailIfOnNextCalled(ConstByteSpan)32 void FailIfOnNextCalled(ConstByteSpan) { FAIL(); }
FailIfOnCompletedCalled(ConstByteSpan,Status)33 void FailIfOnCompletedCalled(ConstByteSpan, Status) { FAIL(); }
34 
TEST(RawUnaryReceiver,DefaultConstructed)35 TEST(RawUnaryReceiver, DefaultConstructed) {
36   RawUnaryReceiver call;
37 
38   ASSERT_FALSE(call.active());
39   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
40 
41   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
42 
43   call.set_on_completed([](ConstByteSpan, Status) {});
44   call.set_on_error([](Status) {});
45 }
46 
TEST(RawClientWriter,DefaultConstructed)47 TEST(RawClientWriter, DefaultConstructed) {
48   RawClientWriter call;
49 
50   ASSERT_FALSE(call.active());
51   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
52 
53   EXPECT_EQ(Status::FailedPrecondition(), call.Write(ConstByteSpan()));
54   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
55   EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
56 
57   call.set_on_completed([](ConstByteSpan, Status) {});
58   call.set_on_error([](Status) {});
59 }
60 
TEST(RawClientReader,DefaultConstructed)61 TEST(RawClientReader, DefaultConstructed) {
62   RawClientReader call;
63 
64   ASSERT_FALSE(call.active());
65   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
66 
67   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
68   EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
69 
70   call.set_on_completed([](Status) {});
71   call.set_on_next([](ConstByteSpan) {});
72   call.set_on_error([](Status) {});
73 }
74 
TEST(RawClientReaderWriter,DefaultConstructed)75 TEST(RawClientReaderWriter, DefaultConstructed) {
76   RawClientReaderWriter call;
77 
78   ASSERT_FALSE(call.active());
79   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
80 
81   EXPECT_EQ(Status::FailedPrecondition(), call.Write(ConstByteSpan()));
82   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
83   EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
84 
85   call.set_on_completed([](Status) {});
86   call.set_on_next([](ConstByteSpan) {});
87   call.set_on_error([](Status) {});
88 }
89 
TEST(RawClientWriter,RequestCompletion)90 TEST(RawClientWriter, RequestCompletion) {
91   RawClientTestContext ctx;
92   RawClientWriter call = TestService::TestClientStreamRpc(
93       ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
94   ASSERT_EQ(OkStatus(), call.RequestCompletion());
95 
96   ASSERT_TRUE(call.active());
97   EXPECT_EQ(call.channel_id(), ctx.channel().id());
98 
99   EXPECT_EQ(OkStatus(), call.Write(ConstByteSpan()));
100   EXPECT_EQ(OkStatus(), call.RequestCompletion());
101   EXPECT_EQ(OkStatus(), call.Cancel());
102 
103   call.set_on_completed([](ConstByteSpan, Status) {});
104   call.set_on_error([](Status) {});
105 }
106 
TEST(RawClientReader,RequestCompletion)107 TEST(RawClientReader, RequestCompletion) {
108   RawClientTestContext ctx;
109   RawClientReader call = TestService::TestServerStreamRpc(ctx.client(),
110                                                           ctx.channel().id(),
111                                                           {},
112                                                           FailIfOnNextCalled,
113                                                           FailIfCalled,
114                                                           FailIfCalled);
115   ASSERT_EQ(OkStatus(), call.RequestCompletion());
116 
117   ASSERT_TRUE(call.active());
118   EXPECT_EQ(call.channel_id(), ctx.channel().id());
119 
120   EXPECT_EQ(OkStatus(), call.RequestCompletion());
121   EXPECT_EQ(OkStatus(), call.Cancel());
122 
123   call.set_on_completed([](Status) {});
124   call.set_on_next([](ConstByteSpan) {});
125   call.set_on_error([](Status) {});
126 }
127 
TEST(RawClientReaderWriter,RequestCompletion)128 TEST(RawClientReaderWriter, RequestCompletion) {
129   RawClientTestContext ctx;
130   RawClientReaderWriter call =
131       TestService::TestBidirectionalStreamRpc(ctx.client(),
132                                               ctx.channel().id(),
133                                               FailIfOnNextCalled,
134                                               FailIfCalled,
135                                               FailIfCalled);
136   ASSERT_EQ(OkStatus(), call.RequestCompletion());
137 
138   ASSERT_TRUE(call.active());
139   EXPECT_EQ(call.channel_id(), ctx.channel().id());
140 
141   EXPECT_EQ(OkStatus(), call.Write(ConstByteSpan()));
142   EXPECT_EQ(OkStatus(), call.RequestCompletion());
143   EXPECT_EQ(OkStatus(), call.Cancel());
144 
145   call.set_on_completed([](Status) {});
146   call.set_on_next([](ConstByteSpan) {});
147   call.set_on_error([](Status) {});
148 }
149 
TEST(RawUnaryReceiver,Cancel)150 TEST(RawUnaryReceiver, Cancel) {
151   RawClientTestContext ctx;
152   RawUnaryReceiver call = TestService::TestUnaryRpc(ctx.client(),
153                                                     ctx.channel().id(),
154                                                     {},
155                                                     FailIfOnCompletedCalled,
156                                                     FailIfCalled);
157   ASSERT_EQ(OkStatus(), call.Cancel());
158 
159   // Additional calls should do nothing and return FAILED_PRECONDITION.
160   ASSERT_EQ(Status::FailedPrecondition(), call.Cancel());
161   ASSERT_EQ(Status::FailedPrecondition(), call.Cancel());
162 
163   ASSERT_FALSE(call.active());
164   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
165 
166   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
167 
168   call.set_on_completed([](ConstByteSpan, Status) {});
169   call.set_on_error([](Status) {});
170 
171   EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & cancellation only
172 }
173 
TEST(RawClientWriter,Cancel)174 TEST(RawClientWriter, Cancel) {
175   RawClientTestContext ctx;
176   RawClientWriter call = TestService::TestClientStreamRpc(
177       ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
178   ASSERT_EQ(OkStatus(), call.Cancel());
179 
180   ASSERT_FALSE(call.active());
181   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
182 
183   EXPECT_EQ(Status::FailedPrecondition(), call.Write(ConstByteSpan()));
184   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
185   EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
186 
187   call.set_on_completed([](ConstByteSpan, Status) {});
188   call.set_on_error([](Status) {});
189 
190   EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & cancellation only
191 }
192 
TEST(RawClientReader,Cancel)193 TEST(RawClientReader, Cancel) {
194   RawClientTestContext ctx;
195   RawClientReader call = TestService::TestServerStreamRpc(ctx.client(),
196                                                           ctx.channel().id(),
197                                                           {},
198                                                           FailIfOnNextCalled,
199                                                           FailIfCalled,
200                                                           FailIfCalled);
201   ASSERT_EQ(OkStatus(), call.Cancel());
202 
203   ASSERT_FALSE(call.active());
204   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
205 
206   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
207   EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
208 
209   call.set_on_completed([](Status) {});
210   call.set_on_next([](ConstByteSpan) {});
211   call.set_on_error([](Status) {});
212 
213   EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & cancellation only
214 }
215 
TEST(RawClientReaderWriter,Cancel)216 TEST(RawClientReaderWriter, Cancel) {
217   RawClientTestContext ctx;
218   RawClientReaderWriter call =
219       TestService::TestBidirectionalStreamRpc(ctx.client(),
220                                               ctx.channel().id(),
221                                               FailIfOnNextCalled,
222                                               FailIfCalled,
223                                               FailIfCalled);
224   ASSERT_EQ(OkStatus(), call.Cancel());
225 
226   ASSERT_FALSE(call.active());
227   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
228 
229   EXPECT_EQ(Status::FailedPrecondition(), call.Write(ConstByteSpan()));
230   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
231   EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
232 
233   call.set_on_completed([](Status) {});
234   call.set_on_next([](ConstByteSpan) {});
235   call.set_on_error([](Status) {});
236 
237   EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & cancellation only
238 }
239 
TEST(RawUnaryReceiver,Abandon)240 TEST(RawUnaryReceiver, Abandon) {
241   RawClientTestContext ctx;
242   RawUnaryReceiver call = TestService::TestUnaryRpc(ctx.client(),
243                                                     ctx.channel().id(),
244                                                     {},
245                                                     FailIfOnCompletedCalled,
246                                                     FailIfCalled);
247   call.Abandon();
248 
249   ASSERT_FALSE(call.active());
250   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
251 
252   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
253 
254   EXPECT_EQ(ctx.output().total_packets(), 1u);  // request only
255 }
256 
TEST(RawClientWriter,Abandon)257 TEST(RawClientWriter, Abandon) {
258   RawClientTestContext ctx;
259   RawClientWriter call = TestService::TestClientStreamRpc(
260       ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
261   call.Abandon();
262 
263   ASSERT_FALSE(call.active());
264   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
265 
266   EXPECT_EQ(Status::FailedPrecondition(), call.Write(ConstByteSpan()));
267   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
268   EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
269 
270   EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & client stream end
271 }
272 
TEST(RawClientReader,Abandon)273 TEST(RawClientReader, Abandon) {
274   RawClientTestContext ctx;
275   RawClientReader call = TestService::TestServerStreamRpc(ctx.client(),
276                                                           ctx.channel().id(),
277                                                           {},
278                                                           FailIfOnNextCalled,
279                                                           FailIfCalled,
280                                                           FailIfCalled);
281   call.Abandon();
282 
283   ASSERT_FALSE(call.active());
284   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
285 
286   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
287   EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
288 
289   EXPECT_EQ(ctx.output().total_packets(), 1u);  // request only
290 }
291 
TEST(RawClientReaderWriter,Abandon)292 TEST(RawClientReaderWriter, Abandon) {
293   RawClientTestContext ctx;
294   RawClientReaderWriter call =
295       TestService::TestBidirectionalStreamRpc(ctx.client(),
296                                               ctx.channel().id(),
297                                               FailIfOnNextCalled,
298                                               FailIfCalled,
299                                               FailIfCalled);
300   call.Abandon();
301 
302   ASSERT_FALSE(call.active());
303   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
304 
305   EXPECT_EQ(Status::FailedPrecondition(), call.Write(ConstByteSpan()));
306   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
307   EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
308 
309   EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & client stream end
310 }
311 
TEST(RawUnaryReceiver,CloseAndWaitForCallbacks)312 TEST(RawUnaryReceiver, CloseAndWaitForCallbacks) {
313   RawClientTestContext ctx;
314   RawUnaryReceiver call = TestService::TestUnaryRpc(ctx.client(),
315                                                     ctx.channel().id(),
316                                                     {},
317                                                     FailIfOnCompletedCalled,
318                                                     FailIfCalled);
319   call.CloseAndWaitForCallbacks();
320 
321   ASSERT_FALSE(call.active());
322   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
323 
324   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
325 
326   EXPECT_EQ(ctx.output().total_packets(), 1u);  // request only
327 }
328 
TEST(RawClientWriter,CloseAndWaitForCallbacks)329 TEST(RawClientWriter, CloseAndWaitForCallbacks) {
330   RawClientTestContext ctx;
331   RawClientWriter call = TestService::TestClientStreamRpc(
332       ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
333   call.CloseAndWaitForCallbacks();
334 
335   ASSERT_FALSE(call.active());
336   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
337 
338   EXPECT_EQ(Status::FailedPrecondition(), call.Write(ConstByteSpan()));
339   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
340   EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
341 
342   EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & client stream end
343 }
344 
TEST(RawClientReader,CloseAndWaitForCallbacks)345 TEST(RawClientReader, CloseAndWaitForCallbacks) {
346   RawClientTestContext ctx;
347   RawClientReader call = TestService::TestServerStreamRpc(ctx.client(),
348                                                           ctx.channel().id(),
349                                                           {},
350                                                           FailIfOnNextCalled,
351                                                           FailIfCalled,
352                                                           FailIfCalled);
353   call.CloseAndWaitForCallbacks();
354 
355   ASSERT_FALSE(call.active());
356   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
357 
358   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
359   EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
360 
361   EXPECT_EQ(ctx.output().total_packets(), 1u);  // request only
362 }
363 
TEST(RawClientReaderWriter,CloseAndWaitForCallbacks)364 TEST(RawClientReaderWriter, CloseAndWaitForCallbacks) {
365   RawClientTestContext ctx;
366   RawClientReaderWriter call =
367       TestService::TestBidirectionalStreamRpc(ctx.client(),
368                                               ctx.channel().id(),
369                                               FailIfOnNextCalled,
370                                               FailIfCalled,
371                                               FailIfCalled);
372   call.CloseAndWaitForCallbacks();
373 
374   ASSERT_FALSE(call.active());
375   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
376 
377   EXPECT_EQ(Status::FailedPrecondition(), call.Write(ConstByteSpan()));
378   EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
379   EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
380 
381   EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & client stream end
382 }
383 
TEST(RawClientReaderWriter,WriteCallback)384 TEST(RawClientReaderWriter, WriteCallback) {
385   RawClientTestContext ctx;
386   RawClientReaderWriter call =
387       TestService::TestBidirectionalStreamRpc(ctx.client(),
388                                               ctx.channel().id(),
389                                               FailIfOnNextCalled,
390                                               FailIfCalled,
391                                               FailIfCalled);
392 
393   constexpr auto kData = bytes::Initialized<16>(0x33);
394   EXPECT_EQ(OkStatus(), call.Write([&kData](ByteSpan buffer) {
395     std::memcpy(buffer.data(), kData.data(), kData.size());
396     return StatusWithSize(kData.size());
397   }));
398 
399   EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & written stream
400   EXPECT_EQ(std::memcmp(ctx.output()
401                             .payloads<TestService::TestBidirectionalStreamRpc>()
402                             .back()
403                             .data(),
404                         kData.data(),
405                         kData.size()),
406             0);
407 }
408 
TEST(RawClientReaderWriter,Move_InactiveToActive_EndsClientStream)409 TEST(RawClientReaderWriter, Move_InactiveToActive_EndsClientStream) {
410   RawClientTestContext ctx;
411 
412   RawClientReaderWriter active_call =
413       TestService::TestBidirectionalStreamRpc(ctx.client(),
414                                               ctx.channel().id(),
415                                               FailIfOnNextCalled,
416                                               FailIfCalled,
417                                               FailIfCalled);
418 
419   ASSERT_EQ(ctx.output().total_packets(), 1u);  // Sent the request
420 
421   RawClientReaderWriter inactive_call;
422 
423   active_call = std::move(inactive_call);
424 
425   EXPECT_EQ(ctx.output().total_packets(),
426             2u);  // Sent CLIENT_REQUEST_COMPLETION
427   EXPECT_EQ(
428       ctx.output()
429           .client_stream_end_packets<TestService::TestBidirectionalStreamRpc>(),
430       1u);
431 
432   EXPECT_FALSE(active_call.active());
433   // NOLINTNEXTLINE(bugprone-use-after-move)
434   EXPECT_FALSE(inactive_call.active());
435 }
436 
TEST(RawUnaryReceiver,Move_InactiveToActive_SilentlyCloses)437 TEST(RawUnaryReceiver, Move_InactiveToActive_SilentlyCloses) {
438   RawClientTestContext ctx;
439 
440   RawUnaryReceiver active_call =
441       TestService::TestUnaryRpc(ctx.client(),
442                                 ctx.channel().id(),
443                                 {},
444                                 FailIfOnCompletedCalled,
445                                 FailIfCalled);
446 
447   ASSERT_EQ(ctx.output().total_packets(), 1u);  // Sent the request
448 
449   RawUnaryReceiver inactive_call;
450 
451   active_call = std::move(inactive_call);
452 
453   EXPECT_EQ(ctx.output().total_packets(), 1u);  // No more packets
454 
455   EXPECT_FALSE(active_call.active());
456   // NOLINTNEXTLINE(bugprone-use-after-move)
457   EXPECT_FALSE(inactive_call.active());
458 }
459 
TEST(RawUnaryReceiver,Move_ActiveToActive)460 TEST(RawUnaryReceiver, Move_ActiveToActive) {
461   RawClientTestContext ctx;
462 
463   RawUnaryReceiver active_call_1 =
464       TestService::TestUnaryRpc(ctx.client(), ctx.channel().id(), {});
465 
466   RawUnaryReceiver active_call_2 =
467       TestService::TestAnotherUnaryRpc(ctx.client(), ctx.channel().id(), {});
468 
469   ASSERT_EQ(ctx.output().total_packets(), 2u);  // Sent the requests
470   ASSERT_TRUE(active_call_1.active());
471   ASSERT_TRUE(active_call_2.active());
472 
473   active_call_2 = std::move(active_call_1);
474 
475   EXPECT_EQ(ctx.output().total_packets(), 2u);  // No more packets
476 
477   // NOLINTNEXTLINE(bugprone-use-after-move)
478   EXPECT_FALSE(active_call_1.active());
479   EXPECT_TRUE(active_call_2.active());
480 }
481 
TEST(RawUnaryReceiver,InvalidChannelId)482 TEST(RawUnaryReceiver, InvalidChannelId) {
483   RawClientTestContext ctx;
484   std::optional<Status> error;
485 
486   RawUnaryReceiver call = TestService::TestUnaryRpc(
487       ctx.client(), 1290341, {}, {}, [&error](Status status) {
488         error = status;
489       });
490   EXPECT_FALSE(call.active());
491   EXPECT_EQ(error, Status::Unavailable());
492 }
493 
TEST(RawClientReader,NoClientStream_OutOfScope_SilentlyCloses)494 TEST(RawClientReader, NoClientStream_OutOfScope_SilentlyCloses) {
495   RawClientTestContext ctx;
496 
497   {
498     RawClientReader call = TestService::TestServerStreamRpc(ctx.client(),
499                                                             ctx.channel().id(),
500                                                             {},
501                                                             FailIfOnNextCalled,
502                                                             FailIfCalled,
503                                                             FailIfCalled);
504     ASSERT_EQ(ctx.output().total_packets(), 1u);  // Sent the request
505   }
506 
507   EXPECT_EQ(ctx.output().total_packets(), 1u);  // No more packets
508 }
509 
TEST(RawClientWriter,WithClientStream_OutOfScope_SendsClientStreamEnd)510 TEST(RawClientWriter, WithClientStream_OutOfScope_SendsClientStreamEnd) {
511   RawClientTestContext ctx;
512 
513   {
514     RawClientWriter call =
515         TestService::TestClientStreamRpc(ctx.client(),
516                                          ctx.channel().id(),
517                                          FailIfOnCompletedCalled,
518                                          FailIfCalled);
519     ASSERT_EQ(ctx.output().total_packets(), 1u);  // Sent the request
520   }
521 
522   EXPECT_EQ(ctx.output().total_packets(),
523             2u);  // Sent CLIENT_REQUEST_COMPLETION
524   EXPECT_EQ(ctx.output()
525                 .client_stream_end_packets<TestService::TestClientStreamRpc>(),
526             1u);
527 }
528 
529 constexpr const char kWriterData[] = "20X6";
530 
WriteAsWriter(Writer & writer)531 void WriteAsWriter(Writer& writer) {
532   ASSERT_TRUE(writer.active());
533   ASSERT_EQ(writer.channel_id(), RawClientTestContext<>::kDefaultChannelId);
534 
535   EXPECT_EQ(OkStatus(), writer.Write(as_bytes(span(kWriterData))));
536 }
537 
TEST(RawClientWriter,UsableAsWriter)538 TEST(RawClientWriter, UsableAsWriter) {
539   RawClientTestContext ctx;
540   RawClientWriter call = TestService::TestClientStreamRpc(
541       ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
542 
543   WriteAsWriter(call.as_writer());
544 
545   EXPECT_STREQ(reinterpret_cast<const char*>(
546                    ctx.output()
547                        .payloads<TestService::TestClientStreamRpc>()
548                        .back()
549                        .data()),
550                kWriterData);
551 }
552 
TEST(RawClientReaderWriter,UsableAsWriter)553 TEST(RawClientReaderWriter, UsableAsWriter) {
554   RawClientTestContext ctx;
555   RawClientReaderWriter call =
556       TestService::TestBidirectionalStreamRpc(ctx.client(),
557                                               ctx.channel().id(),
558                                               FailIfOnNextCalled,
559                                               FailIfCalled,
560                                               FailIfCalled);
561 
562   WriteAsWriter(call.as_writer());
563 
564   EXPECT_STREQ(reinterpret_cast<const char*>(
565                    ctx.output()
566                        .payloads<TestService::TestBidirectionalStreamRpc>()
567                        .back()
568                        .data()),
569                kWriterData);
570 }
571 
span_as_cstr(ConstByteSpan span)572 const char* span_as_cstr(ConstByteSpan span) {
573   return reinterpret_cast<const char*>(span.data());
574 }
575 
TEST(RawClientReaderWriter,MultipleCallsToSameMethodOkAndReceiveSeparateResponses)576 TEST(RawClientReaderWriter,
577      MultipleCallsToSameMethodOkAndReceiveSeparateResponses) {
578   RawClientTestContext ctx;
579 
580   ConstByteSpan data_1 = as_bytes(span("data_1_unset"));
581   ConstByteSpan data_2 = as_bytes(span("data_2_unset"));
582 
583   Status error;
584   auto set_error = [&error](Status status) { error.Update(status); };
585   RawClientReaderWriter active_call_1 = TestService::TestBidirectionalStreamRpc(
586       ctx.client(),
587       ctx.channel().id(),
588       [&data_1](ConstByteSpan payload) { data_1 = payload; },
589       FailIfCalled,
590       set_error);
591 
592   EXPECT_TRUE(active_call_1.active());
593 
594   RawClientReaderWriter active_call_2 = TestService::TestBidirectionalStreamRpc(
595       ctx.client(),
596       ctx.channel().id(),
597       [&data_2](ConstByteSpan payload) { data_2 = payload; },
598       FailIfCalled,
599       set_error);
600 
601   EXPECT_TRUE(active_call_1.active());
602   EXPECT_TRUE(active_call_2.active());
603   EXPECT_EQ(error, OkStatus());
604 
605   ConstByteSpan message_1 = as_bytes(span("hello_1"));
606   ConstByteSpan message_2 = as_bytes(span("hello_2"));
607 
608   ctx.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
609       message_2, active_call_2.id());
610   EXPECT_STREQ(span_as_cstr(data_2), span_as_cstr(message_2));
611   ctx.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
612       message_1, active_call_1.id());
613   EXPECT_STREQ(span_as_cstr(data_1), span_as_cstr(message_1));
614 }
615 
616 }  // namespace
617 }  // namespace pw::rpc
618