1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/synchronous_call.h"
16
17 #include <chrono>
18
19 #include "pw_chrono/system_clock.h"
20 #include "pw_rpc/channel.h"
21 #include "pw_rpc/internal/packet.h"
22 #include "pw_rpc/pwpb/fake_channel_output.h"
23 #include "pw_rpc_test_protos/test.rpc.pwpb.h"
24 #include "pw_rpc_transport/test_loopback_service_registry.h"
25 #include "pw_status/status.h"
26 #include "pw_status/status_with_size.h"
27 #include "pw_thread/thread.h"
28 #include "pw_unit_test/framework.h"
29 #include "pw_work_queue/test_thread.h"
30 #include "pw_work_queue/work_queue.h"
31
32 namespace pw::rpc::test {
33 namespace {
34
35 using pw::rpc::test::pw_rpc::pwpb::TestService;
36 using MethodInfo = internal::MethodInfo<TestService::TestUnaryRpc>;
37
38 namespace TestRequest = ::pw::rpc::test::pwpb::TestRequest;
39 namespace TestResponse = ::pw::rpc::test::pwpb::TestResponse;
40 namespace TestStreamResponse = ::pw::rpc::test::pwpb::TestStreamResponse;
41
42 class TestServiceImpl final
43 : public pw::rpc::test::pw_rpc::pwpb::TestService::Service<
44 TestServiceImpl> {
45 public:
TestUnaryRpc(const TestRequest::Message &,TestResponse::Message & response)46 Status TestUnaryRpc(const TestRequest::Message& /*request*/,
47 TestResponse::Message& response) {
48 response.value = 42;
49 response.repeated_field.SetEncoder(
50 [](TestResponse::StreamEncoder& encoder) {
51 constexpr std::array<uint32_t, 3> kValues = {7, 8, 9};
52 return encoder.WriteRepeatedField(kValues);
53 });
54 return OkStatus();
55 }
TestAnotherUnaryRpc(const TestRequest::Message &,TestResponse::Message &)56 Status TestAnotherUnaryRpc(const TestRequest::Message& /*request*/,
57 TestResponse::Message& /*response*/) {
58 return OkStatus();
59 }
TestServerStreamRpc(const TestRequest::Message &,ServerWriter<TestStreamResponse::Message> &)60 void TestServerStreamRpc(const TestRequest::Message&,
61 ServerWriter<TestStreamResponse::Message>&) {}
TestClientStreamRpc(RawServerReader &)62 void TestClientStreamRpc(RawServerReader&) {}
TestBidirectionalStreamRpc(ServerReaderWriter<TestRequest::Message,TestStreamResponse::Message> &)63 void TestBidirectionalStreamRpc(
64 ServerReaderWriter<TestRequest::Message, TestStreamResponse::Message>&) {}
65 };
66
67 class SynchronousCallTest : public ::testing::Test {
68 public:
SynchronousCallTest()69 SynchronousCallTest()
70 : channels_({{Channel::Create<42>(&fake_output_)}}), client_(channels_) {}
71
SetUp()72 void SetUp() override {
73 work_thread_ =
74 Thread(work_queue::test::WorkQueueThreadOptions(), work_queue_);
75 }
76
TearDown()77 void TearDown() override {
78 work_queue_.RequestStop();
79 #if PW_THREAD_JOINING_ENABLED
80 work_thread_.join();
81 #else
82 work_thread_.detach();
83 #endif // PW_THREAD_JOINING_ENABLED
84 }
85
86 protected:
87 using FakeChannelOutput = PwpbFakeChannelOutput<2>;
88
OnSend(span<const std::byte> buffer,Status status)89 void OnSend(span<const std::byte> buffer, Status status) {
90 if (!status.ok()) {
91 return;
92 }
93 auto result = internal::Packet::FromBuffer(buffer);
94 EXPECT_TRUE(result.ok());
95 request_packet_ = *result;
96
97 EXPECT_TRUE(work_queue_.PushWork([this]() { SendResponse(); }).ok());
98 }
99
SendResponse()100 void SendResponse() {
101 std::array<std::byte, 256> buffer;
102 std::array<std::byte, 32> payload_buffer;
103
104 StatusWithSize size_status =
105 MethodInfo::serde().response().Encode(response_, payload_buffer);
106 EXPECT_TRUE(size_status.ok());
107
108 auto response =
109 internal::Packet::Response(request_packet_, response_status_);
110 response.set_payload({payload_buffer.data(), size_status.size()});
111 EXPECT_TRUE(client_.ProcessPacket(response.Encode(buffer).value()).ok());
112 }
113
set_response(const TestResponse::Message & response,Status response_status=OkStatus ())114 void set_response(const TestResponse::Message& response,
115 Status response_status = OkStatus()) {
116 response_ = response;
117 response_status_ = response_status;
118 output().set_on_send([this](span<const std::byte> buffer, Status status) {
119 OnSend(buffer, status);
120 });
121 }
122
generated_client()123 MethodInfo::GeneratedClient generated_client() {
124 return MethodInfo::GeneratedClient(client(), channel().id());
125 }
126
output()127 FakeChannelOutput& output() { return fake_output_; }
channel() const128 const Channel& channel() const { return channels_.front(); }
client()129 Client& client() { return client_; }
130
131 private:
132 FakeChannelOutput fake_output_;
133 std::array<Channel, 1> channels_;
134 Client client_;
135 Thread work_thread_;
136 work_queue::WorkQueueWithBuffer<1> work_queue_;
137 TestResponse::Message response_{};
138 Status response_status_ = OkStatus();
139 internal::Packet request_packet_;
140 };
141
// A blocking unary call on (client, channel ID) returns the configured reply
// when the server answers with OK.
TEST_F(SynchronousCallTest, SynchronousCallSuccess) {
  // Configure the fixture to answer the request with value 42 and OK.
  TestResponse::Message canned_reply{.value = 42, .repeated_field{}};
  set_response(canned_reply, OkStatus());

  TestRequest::Message rpc_request{.integer = 5, .status_code = 0};
  auto call_result = SynchronousCall<TestService::TestUnaryRpc>(
      client(), channel().id(), rpc_request);

  EXPECT_TRUE(call_result.ok());
  EXPECT_EQ(call_result.response().value, 42);
}
153
// A non-OK status from the server is reported as an error, yet the response
// payload that came with it is still delivered.
TEST_F(SynchronousCallTest, SynchronousCallServerError) {
  // Configure the fixture to answer with value 42 and an INTERNAL status.
  TestResponse::Message canned_reply{.value = 42, .repeated_field{}};
  set_response(canned_reply, Status::Internal());

  TestRequest::Message rpc_request{.integer = 5, .status_code = 0};
  auto call_result = SynchronousCall<TestService::TestUnaryRpc>(
      client(), channel().id(), rpc_request);

  EXPECT_TRUE(call_result.is_error());
  EXPECT_EQ(call_result.status(), Status::Internal());

  // The payload sent alongside the error status must still be available.
  EXPECT_TRUE(call_result.is_server_response());
  EXPECT_EQ(call_result.response().value, 42);
}
169
// A failure to send the request surfaces as an RPC error with status UNKNOWN.
TEST_F(SynchronousCallTest, SynchronousCallRpcError) {
  // Make the fake output fail every send. Internally, whenever Channel gets a
  // non-OK status back from ChannelOutput::Send it always reports Unknown.
  output().set_send_status(Status::Unknown());

  TestRequest::Message rpc_request{.integer = 5, .status_code = 0};
  auto call_result = SynchronousCall<TestService::TestUnaryRpc>(
      client(), channel().id(), rpc_request);

  EXPECT_TRUE(call_result.is_rpc_error());
  EXPECT_EQ(call_result.status(), Status::Unknown());
}
182
// set_response() is never called here, so nothing answers the request and the
// duration-bounded call must report a timeout.
TEST_F(SynchronousCallTest, SynchronousCallForTimeoutError) {
  TestRequest::Message rpc_request{.integer = 5, .status_code = 0};
  const auto wait_time =
      chrono::SystemClock::for_at_least(std::chrono::milliseconds(1));

  auto call_result = SynchronousCallFor<TestService::TestUnaryRpc>(
      client(), channel().id(), rpc_request, wait_time);

  EXPECT_TRUE(call_result.is_timeout());
  EXPECT_EQ(call_result.status(), Status::DeadlineExceeded());
}
195
// set_response() is never called here, so nothing answers the request; with
// the current time as the deadline the call must report a timeout.
TEST_F(SynchronousCallTest, SynchronousCallUntilTimeoutError) {
  TestRequest::Message rpc_request{.integer = 5, .status_code = 0};
  const auto deadline = chrono::SystemClock::now();

  auto call_result = SynchronousCallUntil<TestService::TestUnaryRpc>(
      client(), channel().id(), rpc_request, deadline);

  EXPECT_TRUE(call_result.is_timeout());
  EXPECT_EQ(call_result.status(), Status::DeadlineExceeded());
}
205
// Calls TestUnaryRpc on a real TestServiceImpl through a loopback registry and
// decodes the reply into a caller-supplied response type whose decoder
// callback collects the repeated field into a vector.
TEST_F(SynchronousCallTest, SynchronousCallCustomResponse) {
  TestServiceImpl test_service;
  TestLoopbackServiceRegistry service_registry;
  service_registry.RegisterService(test_service);

  // Response type that extends the generated message with storage for the
  // repeated field and registers a decoder callback that fills it.
  class CustomResponse : public TestResponse::Message {
   public:
    CustomResponse() {
      repeated_field.SetDecoder([this](TestResponse::StreamDecoder& decoder) {
        return decoder.ReadRepeatedField(values);
      });
    }
    pw::Vector<uint32_t, 4> values{};
  };

  auto result = SynchronousCall<TestService::TestUnaryRpc, CustomResponse>(
      service_registry.client_server().client(),
      service_registry.channel_id(),
      {.integer = 5, .status_code = 0});

  // Fatal check: do not inspect the response if the call did not succeed.
  ASSERT_EQ(result.status(), OkStatus());

  // Fatal check: the element reads below index the vector directly, so bail
  // out instead of reading past the end when fewer values were decoded.
  ASSERT_EQ(3u, result.response().values.size());
  EXPECT_EQ(7u, result.response().values[0]);
  EXPECT_EQ(8u, result.response().values[1]);
  EXPECT_EQ(9u, result.response().values[2]);
}
232
// Same success scenario as the (client, channel ID) overload, but issued
// through a generated client object.
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallSuccess) {
  // Configure the fixture to answer the request with value 42 and OK.
  TestResponse::Message canned_reply{.value = 42, .repeated_field{}};
  set_response(canned_reply, OkStatus());

  TestRequest::Message rpc_request{.integer = 5, .status_code = 0};
  auto call_result = SynchronousCall<TestService::TestUnaryRpc>(
      generated_client(), rpc_request);

  EXPECT_TRUE(call_result.ok());
  EXPECT_EQ(call_result.response().value, 42);
}
244
245 #if PW_RPC_DYNAMIC_ALLOCATION
246
// Success scenario issued through a TestService::DynamicClient built on the
// fixture's client and channel.
TEST_F(SynchronousCallTest, GeneratedDynamicClientSynchronousCallSuccess) {
  // Configure the fixture to answer the request with value 42 and OK.
  TestResponse::Message canned_reply{.value = 42, .repeated_field{}};
  set_response(canned_reply, OkStatus());

  TestRequest::Message rpc_request{.integer = 5, .status_code = 0};
  TestService::DynamicClient dynamic_client(client(), channel().id());
  auto call_result =
      SynchronousCall<TestService::TestUnaryRpc>(dynamic_client, rpc_request);

  EXPECT_TRUE(call_result.ok());
  EXPECT_EQ(call_result.response().value, 42);
}
259
260 #endif // PW_RPC_DYNAMIC_ALLOCATION
261
// Through a generated client: a non-OK server status is reported as an error
// while the accompanying response payload is still delivered.
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallServerError) {
  // Configure the fixture to answer with value 42 and an INTERNAL status.
  TestResponse::Message canned_reply{.value = 42, .repeated_field{}};
  set_response(canned_reply, Status::Internal());

  TestRequest::Message rpc_request{.integer = 5, .status_code = 0};
  auto call_result = SynchronousCall<TestService::TestUnaryRpc>(
      generated_client(), rpc_request);

  EXPECT_TRUE(call_result.is_error());
  EXPECT_EQ(call_result.status(), Status::Internal());

  // The payload sent alongside the error status must still be available.
  EXPECT_TRUE(call_result.is_server_response());
  EXPECT_EQ(call_result.response().value, 42);
}
277
// Through a generated client: a failure to send the request surfaces as an
// RPC error with status UNKNOWN.
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallRpcError) {
  // Make the fake output fail every send. Internally, whenever Channel gets a
  // non-OK status back from ChannelOutput::Send it always reports Unknown.
  output().set_send_status(Status::Unknown());

  TestRequest::Message rpc_request{.integer = 5, .status_code = 0};
  auto call_result = SynchronousCall<TestService::TestUnaryRpc>(
      generated_client(), rpc_request);

  EXPECT_TRUE(call_result.is_rpc_error());
  EXPECT_EQ(call_result.status(), Status::Unknown());
}
290
// Through a generated client: set_response() is never called, so nothing
// answers the request and the duration-bounded call must report a timeout.
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallForTimeoutError) {
  TestRequest::Message rpc_request{.integer = 5, .status_code = 0};
  const auto wait_time =
      chrono::SystemClock::for_at_least(std::chrono::milliseconds(1));

  auto call_result = SynchronousCallFor<TestService::TestUnaryRpc>(
      generated_client(), rpc_request, wait_time);

  EXPECT_TRUE(call_result.is_timeout());
  EXPECT_EQ(call_result.status(), Status::DeadlineExceeded());
}
302
// Through a generated client: set_response() is never called, so nothing
// answers the request; with the current time as the deadline the call must
// report a timeout.
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallUntilTimeoutError) {
  TestRequest::Message rpc_request{.integer = 5, .status_code = 0};
  const auto deadline = chrono::SystemClock::now();

  auto call_result = SynchronousCallUntil<TestService::TestUnaryRpc>(
      generated_client(), rpc_request, deadline);

  EXPECT_TRUE(call_result.is_timeout());
  EXPECT_EQ(call_result.status(), Status::DeadlineExceeded());
}
312 } // namespace
313 } // namespace pw::rpc::test
314