1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
#include "pw_rpc/raw/client_reader_writer.h"

#include <cstring>
#include <optional>

#include "pw_bytes/array.h"
#include "pw_rpc/raw/client_testing.h"
#include "pw_rpc/writer.h"
#include "pw_rpc_test_protos/test.raw_rpc.pb.h"
#include "pw_status/status_with_size.h"
#include "pw_unit_test/framework.h"
25
26 namespace pw::rpc {
27 namespace {
28
29 using test::pw_rpc::raw::TestService;
30
FailIfCalled(Status)31 void FailIfCalled(Status) { FAIL(); }
FailIfOnNextCalled(ConstByteSpan)32 void FailIfOnNextCalled(ConstByteSpan) { FAIL(); }
FailIfOnCompletedCalled(ConstByteSpan,Status)33 void FailIfOnCompletedCalled(ConstByteSpan, Status) { FAIL(); }
34
// A default-constructed RawUnaryReceiver is inactive and has no channel:
// Cancel() is rejected, while installing callbacks is still allowed.
TEST(RawUnaryReceiver, DefaultConstructed) {
  RawUnaryReceiver call;

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());

  // Callbacks may be set even though the call was never started.
  call.set_on_completed([](ConstByteSpan, Status) {});
  call.set_on_error([](Status) {});
}
46
// A default-constructed RawClientWriter is inactive and has no channel:
// Write(), Cancel() and RequestCompletion() are all rejected, while installing
// callbacks is still allowed.
TEST(RawClientWriter, DefaultConstructed) {
  RawClientWriter call;

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Write(ConstByteSpan()), Status::FailedPrecondition());
  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());
  EXPECT_EQ(call.RequestCompletion(), Status::FailedPrecondition());

  // Callbacks may be set even though the call was never started.
  call.set_on_completed([](ConstByteSpan, Status) {});
  call.set_on_error([](Status) {});
}
60
// A default-constructed RawClientReader is inactive and has no channel:
// Cancel() and RequestCompletion() are rejected, while installing callbacks is
// still allowed.
TEST(RawClientReader, DefaultConstructed) {
  RawClientReader call;

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());
  EXPECT_EQ(call.RequestCompletion(), Status::FailedPrecondition());

  // Callbacks may be set even though the call was never started.
  call.set_on_completed([](Status) {});
  call.set_on_next([](ConstByteSpan) {});
  call.set_on_error([](Status) {});
}
74
// A default-constructed RawClientReaderWriter is inactive and has no channel:
// Write(), Cancel() and RequestCompletion() are all rejected, while installing
// callbacks is still allowed.
TEST(RawClientReaderWriter, DefaultConstructed) {
  RawClientReaderWriter call;

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Write(ConstByteSpan()), Status::FailedPrecondition());
  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());
  EXPECT_EQ(call.RequestCompletion(), Status::FailedPrecondition());

  // Callbacks may be set even though the call was never started.
  call.set_on_completed([](Status) {});
  call.set_on_next([](ConstByteSpan) {});
  call.set_on_error([](Status) {});
}
89
// RequestCompletion() on a started client-streaming call succeeds and leaves
// the call active: a later Write(), a second RequestCompletion() and Cancel()
// all still return OK.
TEST(RawClientWriter, RequestCompletion) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawClientWriter call = TestService::TestClientStreamRpc(
      ctx.client(), channel_id, FailIfOnCompletedCalled, FailIfCalled);
  ASSERT_EQ(call.RequestCompletion(), OkStatus());

  ASSERT_TRUE(call.active());
  EXPECT_EQ(ctx.channel().id(), call.channel_id());

  EXPECT_EQ(call.Write(ConstByteSpan()), OkStatus());
  EXPECT_EQ(call.RequestCompletion(), OkStatus());
  EXPECT_EQ(call.Cancel(), OkStatus());

  call.set_on_completed([](ConstByteSpan, Status) {});
  call.set_on_error([](Status) {});
}
106
// RequestCompletion() on a started server-streaming call succeeds and leaves
// the call active: a second RequestCompletion() and Cancel() still return OK.
TEST(RawClientReader, RequestCompletion) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawClientReader call = TestService::TestServerStreamRpc(
      ctx.client(), channel_id, {}, FailIfOnNextCalled, FailIfCalled,
      FailIfCalled);
  ASSERT_EQ(call.RequestCompletion(), OkStatus());

  ASSERT_TRUE(call.active());
  EXPECT_EQ(ctx.channel().id(), call.channel_id());

  EXPECT_EQ(call.RequestCompletion(), OkStatus());
  EXPECT_EQ(call.Cancel(), OkStatus());

  call.set_on_completed([](Status) {});
  call.set_on_next([](ConstByteSpan) {});
  call.set_on_error([](Status) {});
}
127
// RequestCompletion() on a started bidirectional call succeeds and leaves the
// call active: a later Write(), a second RequestCompletion() and Cancel() all
// still return OK.
TEST(RawClientReaderWriter, RequestCompletion) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawClientReaderWriter call = TestService::TestBidirectionalStreamRpc(
      ctx.client(), channel_id, FailIfOnNextCalled, FailIfCalled, FailIfCalled);
  ASSERT_EQ(call.RequestCompletion(), OkStatus());

  ASSERT_TRUE(call.active());
  EXPECT_EQ(ctx.channel().id(), call.channel_id());

  EXPECT_EQ(call.Write(ConstByteSpan()), OkStatus());
  EXPECT_EQ(call.RequestCompletion(), OkStatus());
  EXPECT_EQ(call.Cancel(), OkStatus());

  call.set_on_completed([](Status) {});
  call.set_on_next([](ConstByteSpan) {});
  call.set_on_error([](Status) {});
}
149
// The first Cancel() on a started unary call succeeds and deactivates it;
// every later Cancel() is rejected, and no packets beyond the request and the
// cancellation are sent.
TEST(RawUnaryReceiver, Cancel) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawUnaryReceiver call = TestService::TestUnaryRpc(
      ctx.client(), channel_id, {}, FailIfOnCompletedCalled, FailIfCalled);
  ASSERT_EQ(call.Cancel(), OkStatus());

  // Additional calls should do nothing and return FAILED_PRECONDITION.
  ASSERT_EQ(call.Cancel(), Status::FailedPrecondition());
  ASSERT_EQ(call.Cancel(), Status::FailedPrecondition());

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());

  call.set_on_completed([](ConstByteSpan, Status) {});
  call.set_on_error([](Status) {});

  // request & cancellation only
  EXPECT_EQ(2u, ctx.output().total_packets());
}
173
// Cancel() on a started client-streaming call succeeds and deactivates it;
// afterwards Write(), Cancel() and RequestCompletion() are rejected and only
// the request and cancellation packets have been sent.
TEST(RawClientWriter, Cancel) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawClientWriter call = TestService::TestClientStreamRpc(
      ctx.client(), channel_id, FailIfOnCompletedCalled, FailIfCalled);
  ASSERT_EQ(call.Cancel(), OkStatus());

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Write(ConstByteSpan()), Status::FailedPrecondition());
  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());
  EXPECT_EQ(call.RequestCompletion(), Status::FailedPrecondition());

  call.set_on_completed([](ConstByteSpan, Status) {});
  call.set_on_error([](Status) {});

  // request & cancellation only
  EXPECT_EQ(2u, ctx.output().total_packets());
}
192
// Cancel() on a started server-streaming call succeeds and deactivates it;
// afterwards Cancel() and RequestCompletion() are rejected and only the
// request and cancellation packets have been sent.
TEST(RawClientReader, Cancel) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawClientReader call = TestService::TestServerStreamRpc(
      ctx.client(), channel_id, {}, FailIfOnNextCalled, FailIfCalled,
      FailIfCalled);
  ASSERT_EQ(call.Cancel(), OkStatus());

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());
  EXPECT_EQ(call.RequestCompletion(), Status::FailedPrecondition());

  call.set_on_completed([](Status) {});
  call.set_on_next([](ConstByteSpan) {});
  call.set_on_error([](Status) {});

  // request & cancellation only
  EXPECT_EQ(2u, ctx.output().total_packets());
}
215
// Cancel() on a started bidirectional call succeeds and deactivates it;
// afterwards Write(), Cancel() and RequestCompletion() are rejected and only
// the request and cancellation packets have been sent.
TEST(RawClientReaderWriter, Cancel) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawClientReaderWriter call = TestService::TestBidirectionalStreamRpc(
      ctx.client(), channel_id, FailIfOnNextCalled, FailIfCalled, FailIfCalled);
  ASSERT_EQ(call.Cancel(), OkStatus());

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Write(ConstByteSpan()), Status::FailedPrecondition());
  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());
  EXPECT_EQ(call.RequestCompletion(), Status::FailedPrecondition());

  call.set_on_completed([](Status) {});
  call.set_on_next([](ConstByteSpan) {});
  call.set_on_error([](Status) {});

  // request & cancellation only
  EXPECT_EQ(2u, ctx.output().total_packets());
}
239
// Abandon() deactivates a started unary call without sending anything beyond
// the original request; a later Cancel() is rejected.
TEST(RawUnaryReceiver, Abandon) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawUnaryReceiver call = TestService::TestUnaryRpc(
      ctx.client(), channel_id, {}, FailIfOnCompletedCalled, FailIfCalled);
  call.Abandon();

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());

  // request only
  EXPECT_EQ(1u, ctx.output().total_packets());
}
256
// Abandon() deactivates a started client-streaming call; Write(), Cancel() and
// RequestCompletion() are rejected afterwards, and two packets have been sent
// in total.
TEST(RawClientWriter, Abandon) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawClientWriter call = TestService::TestClientStreamRpc(
      ctx.client(), channel_id, FailIfOnCompletedCalled, FailIfCalled);
  call.Abandon();

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Write(ConstByteSpan()), Status::FailedPrecondition());
  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());
  EXPECT_EQ(call.RequestCompletion(), Status::FailedPrecondition());

  // request & client stream end
  EXPECT_EQ(2u, ctx.output().total_packets());
}
272
// Abandon() deactivates a started server-streaming call without sending
// anything beyond the original request; Cancel() and RequestCompletion() are
// rejected afterwards.
TEST(RawClientReader, Abandon) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawClientReader call = TestService::TestServerStreamRpc(
      ctx.client(), channel_id, {}, FailIfOnNextCalled, FailIfCalled,
      FailIfCalled);
  call.Abandon();

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());
  EXPECT_EQ(call.RequestCompletion(), Status::FailedPrecondition());

  // request only
  EXPECT_EQ(1u, ctx.output().total_packets());
}
291
// Abandon() deactivates a started bidirectional call; Write(), Cancel() and
// RequestCompletion() are rejected afterwards, and two packets have been sent
// in total.
TEST(RawClientReaderWriter, Abandon) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawClientReaderWriter call = TestService::TestBidirectionalStreamRpc(
      ctx.client(), channel_id, FailIfOnNextCalled, FailIfCalled, FailIfCalled);
  call.Abandon();

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Write(ConstByteSpan()), Status::FailedPrecondition());
  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());
  EXPECT_EQ(call.RequestCompletion(), Status::FailedPrecondition());

  // request & client stream end
  EXPECT_EQ(2u, ctx.output().total_packets());
}
311
// CloseAndWaitForCallbacks() deactivates a started unary call without sending
// anything beyond the original request; a later Cancel() is rejected.
TEST(RawUnaryReceiver, CloseAndWaitForCallbacks) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawUnaryReceiver call = TestService::TestUnaryRpc(
      ctx.client(), channel_id, {}, FailIfOnCompletedCalled, FailIfCalled);
  call.CloseAndWaitForCallbacks();

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());

  // request only
  EXPECT_EQ(1u, ctx.output().total_packets());
}
328
// CloseAndWaitForCallbacks() deactivates a started client-streaming call;
// Write(), Cancel() and RequestCompletion() are rejected afterwards, and two
// packets have been sent in total.
TEST(RawClientWriter, CloseAndWaitForCallbacks) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawClientWriter call = TestService::TestClientStreamRpc(
      ctx.client(), channel_id, FailIfOnCompletedCalled, FailIfCalled);
  call.CloseAndWaitForCallbacks();

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Write(ConstByteSpan()), Status::FailedPrecondition());
  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());
  EXPECT_EQ(call.RequestCompletion(), Status::FailedPrecondition());

  // request & client stream end
  EXPECT_EQ(2u, ctx.output().total_packets());
}
344
// CloseAndWaitForCallbacks() deactivates a started server-streaming call
// without sending anything beyond the original request; Cancel() and
// RequestCompletion() are rejected afterwards.
TEST(RawClientReader, CloseAndWaitForCallbacks) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawClientReader call = TestService::TestServerStreamRpc(
      ctx.client(), channel_id, {}, FailIfOnNextCalled, FailIfCalled,
      FailIfCalled);
  call.CloseAndWaitForCallbacks();

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());
  EXPECT_EQ(call.RequestCompletion(), Status::FailedPrecondition());

  // request only
  EXPECT_EQ(1u, ctx.output().total_packets());
}
363
// CloseAndWaitForCallbacks() deactivates a started bidirectional call;
// Write(), Cancel() and RequestCompletion() are rejected afterwards, and two
// packets have been sent in total.
TEST(RawClientReaderWriter, CloseAndWaitForCallbacks) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawClientReaderWriter call = TestService::TestBidirectionalStreamRpc(
      ctx.client(), channel_id, FailIfOnNextCalled, FailIfCalled, FailIfCalled);
  call.CloseAndWaitForCallbacks();

  ASSERT_FALSE(call.active());
  EXPECT_EQ(Channel::kUnassignedChannelId, call.channel_id());

  EXPECT_EQ(call.Write(ConstByteSpan()), Status::FailedPrecondition());
  EXPECT_EQ(call.Cancel(), Status::FailedPrecondition());
  EXPECT_EQ(call.RequestCompletion(), Status::FailedPrecondition());

  // request & client stream end
  EXPECT_EQ(2u, ctx.output().total_packets());
}
383
// Verifies the callback overload of Write(): the bytes the callback places in
// the provided buffer must appear as the payload of the packet that is sent.
TEST(RawClientReaderWriter, WriteCallback) {
  RawClientTestContext ctx;
  RawClientReaderWriter call =
      TestService::TestBidirectionalStreamRpc(ctx.client(),
                                              ctx.channel().id(),
                                              FailIfOnNextCalled,
                                              FailIfCalled,
                                              FailIfCalled);

  constexpr auto kData = bytes::Initialized<16>(0x33);
  EXPECT_EQ(OkStatus(), call.Write([&kData](ByteSpan buffer) {
    // Never copy past the end of the buffer handed to the callback; report
    // the shortfall so the Write() expectation fails instead.
    if (buffer.size() < kData.size()) {
      return StatusWithSize::ResourceExhausted();
    }
    std::memcpy(buffer.data(), kData.data(), kData.size());
    return StatusWithSize(kData.size());
  }));

  EXPECT_EQ(ctx.output().total_packets(), 2u);  // request & written stream

  // back() is only valid on a non-empty view, and memcmp() below reads
  // kData.size() bytes, so both preconditions are checked before comparing.
  const auto payloads =
      ctx.output().payloads<TestService::TestBidirectionalStreamRpc>();
  ASSERT_GT(payloads.size(), 0u);
  const ConstByteSpan written = payloads.back();
  ASSERT_EQ(written.size(), kData.size());
  EXPECT_EQ(std::memcmp(written.data(), kData.data(), kData.size()), 0);
}
408
// Move-assigning an inactive call over an active bidirectional call sends one
// additional packet, counted as a client stream end, and leaves both objects
// inactive.
TEST(RawClientReaderWriter, Move_InactiveToActive_EndsClientStream) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawClientReaderWriter call = TestService::TestBidirectionalStreamRpc(
      ctx.client(), channel_id, FailIfOnNextCalled, FailIfCalled, FailIfCalled);

  // Sent the request
  ASSERT_EQ(1u, ctx.output().total_packets());

  RawClientReaderWriter empty_call;

  call = std::move(empty_call);

  // Sent CLIENT_REQUEST_COMPLETION
  EXPECT_EQ(2u, ctx.output().total_packets());
  EXPECT_EQ(
      1u,
      ctx.output()
          .client_stream_end_packets<TestService::TestBidirectionalStreamRpc>());

  EXPECT_FALSE(call.active());
  // NOLINTNEXTLINE(bugprone-use-after-move)
  EXPECT_FALSE(empty_call.active());
}
436
// Move-assigning an inactive call over an active unary call sends no further
// packets and leaves both objects inactive.
TEST(RawUnaryReceiver, Move_InactiveToActive_SilentlyCloses) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawUnaryReceiver call = TestService::TestUnaryRpc(
      ctx.client(), channel_id, {}, FailIfOnCompletedCalled, FailIfCalled);

  // Sent the request
  ASSERT_EQ(1u, ctx.output().total_packets());

  RawUnaryReceiver empty_call;

  call = std::move(empty_call);

  // No more packets
  EXPECT_EQ(1u, ctx.output().total_packets());

  EXPECT_FALSE(call.active());
  // NOLINTNEXTLINE(bugprone-use-after-move)
  EXPECT_FALSE(empty_call.active());
}
459
// Move-assigning one active unary call over another sends no further packets;
// the source ends up inactive and the destination active.
TEST(RawUnaryReceiver, Move_ActiveToActive) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  RawUnaryReceiver source_call =
      TestService::TestUnaryRpc(ctx.client(), channel_id, {});

  RawUnaryReceiver target_call =
      TestService::TestAnotherUnaryRpc(ctx.client(), channel_id, {});

  // Sent the requests
  ASSERT_EQ(2u, ctx.output().total_packets());
  ASSERT_TRUE(source_call.active());
  ASSERT_TRUE(target_call.active());

  target_call = std::move(source_call);

  // No more packets
  EXPECT_EQ(2u, ctx.output().total_packets());

  // NOLINTNEXTLINE(bugprone-use-after-move)
  EXPECT_FALSE(source_call.active());
  EXPECT_TRUE(target_call.active());
}
481
// Starting a unary call on channel ID 1290341 yields an inactive call and
// reports UNAVAILABLE through the error callback.
TEST(RawUnaryReceiver, InvalidChannelId) {
  RawClientTestContext ctx;
  std::optional<Status> error;

  // Records whatever status the error callback receives.
  auto on_error = [&error](Status status) { error = status; };

  RawUnaryReceiver call =
      TestService::TestUnaryRpc(ctx.client(), 1290341, {}, {}, on_error);

  EXPECT_FALSE(call.active());
  EXPECT_EQ(error, Status::Unavailable());
}
493
// Destroying an active server-streaming call sends no packets beyond the
// original request.
TEST(RawClientReader, NoClientStream_OutOfScope_SilentlyCloses) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  {
    RawClientReader call = TestService::TestServerStreamRpc(
        ctx.client(), channel_id, {}, FailIfOnNextCalled, FailIfCalled,
        FailIfCalled);
    // Sent the request
    ASSERT_EQ(1u, ctx.output().total_packets());
  }  // `call` is destroyed here.

  // No more packets
  EXPECT_EQ(1u, ctx.output().total_packets());
}
509
// Destroying an active client-streaming call sends one additional packet,
// which is counted as a client stream end.
TEST(RawClientWriter, WithClientStream_OutOfScope_SendsClientStreamEnd) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  {
    RawClientWriter call = TestService::TestClientStreamRpc(
        ctx.client(), channel_id, FailIfOnCompletedCalled, FailIfCalled);
    // Sent the request
    ASSERT_EQ(1u, ctx.output().total_packets());
  }  // `call` is destroyed here.

  // Sent CLIENT_REQUEST_COMPLETION
  EXPECT_EQ(2u, ctx.output().total_packets());
  EXPECT_EQ(
      1u,
      ctx.output()
          .client_stream_end_packets<TestService::TestClientStreamRpc>());
}
528
529 constexpr const char kWriterData[] = "20X6";
530
WriteAsWriter(Writer & writer)531 void WriteAsWriter(Writer& writer) {
532 ASSERT_TRUE(writer.active());
533 ASSERT_EQ(writer.channel_id(), RawClientTestContext<>::kDefaultChannelId);
534
535 EXPECT_EQ(OkStatus(), writer.Write(as_bytes(span(kWriterData))));
536 }
537
// A RawClientWriter can be used through the generic Writer interface: data
// written via as_writer() shows up as the latest payload sent for the call.
TEST(RawClientWriter, UsableAsWriter) {
  RawClientTestContext ctx;
  RawClientWriter call = TestService::TestClientStreamRpc(
      ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);

  WriteAsWriter(call.as_writer());

  // A fatal failure inside WriteAsWriter() returns only from that helper, so
  // confirm a payload was actually recorded before calling back().
  const auto payloads =
      ctx.output().payloads<TestService::TestClientStreamRpc>();
  ASSERT_GT(payloads.size(), 0u);
  const ConstByteSpan written = payloads.back();

  // EXPECT_STREQ reads up to a NUL terminator; requiring the full array size
  // (terminator included) keeps that read inside the payload.
  ASSERT_EQ(written.size(), sizeof(kWriterData));
  EXPECT_STREQ(reinterpret_cast<const char*>(written.data()), kWriterData);
}
552
// A RawClientReaderWriter can be used through the generic Writer interface:
// data written via as_writer() shows up as the latest payload sent for the
// call.
TEST(RawClientReaderWriter, UsableAsWriter) {
  RawClientTestContext ctx;
  RawClientReaderWriter call =
      TestService::TestBidirectionalStreamRpc(ctx.client(),
                                              ctx.channel().id(),
                                              FailIfOnNextCalled,
                                              FailIfCalled,
                                              FailIfCalled);

  WriteAsWriter(call.as_writer());

  // A fatal failure inside WriteAsWriter() returns only from that helper, so
  // confirm a payload was actually recorded before calling back().
  const auto payloads =
      ctx.output().payloads<TestService::TestBidirectionalStreamRpc>();
  ASSERT_GT(payloads.size(), 0u);
  const ConstByteSpan written = payloads.back();

  // EXPECT_STREQ reads up to a NUL terminator; requiring the full array size
  // (terminator included) keeps that read inside the payload.
  ASSERT_EQ(written.size(), sizeof(kWriterData));
  EXPECT_STREQ(reinterpret_cast<const char*>(written.data()), kWriterData);
}
571
span_as_cstr(ConstByteSpan span)572 const char* span_as_cstr(ConstByteSpan span) {
573 return reinterpret_cast<const char*>(span.data());
574 }
575
// Two simultaneous calls to the same bidirectional method both stay active,
// and a server stream packet addressed to one call's ID is delivered to that
// call's on_next callback.
TEST(RawClientReaderWriter,
     MultipleCallsToSameMethodOkAndReceiveSeparateResponses) {
  RawClientTestContext ctx;
  const auto channel_id = ctx.channel().id();

  // Sentinel contents that differ from the messages sent below, so a callback
  // that never runs is caught by the string comparisons.
  ConstByteSpan received_1 = as_bytes(span("data_1_unset"));
  ConstByteSpan received_2 = as_bytes(span("data_2_unset"));

  // Both calls report errors into the same Status.
  Status observed_error;
  auto record_error = [&observed_error](Status status) {
    observed_error.Update(status);
  };

  RawClientReaderWriter call_1 = TestService::TestBidirectionalStreamRpc(
      ctx.client(),
      channel_id,
      [&received_1](ConstByteSpan payload) { received_1 = payload; },
      FailIfCalled,
      record_error);

  EXPECT_TRUE(call_1.active());

  RawClientReaderWriter call_2 = TestService::TestBidirectionalStreamRpc(
      ctx.client(),
      channel_id,
      [&received_2](ConstByteSpan payload) { received_2 = payload; },
      FailIfCalled,
      record_error);

  // Starting the second call must not disturb the first.
  EXPECT_TRUE(call_1.active());
  EXPECT_TRUE(call_2.active());
  EXPECT_EQ(observed_error, OkStatus());

  const ConstByteSpan hello_1 = as_bytes(span("hello_1"));
  const ConstByteSpan hello_2 = as_bytes(span("hello_2"));

  // Each received span is compared immediately after its packet is delivered,
  // before the next packet is sent.
  ctx.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
      hello_2, call_2.id());
  EXPECT_STREQ(span_as_cstr(received_2), span_as_cstr(hello_2));

  ctx.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
      hello_1, call_1.id());
  EXPECT_STREQ(span_as_cstr(received_1), span_as_cstr(hello_1));
}
615
616 } // namespace
617 } // namespace pw::rpc
618