1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/synchronous_call.h"
16
17 #include <chrono>
18 #include <string_view>
19
20 #include "pw_assert/check.h"
21 #include "pw_chrono/system_clock.h"
22 #include "pw_rpc/channel.h"
23 #include "pw_rpc/internal/packet.h"
24 #include "pw_rpc/raw/fake_channel_output.h"
25 #include "pw_rpc_test_protos/test.raw_rpc.pb.h"
26 #include "pw_status/status.h"
27 #include "pw_status/status_with_size.h"
28 #include "pw_thread/thread.h"
29 #include "pw_unit_test/framework.h"
30 #include "pw_work_queue/test_thread.h"
31 #include "pw_work_queue/work_queue.h"
32
33 namespace pw::rpc::test {
34 namespace {
35
// Generated raw-API test service; the tests below invoke its TestUnaryRpc
// method.
using ::pw::rpc::test::pw_rpc::raw::TestService;
// Method metadata for TestUnaryRpc; the fixture uses its GeneratedClient type
// to build a generated client bound to the test channel.
using MethodInfo = internal::MethodInfo<TestService::TestUnaryRpc>;
38
39 class RawSynchronousCallTest : public ::testing::Test {
40 public:
RawSynchronousCallTest()41 RawSynchronousCallTest()
42 : channels_({{Channel::Create<42>(&fake_output_)}}), client_(channels_) {}
43
SetUp()44 void SetUp() override {
45 work_thread_ =
46 Thread(work_queue::test::WorkQueueThreadOptions(), work_queue_);
47 }
48
TearDown()49 void TearDown() override {
50 work_queue_.RequestStop();
51 #if PW_THREAD_JOINING_ENABLED
52 work_thread_.join();
53 #else
54 work_thread_.detach();
55 #endif // PW_THREAD_JOINING_ENABLED
56 }
57
58 protected:
OnSend(span<const std::byte> buffer,Status status)59 void OnSend(span<const std::byte> buffer, Status status) {
60 if (!status.ok()) {
61 return;
62 }
63 auto result = internal::Packet::FromBuffer(buffer);
64 EXPECT_TRUE(result.ok());
65 request_packet_ = *result;
66
67 EXPECT_TRUE(work_queue_.PushWork([this]() { SendResponse(); }).ok());
68 }
69
SendResponse()70 void SendResponse() {
71 std::array<std::byte, 256> buffer;
72 std::array<char, 32> payload_buffer;
73
74 PW_CHECK_UINT_LE(response_.size(), payload_buffer.size());
75 size_t size = response_.copy(payload_buffer.data(), payload_buffer.size());
76
77 auto response =
78 internal::Packet::Response(request_packet_, response_status_);
79 response.set_payload(as_bytes(span(payload_buffer.data(), size)));
80 EXPECT_TRUE(client_.ProcessPacket(response.Encode(buffer).value()).ok());
81 }
82
set_response(std::string_view response,Status response_status=OkStatus ())83 void set_response(std::string_view response,
84 Status response_status = OkStatus()) {
85 response_ = response;
86 response_status_ = response_status;
87 output().set_on_send([this](span<const std::byte> buffer, Status status) {
88 OnSend(buffer, status);
89 });
90 }
91
generated_client()92 MethodInfo::GeneratedClient generated_client() {
93 return MethodInfo::GeneratedClient(client(), channel().id());
94 }
95
output()96 RawFakeChannelOutput<2>& output() { return fake_output_; }
channel() const97 const Channel& channel() const { return channels_.front(); }
client()98 Client& client() { return client_; }
99
100 private:
101 RawFakeChannelOutput<2> fake_output_;
102 std::array<Channel, 1> channels_;
103 Client client_;
104 Thread work_thread_;
105 work_queue::WorkQueueWithBuffer<1> work_queue_;
106 std::string_view response_;
107 Status response_status_ = OkStatus();
108 internal::Packet request_packet_;
109 };
110
111 template <Status::Code kExpectedStatus = OkStatus().code()>
CopyReply(InlineString<32> & reply)112 auto CopyReply(InlineString<32>& reply) {
113 return [&reply](ConstByteSpan response, Status status) {
114 EXPECT_EQ(Status(kExpectedStatus), status);
115 reply.assign(reinterpret_cast<const char*>(response.data()),
116 response.size());
117 };
118 }
119
ExpectNoReply(ConstByteSpan,Status)120 void ExpectNoReply(ConstByteSpan, Status) { FAIL(); }
121
// A call through the Client + channel ID overload is expected to return OK
// and deliver the configured payload to the completion callback.
TEST_F(RawSynchronousCallTest, SynchronousCallSuccess) {
  set_response("jicama", OkStatus());

  InlineString<32> reply;
  const auto call_status = SynchronousCall<TestService::TestUnaryRpc>(
      client(), channel().id(), {}, CopyReply(reply));

  ASSERT_EQ(OkStatus(), call_status);
  EXPECT_EQ("jicama", reply);
}
131
// A reply carrying a non-OK status is expected to leave the call's own return
// value OK; the error status is handed to the completion callback instead.
TEST_F(RawSynchronousCallTest, SynchronousCallServerError) {
  set_response("raddish", Status::Internal());

  InlineString<32> reply;
  const auto call_status = SynchronousCall<TestService::TestUnaryRpc>(
      client(),
      channel().id(),
      {},
      CopyReply<Status::Internal().code()>(reply));

  ASSERT_EQ(OkStatus(), call_status);
  // The payload is still delivered alongside the error status.
  EXPECT_EQ("raddish", reply);
}
145
// A failure to send the request is expected to surface as the call's return
// value, with the completion callback never invoked.
TEST_F(RawSynchronousCallTest, SynchronousCallRpcError) {
  // Internally, if Channel receives a non-OK status from ChannelOutput::Send,
  // it will always return Unknown.
  output().set_send_status(Status::Unknown());

  const auto call_status = SynchronousCall<TestService::TestUnaryRpc>(
      client(), channel().id(), {}, ExpectNoReply);

  EXPECT_EQ(Status::Unknown(), call_status);
}
155
// A call with a one-second timeout is expected to return OK when the reply
// arrives, and to deliver both the payload and the NotFound status from the
// reply to the completion callback.
TEST_F(RawSynchronousCallTest, SynchronousCallFor) {
  set_response("broccoli", Status::NotFound());

  InlineString<32> reply;
  // Use the shared CopyReply helper, as the sibling tests do, instead of
  // repeating its body in an inline lambda.
  ASSERT_EQ(OkStatus(),
            SynchronousCallFor<TestService::TestUnaryRpc>(
                client(),
                channel().id(),
                {},
                chrono::SystemClock::for_at_least(std::chrono::seconds(1)),
                CopyReply<Status::NotFound().code()>(reply)));
  EXPECT_EQ("broccoli", reply);
}
173
// No response is configured, so a call with a short timeout is expected to
// return DeadlineExceeded without invoking the completion callback.
TEST_F(RawSynchronousCallTest, SynchronousCallForTimeoutError) {
  const auto timeout =
      chrono::SystemClock::for_at_least(std::chrono::milliseconds(1));

  const auto call_status = SynchronousCallFor<TestService::TestUnaryRpc>(
      client(), channel().id(), {}, timeout, ExpectNoReply);

  ASSERT_EQ(Status::DeadlineExceeded(), call_status);
}
183
// No response is configured and the deadline is the current time, so the call
// is expected to return DeadlineExceeded without invoking the callback.
TEST_F(RawSynchronousCallTest, SynchronousCallUntilTimeoutError) {
  const auto deadline = chrono::SystemClock::now();

  const auto call_status = SynchronousCallUntil<TestService::TestUnaryRpc>(
      client(), channel().id(), {}, deadline, ExpectNoReply);

  EXPECT_EQ(Status::DeadlineExceeded(), call_status);
}
193
// Same as the basic success case, but issued through the generated client
// overload rather than the Client + channel ID overload.
TEST_F(RawSynchronousCallTest, GeneratedClientSynchronousCallSuccess) {
  set_response("lettuce", OkStatus());

  InlineString<32> reply;
  const auto call_status = SynchronousCall<TestService::TestUnaryRpc>(
      generated_client(), {}, CopyReply(reply));

  EXPECT_EQ(OkStatus(), call_status);
  EXPECT_EQ("lettuce", reply);
}
203
// Through the generated client overload, a reply with a non-OK status is
// expected to leave the call's return value OK while the callback receives
// the error status and the payload.
TEST_F(RawSynchronousCallTest, GeneratedClientSynchronousCallServerError) {
  set_response("cabbage", Status::Internal());

  InlineString<32> reply;
  const auto call_status = SynchronousCall<TestService::TestUnaryRpc>(
      generated_client(), {}, CopyReply<Status::Internal().code()>(reply));

  EXPECT_EQ(OkStatus(), call_status);
  EXPECT_EQ("cabbage", reply);
}
214
// Through the generated client overload, a failed send is expected to be
// returned from the call, with the completion callback never invoked.
TEST_F(RawSynchronousCallTest, GeneratedClientSynchronousCallRpcError) {
  output().set_send_status(Status::Unknown());

  const auto call_status = SynchronousCall<TestService::TestUnaryRpc>(
      generated_client(), {}, ExpectNoReply);

  EXPECT_EQ(Status::Unknown(), call_status);
}
222
// No response is configured, so a generated-client call with a short timeout
// is expected to return DeadlineExceeded without invoking the callback.
TEST_F(RawSynchronousCallTest, GeneratedClientSynchronousCallForTimeoutError) {
  const auto timeout =
      chrono::SystemClock::for_at_least(std::chrono::milliseconds(1));

  const auto call_status = SynchronousCallFor<TestService::TestUnaryRpc>(
      generated_client(), {}, timeout, ExpectNoReply);

  EXPECT_EQ(Status::DeadlineExceeded(), call_status);
}
231
// No response is configured and the deadline is the current time, so the
// generated-client call is expected to return DeadlineExceeded without
// invoking the callback.
TEST_F(RawSynchronousCallTest,
       GeneratedClientSynchronousCallUntilTimeoutError) {
  const auto deadline = chrono::SystemClock::now();

  const auto call_status = SynchronousCallUntil<TestService::TestUnaryRpc>(
      generated_client(), {}, deadline, ExpectNoReply);

  EXPECT_EQ(Status::DeadlineExceeded(), call_status);
}
239
240 } // namespace
241 } // namespace pw::rpc::test
242