xref: /aosp_15_r20/external/pigweed/pw_rpc/pwpb/synchronous_call_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc/synchronous_call.h"
16 
17 #include <chrono>
18 
19 #include "pw_chrono/system_clock.h"
20 #include "pw_rpc/channel.h"
21 #include "pw_rpc/internal/packet.h"
22 #include "pw_rpc/pwpb/fake_channel_output.h"
23 #include "pw_rpc_test_protos/test.rpc.pwpb.h"
24 #include "pw_rpc_transport/test_loopback_service_registry.h"
25 #include "pw_status/status.h"
26 #include "pw_status/status_with_size.h"
27 #include "pw_thread/thread.h"
28 #include "pw_unit_test/framework.h"
29 #include "pw_work_queue/test_thread.h"
30 #include "pw_work_queue/work_queue.h"
31 
32 namespace pw::rpc::test {
33 namespace {
34 
35 using pw::rpc::test::pw_rpc::pwpb::TestService;
36 using MethodInfo = internal::MethodInfo<TestService::TestUnaryRpc>;
37 
38 namespace TestRequest = ::pw::rpc::test::pwpb::TestRequest;
39 namespace TestResponse = ::pw::rpc::test::pwpb::TestResponse;
40 namespace TestStreamResponse = ::pw::rpc::test::pwpb::TestStreamResponse;
41 
42 class TestServiceImpl final
43     : public pw::rpc::test::pw_rpc::pwpb::TestService::Service<
44           TestServiceImpl> {
45  public:
TestUnaryRpc(const TestRequest::Message &,TestResponse::Message & response)46   Status TestUnaryRpc(const TestRequest::Message& /*request*/,
47                       TestResponse::Message& response) {
48     response.value = 42;
49     response.repeated_field.SetEncoder(
50         [](TestResponse::StreamEncoder& encoder) {
51           constexpr std::array<uint32_t, 3> kValues = {7, 8, 9};
52           return encoder.WriteRepeatedField(kValues);
53         });
54     return OkStatus();
55   }
TestAnotherUnaryRpc(const TestRequest::Message &,TestResponse::Message &)56   Status TestAnotherUnaryRpc(const TestRequest::Message& /*request*/,
57                              TestResponse::Message& /*response*/) {
58     return OkStatus();
59   }
TestServerStreamRpc(const TestRequest::Message &,ServerWriter<TestStreamResponse::Message> &)60   void TestServerStreamRpc(const TestRequest::Message&,
61                            ServerWriter<TestStreamResponse::Message>&) {}
TestClientStreamRpc(RawServerReader &)62   void TestClientStreamRpc(RawServerReader&) {}
TestBidirectionalStreamRpc(ServerReaderWriter<TestRequest::Message,TestStreamResponse::Message> &)63   void TestBidirectionalStreamRpc(
64       ServerReaderWriter<TestRequest::Message, TestStreamResponse::Message>&) {}
65 };
66 
67 class SynchronousCallTest : public ::testing::Test {
68  public:
SynchronousCallTest()69   SynchronousCallTest()
70       : channels_({{Channel::Create<42>(&fake_output_)}}), client_(channels_) {}
71 
SetUp()72   void SetUp() override {
73     work_thread_ =
74         Thread(work_queue::test::WorkQueueThreadOptions(), work_queue_);
75   }
76 
TearDown()77   void TearDown() override {
78     work_queue_.RequestStop();
79 #if PW_THREAD_JOINING_ENABLED
80     work_thread_.join();
81 #else
82     work_thread_.detach();
83 #endif  // PW_THREAD_JOINING_ENABLED
84   }
85 
86  protected:
87   using FakeChannelOutput = PwpbFakeChannelOutput<2>;
88 
OnSend(span<const std::byte> buffer,Status status)89   void OnSend(span<const std::byte> buffer, Status status) {
90     if (!status.ok()) {
91       return;
92     }
93     auto result = internal::Packet::FromBuffer(buffer);
94     EXPECT_TRUE(result.ok());
95     request_packet_ = *result;
96 
97     EXPECT_TRUE(work_queue_.PushWork([this]() { SendResponse(); }).ok());
98   }
99 
SendResponse()100   void SendResponse() {
101     std::array<std::byte, 256> buffer;
102     std::array<std::byte, 32> payload_buffer;
103 
104     StatusWithSize size_status =
105         MethodInfo::serde().response().Encode(response_, payload_buffer);
106     EXPECT_TRUE(size_status.ok());
107 
108     auto response =
109         internal::Packet::Response(request_packet_, response_status_);
110     response.set_payload({payload_buffer.data(), size_status.size()});
111     EXPECT_TRUE(client_.ProcessPacket(response.Encode(buffer).value()).ok());
112   }
113 
set_response(const TestResponse::Message & response,Status response_status=OkStatus ())114   void set_response(const TestResponse::Message& response,
115                     Status response_status = OkStatus()) {
116     response_ = response;
117     response_status_ = response_status;
118     output().set_on_send([this](span<const std::byte> buffer, Status status) {
119       OnSend(buffer, status);
120     });
121   }
122 
generated_client()123   MethodInfo::GeneratedClient generated_client() {
124     return MethodInfo::GeneratedClient(client(), channel().id());
125   }
126 
output()127   FakeChannelOutput& output() { return fake_output_; }
channel() const128   const Channel& channel() const { return channels_.front(); }
client()129   Client& client() { return client_; }
130 
131  private:
132   FakeChannelOutput fake_output_;
133   std::array<Channel, 1> channels_;
134   Client client_;
135   Thread work_thread_;
136   work_queue::WorkQueueWithBuffer<1> work_queue_;
137   TestResponse::Message response_{};
138   Status response_status_ = OkStatus();
139   internal::Packet request_packet_;
140 };
141 
// A unary call answered with an OK status yields a successful result that
// carries the canned response.
TEST_F(SynchronousCallTest, SynchronousCallSuccess) {
  TestRequest::Message request{.integer = 5, .status_code = 0};
  TestResponse::Message response{.value = 42, .repeated_field{}};

  set_response(response, OkStatus());

  auto result = SynchronousCall<TestService::TestUnaryRpc>(
      client(), channel().id(), request);
  // Fatal on failure: per SynchronousCallResult's documented contract,
  // response() may only be accessed when a server response was received, so
  // stop here instead of reading it after a failed call.
  ASSERT_TRUE(result.ok());
  EXPECT_EQ(result.response().value, 42);
}
153 
// A unary call answered with a non-OK status reports that status as an error
// while still exposing the response the server sent.
TEST_F(SynchronousCallTest, SynchronousCallServerError) {
  TestRequest::Message request{.integer = 5, .status_code = 0};
  TestResponse::Message response{.value = 42, .repeated_field{}};

  set_response(response, Status::Internal());

  auto result = SynchronousCall<TestService::TestUnaryRpc>(
      client(), channel().id(), request);
  EXPECT_TRUE(result.is_error());
  EXPECT_EQ(result.status(), Status::Internal());

  // We should still receive the response. This check is fatal because
  // response() may only be accessed when a server response was received.
  ASSERT_TRUE(result.is_server_response());
  EXPECT_EQ(result.response().value, 42);
}
169 
// A send failure in the channel output surfaces as an RPC-level error.
TEST_F(SynchronousCallTest, SynchronousCallRpcError) {
  // Force sends through the fake output to fail. Whatever non-OK status
  // ChannelOutput::Send returns, Channel internally reports it as Unknown,
  // which is the status expected below.
  output().set_send_status(Status::Unknown());

  const TestRequest::Message request_msg{.integer = 5, .status_code = 0};
  const auto call_result = SynchronousCall<TestService::TestUnaryRpc>(
      client(), channel().id(), request_msg);

  EXPECT_TRUE(call_result.is_rpc_error());
  EXPECT_EQ(call_result.status(), Status::Unknown());
}
182 
// This test never calls set_response(), so no reply is delivered and a call
// bounded by a duration must end in a timeout.
TEST_F(SynchronousCallTest, SynchronousCallForTimeoutError) {
  const TestRequest::Message request_msg{.integer = 5, .status_code = 0};
  const auto timeout =
      chrono::SystemClock::for_at_least(std::chrono::milliseconds(1));

  const auto call_result = SynchronousCallFor<TestService::TestUnaryRpc>(
      client(), channel().id(), request_msg, timeout);

  EXPECT_TRUE(call_result.is_timeout());
  EXPECT_EQ(call_result.status(), Status::DeadlineExceeded());
}
195 
// This test never calls set_response(), so no reply is delivered and a call
// whose deadline is the current time must end in a timeout.
TEST_F(SynchronousCallTest, SynchronousCallUntilTimeoutError) {
  const TestRequest::Message request_msg{.integer = 5, .status_code = 0};
  const auto deadline = chrono::SystemClock::now();

  const auto call_result = SynchronousCallUntil<TestService::TestUnaryRpc>(
      client(), channel().id(), request_msg, deadline);

  EXPECT_TRUE(call_result.is_timeout());
  EXPECT_EQ(call_result.status(), Status::DeadlineExceeded());
}
205 
// Calls TestUnaryRpc on a real TestServiceImpl through a loopback registry and
// decodes the reply into a caller-supplied response type whose decoder
// callback collects the repeated field.
TEST_F(SynchronousCallTest, SynchronousCallCustomResponse) {
  TestServiceImpl test_service;
  TestLoopbackServiceRegistry service_registry;
  service_registry.RegisterService(test_service);

  // Response type that installs its own decoder for `repeated_field`. The
  // callback captures `this` and reads the repeated values into `values`
  // (capacity 4).
  class CustomResponse : public TestResponse::Message {
   public:
    CustomResponse() {
      repeated_field.SetDecoder([this](TestResponse::StreamDecoder& decoder) {
        return decoder.ReadRepeatedField(values);
      });
    }
    pw::Vector<uint32_t, 4> values{};
  };

  auto result = SynchronousCall<TestService::TestUnaryRpc, CustomResponse>(
      service_registry.client_server().client(),
      service_registry.channel_id(),
      {.integer = 5, .status_code = 0});
  EXPECT_EQ(result.status(), OkStatus());
  // Fatal: response() may only be accessed when a server response was
  // received.
  ASSERT_TRUE(result.is_server_response());

  const auto& values = result.response().values;
  // Fatal: the element checks below index positions 0..2, so stop here when
  // the vector does not hold exactly three entries.
  ASSERT_EQ(3u, values.size());
  EXPECT_EQ(7u, values[0]);
  EXPECT_EQ(8u, values[1]);
  EXPECT_EQ(9u, values[2]);
}
232 
// Same as the success case, but the call is made through a generated client
// instead of an explicit client and channel ID.
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallSuccess) {
  TestRequest::Message request{.integer = 5, .status_code = 0};
  TestResponse::Message response{.value = 42, .repeated_field{}};

  set_response(response, OkStatus());

  auto result =
      SynchronousCall<TestService::TestUnaryRpc>(generated_client(), request);
  // Fatal on failure: response() may only be accessed when a server response
  // was received, so do not read it after a failed call.
  ASSERT_TRUE(result.ok());
  EXPECT_EQ(result.response().value, 42);
}
244 
245 #if PW_RPC_DYNAMIC_ALLOCATION
246 
// Success case using the service's DynamicClient (only built when
// PW_RPC_DYNAMIC_ALLOCATION is enabled).
TEST_F(SynchronousCallTest, GeneratedDynamicClientSynchronousCallSuccess) {
  TestRequest::Message request{.integer = 5, .status_code = 0};
  TestResponse::Message response{.value = 42, .repeated_field{}};

  set_response(response, OkStatus());

  TestService::DynamicClient dynamic_client(client(), channel().id());
  auto result =
      SynchronousCall<TestService::TestUnaryRpc>(dynamic_client, request);
  // Fatal on failure: response() may only be accessed when a server response
  // was received, so do not read it after a failed call.
  ASSERT_TRUE(result.ok());
  EXPECT_EQ(result.response().value, 42);
}
259 
260 #endif  // PW_RPC_DYNAMIC_ALLOCATION
261 
// Generated-client variant of the server-error case: the non-OK status is
// reported as an error and the server's response remains accessible.
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallServerError) {
  TestRequest::Message request{.integer = 5, .status_code = 0};
  TestResponse::Message response{.value = 42, .repeated_field{}};

  set_response(response, Status::Internal());

  auto result =
      SynchronousCall<TestService::TestUnaryRpc>(generated_client(), request);
  EXPECT_TRUE(result.is_error());
  EXPECT_EQ(result.status(), Status::Internal());

  // We should still receive the response. This check is fatal because
  // response() may only be accessed when a server response was received.
  ASSERT_TRUE(result.is_server_response());
  EXPECT_EQ(result.response().value, 42);
}
277 
// Generated-client variant: a send failure in the channel output surfaces as
// an RPC-level error.
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallRpcError) {
  // Force sends through the fake output to fail. Whatever non-OK status
  // ChannelOutput::Send returns, Channel internally reports it as Unknown,
  // which is the status expected below.
  output().set_send_status(Status::Unknown());

  const TestRequest::Message request_msg{.integer = 5, .status_code = 0};
  const auto call_result = SynchronousCall<TestService::TestUnaryRpc>(
      generated_client(), request_msg);

  EXPECT_TRUE(call_result.is_rpc_error());
  EXPECT_EQ(call_result.status(), Status::Unknown());
}
290 
// Generated-client variant: this test never calls set_response(), so no reply
// is delivered and a call bounded by a duration must end in a timeout.
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallForTimeoutError) {
  const TestRequest::Message request_msg{.integer = 5, .status_code = 0};
  const auto timeout =
      chrono::SystemClock::for_at_least(std::chrono::milliseconds(1));

  const auto call_result = SynchronousCallFor<TestService::TestUnaryRpc>(
      generated_client(), request_msg, timeout);

  EXPECT_TRUE(call_result.is_timeout());
  EXPECT_EQ(call_result.status(), Status::DeadlineExceeded());
}
302 
// Generated-client variant: this test never calls set_response(), so no reply
// is delivered and a call whose deadline is the current time must end in a
// timeout.
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallUntilTimeoutError) {
  const TestRequest::Message request_msg{.integer = 5, .status_code = 0};
  const auto deadline = chrono::SystemClock::now();

  const auto call_result = SynchronousCallUntil<TestService::TestUnaryRpc>(
      generated_client(), request_msg, deadline);

  EXPECT_TRUE(call_result.is_timeout());
  EXPECT_EQ(call_result.status(), Status::DeadlineExceeded());
}
312 }  // namespace
313 }  // namespace pw::rpc::test
314