xref: /aosp_15_r20/external/pigweed/pw_rpc/raw/codegen_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2020 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include <optional>
16 
17 #include "pw_protobuf/decoder.h"
18 #include "pw_rpc/internal/hash.h"
19 #include "pw_rpc/raw/client_testing.h"
20 #include "pw_rpc/raw/test_method_context.h"
21 #include "pw_rpc_test_protos/no_package.raw_rpc.pb.h"
22 #include "pw_rpc_test_protos/test.pwpb.h"
23 #include "pw_rpc_test_protos/test.raw_rpc.pb.h"
24 #include "pw_unit_test/framework.h"
25 
26 namespace pw::rpc {
27 namespace {
28 
29 namespace TestRequest = ::pw::rpc::test::pwpb::TestRequest;
30 namespace TestResponse = ::pw::rpc::test::pwpb::TestResponse;
31 namespace TestStreamResponse = ::pw::rpc::test::pwpb::TestStreamResponse;
32 
EncodeRequest(int integer,Status status)33 Vector<std::byte, 64> EncodeRequest(int integer, Status status) {
34   Vector<std::byte, 64> buffer(64);
35   TestRequest::MemoryEncoder test_request(buffer);
36 
37   EXPECT_EQ(OkStatus(), test_request.WriteInteger(integer));
38   EXPECT_EQ(OkStatus(), test_request.WriteStatusCode(status.code()));
39 
40   EXPECT_EQ(OkStatus(), test_request.status());
41   buffer.resize(test_request.size());
42   return buffer;
43 }
44 
EncodeResponse(int number)45 Vector<std::byte, 64> EncodeResponse(int number) {
46   Vector<std::byte, 64> buffer(64);
47   TestStreamResponse::MemoryEncoder test_response(buffer);
48 
49   EXPECT_EQ(OkStatus(), test_response.WriteNumber(number));
50 
51   EXPECT_EQ(OkStatus(), test_response.status());
52   buffer.resize(test_response.size());
53   return buffer;
54 }
55 
56 }  // namespace
57 
58 namespace test {
59 
60 class TestService final
61     : public pw_rpc::raw::TestService::Service<TestService> {
62  public:
TestUnaryRpc(ConstByteSpan request,RawUnaryResponder & responder)63   static void TestUnaryRpc(ConstByteSpan request,
64                            RawUnaryResponder& responder) {
65     int64_t integer;
66     Status status;
67 
68     if (!DecodeRequest(request, integer, status)) {
69       ASSERT_EQ(OkStatus(), responder.Finish({}, Status::DataLoss()));
70       return;
71     }
72 
73     std::byte response[64] = {};
74     TestResponse::MemoryEncoder test_response(response);
75     EXPECT_EQ(OkStatus(), test_response.WriteValue(integer + 1));
76 
77     ASSERT_EQ(
78         OkStatus(),
79         responder.Finish(span(response).first(test_response.size()), status));
80   }
81 
TestAnotherUnaryRpc(ConstByteSpan request,RawUnaryResponder & responder)82   void TestAnotherUnaryRpc(ConstByteSpan request,
83                            RawUnaryResponder& responder) {
84     if (request.empty()) {
85       last_responder_ = std::move(responder);
86     } else {
87       TestUnaryRpc(request, responder);
88     }
89   }
90 
TestServerStreamRpc(ConstByteSpan request,RawServerWriter & writer)91   void TestServerStreamRpc(ConstByteSpan request, RawServerWriter& writer) {
92     int64_t integer;
93     Status status;
94 
95     ASSERT_TRUE(DecodeRequest(request, integer, status));
96     for (int i = 0; i < integer; ++i) {
97       std::byte buffer[32] = {};
98       TestStreamResponse::MemoryEncoder test_stream_response(buffer);
99       EXPECT_EQ(OkStatus(), test_stream_response.WriteNumber(i));
100       EXPECT_EQ(OkStatus(), writer.Write(test_stream_response));
101     }
102 
103     EXPECT_EQ(OkStatus(), writer.Finish(status));
104   }
105 
TestClientStreamRpc(RawServerReader & reader)106   void TestClientStreamRpc(RawServerReader& reader) {
107     last_reader_ = std::move(reader);
108 
109     last_reader_.set_on_next([this](ConstByteSpan payload) {
110       EXPECT_EQ(OkStatus(),
111                 last_reader_.Finish(EncodeResponse(ReadInteger(payload)),
112                                     Status::Unauthenticated()));
113     });
114   }
115 
TestBidirectionalStreamRpc(RawServerReaderWriter & reader_writer)116   void TestBidirectionalStreamRpc(RawServerReaderWriter& reader_writer) {
117     last_reader_writer_ = std::move(reader_writer);
118 
119     last_reader_writer_.set_on_next([this](ConstByteSpan payload) {
120       EXPECT_EQ(
121           OkStatus(),
122           last_reader_writer_.Write(EncodeResponse(ReadInteger(payload))));
123       EXPECT_EQ(OkStatus(), last_reader_writer_.Finish(Status::NotFound()));
124     });
125   }
126 
last_responder()127   RawUnaryResponder& last_responder() { return last_responder_; }
128 
129  private:
ReadInteger(ConstByteSpan request)130   static uint32_t ReadInteger(ConstByteSpan request) {
131     uint32_t integer = 0;
132 
133     protobuf::Decoder decoder(request);
134     while (decoder.Next().ok()) {
135       switch (static_cast<TestRequest::Fields>(decoder.FieldNumber())) {
136         case TestRequest::Fields::kInteger:
137           EXPECT_EQ(OkStatus(), decoder.ReadUint32(&integer));
138           break;
139         case TestRequest::Fields::kStatusCode:
140           break;
141         default:
142           ADD_FAILURE();
143       }
144     }
145 
146     return integer;
147   }
148 
DecodeRequest(ConstByteSpan request,int64_t & integer,Status & status)149   static bool DecodeRequest(ConstByteSpan request,
150                             int64_t& integer,
151                             Status& status) {
152     protobuf::Decoder decoder(request);
153     Status decode_status;
154     bool has_integer = false;
155     bool has_status = false;
156 
157     while (decoder.Next().ok()) {
158       switch (static_cast<TestRequest::Fields>(decoder.FieldNumber())) {
159         case TestRequest::Fields::kInteger:
160           decode_status = decoder.ReadInt64(&integer);
161           EXPECT_EQ(OkStatus(), decode_status);
162           has_integer = decode_status.ok();
163           break;
164         case TestRequest::Fields::kStatusCode: {
165           uint32_t status_code;
166           decode_status = decoder.ReadUint32(&status_code);
167           EXPECT_EQ(OkStatus(), decode_status);
168           has_status = decode_status.ok();
169           status = static_cast<Status::Code>(status_code);
170           break;
171         }
172       }
173     }
174     EXPECT_TRUE(has_integer);
175     EXPECT_TRUE(has_status);
176     return has_integer && has_status;
177   }
178 
179   RawUnaryResponder last_responder_;
180   RawServerReader last_reader_;
181   RawServerReaderWriter last_reader_writer_;
182 };
183 
184 // Test that code generation succeeds when no proto package is specified.
185 class NoPackageTestService final
186     : public ::pw_rpc::raw::PwRpcTestService::Service<NoPackageTestService> {
187  public:
TestUnaryRpc(ConstByteSpan,RawUnaryResponder &)188   static void TestUnaryRpc(ConstByteSpan, RawUnaryResponder&) {}
189 
TestAnotherUnaryRpc(ConstByteSpan,RawUnaryResponder &)190   void TestAnotherUnaryRpc(ConstByteSpan, RawUnaryResponder&) {}
191 
TestServerStreamRpc(ConstByteSpan,RawServerWriter &)192   void TestServerStreamRpc(ConstByteSpan, RawServerWriter&) {}
193 
TestClientStreamRpc(RawServerReader &)194   void TestClientStreamRpc(RawServerReader&) {}
195 
TestBidirectionalStreamRpc(RawServerReaderWriter &)196   void TestBidirectionalStreamRpc(RawServerReaderWriter&) {}
197 };
198 
199 }  // namespace test
200 
201 namespace {
202 
TEST(RawCodegen,Server_CompilesProperly)203 TEST(RawCodegen, Server_CompilesProperly) {
204   test::TestService service;
205   EXPECT_EQ(internal::UnwrapServiceId(service.service_id()),
206             internal::Hash("pw.rpc.test.TestService"));
207   EXPECT_STREQ(service.name(), "TestService");
208 }
209 
TEST(RawCodegen,Server_InvokeUnaryRpc)210 TEST(RawCodegen, Server_InvokeUnaryRpc) {
211   PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestUnaryRpc) context;
212 
213   context.call(EncodeRequest(123, OkStatus()));
214   EXPECT_EQ(OkStatus(), context.status());
215 
216   protobuf::Decoder decoder(context.response());
217 
218   while (decoder.Next().ok()) {
219     switch (static_cast<TestResponse::Fields>(decoder.FieldNumber())) {
220       case TestResponse::Fields::kValue: {
221         int32_t value;
222         EXPECT_EQ(OkStatus(), decoder.ReadInt32(&value));
223         EXPECT_EQ(value, 124);
224         break;
225       }
226       case TestResponse::Fields::kRepeatedField:
227         break;  // Ignore this field
228     }
229   }
230 }
231 
TEST(RawCodegen,Server_InvokeAsyncUnaryRpc)232 TEST(RawCodegen, Server_InvokeAsyncUnaryRpc) {
233   PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) context;
234 
235   context.call(EncodeRequest(123, OkStatus()));
236   EXPECT_EQ(OkStatus(), context.status());
237 
238   protobuf::Decoder decoder(context.response());
239 
240   while (decoder.Next().ok()) {
241     switch (static_cast<TestResponse::Fields>(decoder.FieldNumber())) {
242       case TestResponse::Fields::kValue: {
243         int32_t value;
244         EXPECT_EQ(OkStatus(), decoder.ReadInt32(&value));
245         EXPECT_EQ(value, 124);
246         break;
247       }
248       case TestResponse::Fields::kRepeatedField:
249         break;  // Ignore this field
250     }
251   }
252 }
253 
TEST(RawCodegen,Server_HandleError)254 TEST(RawCodegen, Server_HandleError) {
255   PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) ctx;
256 
257   ASSERT_FALSE(ctx.service().last_responder().active());
258   ctx.call({});
259   ASSERT_TRUE(ctx.service().last_responder().active());
260 
261   ctx.SendClientError(Status::Unimplemented());
262 
263   EXPECT_FALSE(ctx.service().last_responder().active());
264 }
265 
TEST(RawCodegen,Server_Finish)266 TEST(RawCodegen, Server_Finish) {
267   PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) ctx;
268 
269   ASSERT_FALSE(ctx.service().last_responder().active());
270   ctx.call({});
271   ASSERT_TRUE(ctx.service().last_responder().active());
272 
273   EXPECT_EQ(OkStatus(), ctx.service().last_responder().Finish({}));
274   EXPECT_FALSE(ctx.service().last_responder().active());
275 }
276 
TEST(RawCodegen,Server_MoveCalls)277 TEST(RawCodegen, Server_MoveCalls) {
278   // Create two call objects on different channels so they are unique.
279   PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) ctx;
280 
281   RawUnaryResponder call;
282 
283   ctx.call({});
284 
285   EXPECT_TRUE(ctx.service().last_responder().active());
286   EXPECT_FALSE(call.active());
287 
288   call = std::move(ctx.service().last_responder());
289 
290   EXPECT_FALSE(ctx.service().last_responder().active());
291   EXPECT_TRUE(call.active());
292 }
293 
TEST(RawCodegen,Server_MoveBetweenActiveCallsWithBuffers)294 TEST(RawCodegen, Server_MoveBetweenActiveCallsWithBuffers) {
295   // Create two call objects on different channels so they are unique.
296   PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) ctx_1;
297   ctx_1.set_channel_id(1);
298 
299   PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) ctx_2;
300   ctx_2.set_channel_id(2);
301 
302   ctx_1.call({});
303   ctx_2.call({});
304 
305   ctx_1.service().last_responder() =
306       std::move(ctx_2.service().last_responder());
307 
308   ASSERT_TRUE(ctx_1.service().last_responder().active());
309   ASSERT_FALSE(ctx_2.service().last_responder().active());
310 
311   ctx_2.service().last_responder() =
312       std::move(ctx_1.service().last_responder());
313 
314   ASSERT_FALSE(ctx_1.service().last_responder().active());
315   ASSERT_TRUE(ctx_2.service().last_responder().active());
316 }
317 
TEST(RawCodegen,Server_InvokeServerStreamingRpc)318 TEST(RawCodegen, Server_InvokeServerStreamingRpc) {
319   PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestServerStreamRpc) context;
320 
321   context.call(EncodeRequest(5, Status::Unauthenticated()));
322   EXPECT_TRUE(context.done());
323   EXPECT_EQ(Status::Unauthenticated(), context.status());
324   EXPECT_EQ(context.total_responses(), 5u);
325 
326   protobuf::Decoder decoder(context.responses().back());
327   while (decoder.Next().ok()) {
328     switch (static_cast<TestStreamResponse::Fields>(decoder.FieldNumber())) {
329       case TestStreamResponse::Fields::kNumber: {
330         int32_t value;
331         EXPECT_EQ(OkStatus(), decoder.ReadInt32(&value));
332         EXPECT_EQ(value, 4);
333         break;
334       }
335       case TestStreamResponse::Fields::kChunk:
336         FAIL();
337         break;
338     }
339   }
340 }
341 
ReadResponseNumber(ConstByteSpan data)342 int32_t ReadResponseNumber(ConstByteSpan data) {
343   int32_t value = -1;
344   protobuf::Decoder decoder(data);
345   while (decoder.Next().ok()) {
346     switch (static_cast<TestStreamResponse::Fields>(decoder.FieldNumber())) {
347       case TestStreamResponse::Fields::kNumber: {
348         EXPECT_EQ(OkStatus(), decoder.ReadInt32(&value));
349         break;
350       }
351       case TestStreamResponse::Fields::kChunk:
352       default:
353         ADD_FAILURE();
354         break;
355     }
356   }
357 
358   return value;
359 }
360 
TEST(RawCodegen,Server_InvokeClientStreamingRpc)361 TEST(RawCodegen, Server_InvokeClientStreamingRpc) {
362   PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestClientStreamRpc) ctx;
363 
364   ctx.call();
365   ctx.SendClientStream(EncodeRequest(123, OkStatus()));
366 
367   ASSERT_TRUE(ctx.done());
368   EXPECT_EQ(Status::Unauthenticated(), ctx.status());
369   EXPECT_EQ(ctx.total_responses(), 1u);
370   EXPECT_EQ(ReadResponseNumber(ctx.responses().back()), 123);
371 }
372 
TEST(RawCodegen,Server_InvokeBidirectionalStreamingRpc)373 TEST(RawCodegen, Server_InvokeBidirectionalStreamingRpc) {
374   PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestBidirectionalStreamRpc)
375   ctx;
376 
377   ctx.call();
378   ctx.SendClientStream(EncodeRequest(456, OkStatus()));
379 
380   ASSERT_TRUE(ctx.done());
381   EXPECT_EQ(Status::NotFound(), ctx.status());
382   ASSERT_EQ(ctx.total_responses(), 1u);
383   EXPECT_EQ(ReadResponseNumber(ctx.responses().back()), 456);
384 }
385 
TEST(RawCodegen,Client_ClientClass)386 TEST(RawCodegen, Client_ClientClass) {
387   RawClientTestContext context;
388 
389   test::pw_rpc::raw::TestService::Client service_client(context.client(),
390                                                         context.channel().id());
391 
392   EXPECT_EQ(service_client.channel_id(), context.channel().id());
393   EXPECT_EQ(&service_client.client(), &context.client());
394 }
395 
396 class RawCodegenClientTest : public ::testing::Test {
397  protected:
RawCodegenClientTest()398   RawCodegenClientTest()
399       : service_client_(context_.client(), context_.channel().id()) {}
400 
401   // Assumes the payload is a null-terminated string, not a protobuf.
OnNext()402   Function<void(ConstByteSpan)> OnNext() {
403     return [this](ConstByteSpan c_string) { CopyPayload(c_string); };
404   }
405 
OnCompleted()406   Function<void(Status)> OnCompleted() {
407     return [this](Status status) { status_ = status; };
408   }
409 
UnaryOnCompleted()410   Function<void(ConstByteSpan, Status)> UnaryOnCompleted() {
411     return [this](ConstByteSpan c_string, Status status) {
412       CopyPayload(c_string);
413       status_ = status;
414     };
415   }
416 
OnError()417   Function<void(Status)> OnError() {
418     return [this](Status error) { error_ = error; };
419   }
420 
421   RawClientTestContext<> context_;
422 
423   test::pw_rpc::raw::TestService::Client service_client_;
424 
425   // Store the payload as a null-terminated string for convenience. Use nullptr
426   // for an empty payload.
427   std::optional<const char*> payload_;
428   std::optional<Status> status_;
429   std::optional<Status> error_;
430 
431  private:
CopyPayload(ConstByteSpan c_string)432   void CopyPayload(ConstByteSpan c_string) {
433     ASSERT_LE(c_string.size(), sizeof(buffer_));
434 
435     if (c_string.empty()) {
436       payload_ = nullptr;
437     } else {
438       std::memcpy(buffer_, c_string.data(), c_string.size());
439       payload_ = buffer_;
440     }
441   }
442 
443   char buffer_[64];
444 };
445 
TEST_F(RawCodegenClientTest,InvokeUnaryRpc_Ok)446 TEST_F(RawCodegenClientTest, InvokeUnaryRpc_Ok) {
447   RawUnaryReceiver call = test::pw_rpc::raw::TestService::TestUnaryRpc(
448       context_.client(),
449       context_.channel().id(),
450       as_bytes(span("This is the request")),
451       UnaryOnCompleted(),
452       OnError());
453 
454   context_.server().SendResponse<test::pw_rpc::raw::TestService::TestUnaryRpc>(
455       as_bytes(span("(ㆆ_ㆆ)")), OkStatus());
456 
457   ASSERT_TRUE(payload_.has_value());
458   EXPECT_STREQ(payload_.value(), "(ㆆ_ㆆ)");
459   EXPECT_EQ(status_, OkStatus());
460   EXPECT_FALSE(error_.has_value());
461 }
462 
TEST_F(RawCodegenClientTest,InvokeUnaryRpc_Error)463 TEST_F(RawCodegenClientTest, InvokeUnaryRpc_Error) {
464   RawUnaryReceiver call = service_client_.TestUnaryRpc(
465       as_bytes(span("This is the request")), UnaryOnCompleted(), OnError());
466 
467   context_.server()
468       .SendServerError<test::pw_rpc::raw::TestService::TestUnaryRpc>(
469           Status::NotFound());
470 
471   EXPECT_FALSE(payload_.has_value());
472   EXPECT_FALSE(status_.has_value());
473   EXPECT_EQ(error_, Status::NotFound());
474 }
475 
TEST_F(RawCodegenClientTest,InvokeServerStreamRpc_Ok)476 TEST_F(RawCodegenClientTest, InvokeServerStreamRpc_Ok) {
477   RawClientReader call = test::pw_rpc::raw::TestService::TestServerStreamRpc(
478       context_.client(),
479       context_.channel().id(),
480       as_bytes(span("This is the request")),
481       OnNext(),
482       OnCompleted(),
483       OnError());
484 
485   context_.server()
486       .SendServerStream<test::pw_rpc::raw::TestService::TestServerStreamRpc>(
487           as_bytes(span("(⌐□_□)")));
488 
489   ASSERT_TRUE(payload_.has_value());
490   EXPECT_STREQ(payload_.value(), "(⌐□_□)");
491 
492   context_.server()
493       .SendServerStream<test::pw_rpc::raw::TestService::TestServerStreamRpc>(
494           as_bytes(span("(o_O)")));
495 
496   EXPECT_STREQ(payload_.value(), "(o_O)");
497 
498   context_.server()
499       .SendResponse<test::pw_rpc::raw::TestService::TestServerStreamRpc>(
500           Status::InvalidArgument());
501 
502   EXPECT_EQ(status_, Status::InvalidArgument());
503   EXPECT_FALSE(error_.has_value());
504 }
505 
TEST_F(RawCodegenClientTest,InvokeServerStreamRpc_Error)506 TEST_F(RawCodegenClientTest, InvokeServerStreamRpc_Error) {
507   RawClientReader call =
508       service_client_.TestServerStreamRpc(as_bytes(span("This is the request")),
509                                           OnNext(),
510                                           OnCompleted(),
511                                           OnError());
512 
513   context_.server()
514       .SendServerError<test::pw_rpc::raw::TestService::TestServerStreamRpc>(
515           Status::FailedPrecondition());
516 
517   EXPECT_FALSE(payload_.has_value());
518   EXPECT_FALSE(status_.has_value());
519   EXPECT_EQ(error_, Status::FailedPrecondition());
520 }
521 
TEST_F(RawCodegenClientTest,InvokeClientStreamRpc_Ok)522 TEST_F(RawCodegenClientTest, InvokeClientStreamRpc_Ok) {
523   RawClientWriter call = test::pw_rpc::raw::TestService::TestClientStreamRpc(
524       context_.client(),
525       context_.channel().id(),
526       UnaryOnCompleted(),
527       OnError());
528 
529   EXPECT_EQ(OkStatus(), call.Write(as_bytes(span("(•‿•)"))));
530   EXPECT_STREQ(
531       reinterpret_cast<const char*>(
532           context_.output()
533               .payloads<test::pw_rpc::raw::TestService::TestClientStreamRpc>()
534               .back()
535               .data()),
536       "(•‿•)");
537 
538   context_.server()
539       .SendResponse<test::pw_rpc::raw::TestService::TestClientStreamRpc>(
540           as_bytes(span("(⌐□_□)")), Status::InvalidArgument());
541 
542   ASSERT_TRUE(payload_.has_value());
543   EXPECT_STREQ(payload_.value(), "(⌐□_□)");
544   EXPECT_EQ(status_, Status::InvalidArgument());
545   EXPECT_FALSE(error_.has_value());
546 }
547 
TEST_F(RawCodegenClientTest,ClientStream_Finish)548 TEST_F(RawCodegenClientTest, ClientStream_Finish) {
549   RawClientWriter call = test::pw_rpc::raw::TestService::TestClientStreamRpc(
550       context_.client(),
551       context_.channel().id(),
552       UnaryOnCompleted(),
553       OnError());
554 
555   context_.server()
556       .SendResponse<test::pw_rpc::raw::TestService::TestClientStreamRpc>(
557           {}, OkStatus());
558 
559   ASSERT_TRUE(payload_.has_value());
560   EXPECT_EQ(payload_.value(), nullptr);
561   EXPECT_EQ(status_, OkStatus());
562 }
563 
TEST_F(RawCodegenClientTest,ClientStream_Cancel)564 TEST_F(RawCodegenClientTest, ClientStream_Cancel) {
565   RawClientWriter call = test::pw_rpc::raw::TestService::TestClientStreamRpc(
566       context_.client(),
567       context_.channel().id(),
568       UnaryOnCompleted(),
569       OnError());
570 
571   EXPECT_EQ(call.Cancel(), OkStatus());
572   EXPECT_FALSE(call.active());
573 }
574 
TEST_F(RawCodegenClientTest,ClientStream_Move)575 TEST_F(RawCodegenClientTest, ClientStream_Move) {
576   RawClientWriter call = test::pw_rpc::raw::TestService::TestClientStreamRpc(
577       context_.client(),
578       context_.channel().id(),
579       UnaryOnCompleted(),
580       OnError());
581 
582   EXPECT_EQ(OkStatus(), call.RequestCompletion());
583 
584   RawClientWriter call_2;
585 
586   call = std::move(call_2);
587   EXPECT_FALSE(call.active());
588 }
589 
TEST_F(RawCodegenClientTest,InvokeClientStreamRpc_Error)590 TEST_F(RawCodegenClientTest, InvokeClientStreamRpc_Error) {
591   RawClientWriter call =
592       service_client_.TestClientStreamRpc(UnaryOnCompleted(), OnError());
593 
594   context_.server()
595       .SendServerError<test::pw_rpc::raw::TestService::TestClientStreamRpc>(
596           Status::FailedPrecondition());
597 
598   EXPECT_FALSE(payload_.has_value());
599   EXPECT_FALSE(status_.has_value());
600   EXPECT_EQ(error_, Status::FailedPrecondition());
601 }
602 
TEST_F(RawCodegenClientTest,InvokeBidirectionalStreamRpc_Ok)603 TEST_F(RawCodegenClientTest, InvokeBidirectionalStreamRpc_Ok) {
604   RawClientReaderWriter call =
605       test::pw_rpc::raw::TestService::TestBidirectionalStreamRpc(
606           context_.client(),
607           context_.channel().id(),
608           OnNext(),
609           OnCompleted(),
610           OnError());
611 
612   EXPECT_EQ(OkStatus(), call.Write(as_bytes(span("(•‿•)"))));
613   EXPECT_STREQ(
614       reinterpret_cast<const char*>(
615           context_.output()
616               .payloads<
617                   test::pw_rpc::raw::TestService::TestBidirectionalStreamRpc>()
618               .back()
619               .data()),
620       "(•‿•)");
621 
622   context_.server()
623       .SendServerStream<
624           test::pw_rpc::raw::TestService::TestBidirectionalStreamRpc>(
625           as_bytes(span("(⌐□_□)")));
626 
627   ASSERT_TRUE(payload_.has_value());
628   EXPECT_STREQ(payload_.value(), "(⌐□_□)");
629 
630   context_.server()
631       .SendResponse<test::pw_rpc::raw::TestService::TestBidirectionalStreamRpc>(
632           Status::InvalidArgument());
633 
634   EXPECT_EQ(status_, Status::InvalidArgument());
635   EXPECT_FALSE(error_.has_value());
636 }
637 
TEST_F(RawCodegenClientTest,InvokeBidirectionalStreamRpc_Error)638 TEST_F(RawCodegenClientTest, InvokeBidirectionalStreamRpc_Error) {
639   RawClientReaderWriter call = service_client_.TestBidirectionalStreamRpc(
640       OnNext(), OnCompleted(), OnError());
641 
642   context_.server()
643       .SendServerError<
644           test::pw_rpc::raw::TestService::TestBidirectionalStreamRpc>(
645           Status::Internal());
646 
647   EXPECT_FALSE(payload_.has_value());
648   EXPECT_FALSE(status_.has_value());
649   EXPECT_EQ(error_, Status::Internal());
650 }
651 
652 }  // namespace
653 }  // namespace pw::rpc
654