1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc_transport/egress_ingress.h"
16
17 #include <random>
18
19 #include "public/pw_rpc_transport/rpc_transport.h"
20 #include "pw_bytes/span.h"
21 #include "pw_metric/metric.h"
22 #include "pw_rpc/client_server.h"
23 #include "pw_rpc/packet_meta.h"
24 #include "pw_rpc_transport/hdlc_framing.h"
25 #include "pw_rpc_transport/internal/test.rpc.pwpb.h"
26 #include "pw_rpc_transport/rpc_transport.h"
27 #include "pw_rpc_transport/service_registry.h"
28 #include "pw_rpc_transport/simple_framing.h"
29 #include "pw_status/status.h"
30 #include "pw_string/string.h"
31 #include "pw_sync/thread_notification.h"
32 #include "pw_unit_test/framework.h"
33
34 namespace pw::rpc {
35 namespace {
36
// Size template argument shared by every egress and ingress instantiated in
// this file (presumably the largest RPC packet, in bytes, that they buffer —
// confirm against the egress/ingress class templates).
constexpr size_t kMaxPacketSize{256};
38
39 class TestService final
40 : public pw_rpc_transport::testing::pw_rpc::pwpb::TestService::Service<
41 TestService> {
42 public:
Echo(const pw_rpc_transport::testing::pwpb::EchoMessage::Message & request,pw_rpc_transport::testing::pwpb::EchoMessage::Message & response)43 Status Echo(
44 const pw_rpc_transport::testing::pwpb::EchoMessage::Message& request,
45 pw_rpc_transport::testing::pwpb::EchoMessage::Message& response) {
46 response.msg = request.msg;
47 return OkStatus();
48 }
49 };
50
51 // A transport that stores all received frames so they can be manually retrieved
52 // by the ingress later.
53 class TestTransport : public RpcFrameSender {
54 public:
TestTransport(size_t mtu,bool is_faulty=false)55 explicit TestTransport(size_t mtu, bool is_faulty = false)
56 : mtu_(mtu), is_faulty_(is_faulty) {}
57
MaximumTransmissionUnit() const58 size_t MaximumTransmissionUnit() const override { return mtu_; }
59
Send(RpcFrame frame)60 Status Send(RpcFrame frame) override {
61 if (is_faulty_) {
62 return Status::Internal();
63 }
64 std::copy(
65 frame.header.begin(), frame.header.end(), std::back_inserter(buffer_));
66 std::copy(frame.payload.begin(),
67 frame.payload.end(),
68 std::back_inserter(buffer_));
69 return OkStatus();
70 }
71
buffer()72 ByteSpan buffer() { return buffer_; }
73
74 private:
75 size_t mtu_;
76 bool is_faulty_ = false;
77 std::vector<std::byte> buffer_;
78 };
79
80 // An egress handler that passes the received RPC packet to the service
81 // registry.
82 class TestLocalEgress : public RpcEgressHandler {
83 public:
SendRpcPacket(ConstByteSpan packet)84 Status SendRpcPacket(ConstByteSpan packet) override {
85 if (!registry_) {
86 return Status::FailedPrecondition();
87 }
88 return registry_->ProcessRpcPacket(packet);
89 }
90
set_registry(ServiceRegistry & registry)91 void set_registry(ServiceRegistry& registry) { registry_ = ®istry; }
92
93 private:
94 ServiceRegistry* registry_ = nullptr;
95 };
96
// Round-trips two Echo calls between endpoints A and B using simple framing.
//
// A owns the client and B owns the service. Each direction has its own
// `TestTransport` (with a different MTU) that merely records what the egress
// sent; the test then feeds the recorded bytes to the opposite ingress by hand.
TEST(RpcEgressIngressTest, SimpleFramingRoundtrip) {
  using EchoMessage = pw_rpc_transport::testing::pwpb::EchoMessage::Message;

  constexpr uint32_t kChannelAtoB = 1;
  constexpr size_t kMaxMessageLength = 200;
  constexpr size_t kAtoBMtu = 33;
  constexpr size_t kBtoAMtu = 72;

  TestTransport transport_a_to_b(kAtoBMtu);
  TestTransport transport_b_to_a(kBtoAMtu);

  SimpleRpcEgress<kMaxPacketSize> egress_a_to_b("a->b", transport_a_to_b);
  SimpleRpcEgress<kMaxPacketSize> egress_b_to_a("b->a", transport_b_to_a);

  // Outgoing packets on either side leave through that side's egress.
  std::array a_tx_channels = {
      rpc::Channel::Create<kChannelAtoB>(&egress_a_to_b)};
  std::array b_tx_channels = {
      rpc::Channel::Create<kChannelAtoB>(&egress_b_to_a)};

  ServiceRegistry registry_a(a_tx_channels);
  ServiceRegistry registry_b(b_tx_channels);

  TestService test_service;
  registry_b.RegisterService(test_service);

  // Incoming packets on either side are handed to that side's own registry.
  TestLocalEgress local_egress_a;
  local_egress_a.set_registry(registry_a);

  TestLocalEgress local_egress_b;
  local_egress_b.set_registry(registry_b);

  std::array a_rx_channels = {
      ChannelEgress{kChannelAtoB, local_egress_a},
  };
  std::array b_rx_channels = {
      ChannelEgress{kChannelAtoB, local_egress_b},
  };

  SimpleRpcIngress<kMaxPacketSize> ingress_a(a_rx_channels);
  SimpleRpcIngress<kMaxPacketSize> ingress_b(b_rx_channels);

  auto client =
      registry_a
          .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
              kChannelAtoB);

  // Per-call state: the message that was sent (and is expected back) plus a
  // notification that the call's callback releases when it runs.
  struct ReceiverState {
    InlineString<kMaxMessageLength> message;
    sync::ThreadNotification done;
  };

  // Each message is twice the MTU of one of the transports, presumably so that
  // a single RPC packet has to be spread over several frames.
  ReceiverState receiver1;
  ReceiverState receiver2;
  receiver1.message.append(2 * transport_a_to_b.MaximumTransmissionUnit(), '*');
  receiver2.message.append(2 * transport_b_to_a.MaximumTransmissionUnit(), '>');

  // For both calls, the first callback checks the echoed response; the second
  // callback expects an OK status if it is ever invoked. Either one releases
  // the matching `done` notification.
  auto call1 = client.Echo(
      {.msg = receiver1.message},
      [&receiver1](const EchoMessage& response, Status status) {
        EXPECT_EQ(status, OkStatus());
        EXPECT_EQ(response.msg, receiver1.message);
        receiver1.done.release();
      },
      [&receiver1](Status status) {
        EXPECT_EQ(status, OkStatus());
        receiver1.done.release();
      });

  auto call2 = client.Echo(
      {.msg = receiver2.message},
      [&receiver2](const EchoMessage& response, Status status) {
        EXPECT_EQ(status, OkStatus());
        EXPECT_EQ(response.msg, receiver2.message);
        receiver2.done.release();
      },
      [&receiver2](Status status) {
        EXPECT_EQ(status, OkStatus());
        receiver2.done.release();
      });

  // Calling `ingress_b.ProcessIncomingData` reads all packets from the
  // transport and dispatches them according to the ingress configuration.
  // Dispatching a packet generates a reply message: we then read it back at the
  // sender by calling `ingress_a.ProcessIncomingData`.
  EXPECT_EQ(ingress_b.ProcessIncomingData(transport_a_to_b.buffer()),
            OkStatus());
  EXPECT_EQ(ingress_a.ProcessIncomingData(transport_b_to_a.buffer()),
            OkStatus());

  // Both calls must have run one of their callbacks by now.
  receiver1.done.acquire();
  receiver2.done.acquire();
}
194
// Round-trips two Echo calls between endpoints A and B using HDLC framing.
//
// A owns the client and B owns the service. Each direction has its own
// `TestTransport` (with a different MTU) that merely records what the egress
// sent; the test then feeds the recorded bytes to the opposite ingress by hand
// and also checks how many packets each ingress counted.
TEST(RpcEgressIngressTest, HdlcFramingRoundtrip) {
  using EchoMessage = pw_rpc_transport::testing::pwpb::EchoMessage::Message;

  constexpr uint32_t kChannelAtoB = 1;
  constexpr size_t kMaxMessageLength = 200;
  constexpr size_t kAtoBMtu = 33;
  constexpr size_t kBtoAMtu = 72;

  TestTransport transport_a_to_b(kAtoBMtu);
  TestTransport transport_b_to_a(kBtoAMtu);

  HdlcRpcEgress<kMaxPacketSize> egress_a_to_b("a->b", transport_a_to_b);
  HdlcRpcEgress<kMaxPacketSize> egress_b_to_a("b->a", transport_b_to_a);

  // Outgoing packets on either side leave through that side's egress.
  std::array a_tx_channels = {
      rpc::Channel::Create<kChannelAtoB>(&egress_a_to_b)};
  std::array b_tx_channels = {
      rpc::Channel::Create<kChannelAtoB>(&egress_b_to_a)};

  ServiceRegistry registry_a(a_tx_channels);
  ServiceRegistry registry_b(b_tx_channels);

  TestService test_service;
  registry_b.RegisterService(test_service);

  // Incoming packets on either side are handed to that side's own registry.
  TestLocalEgress local_egress_a;
  local_egress_a.set_registry(registry_a);

  TestLocalEgress local_egress_b;
  local_egress_b.set_registry(registry_b);

  std::array a_rx_channels = {
      ChannelEgress{kChannelAtoB, local_egress_a},
  };
  std::array b_rx_channels = {
      ChannelEgress{kChannelAtoB, local_egress_b},
  };

  HdlcRpcIngress<kMaxPacketSize> ingress_a(a_rx_channels);
  HdlcRpcIngress<kMaxPacketSize> ingress_b(b_rx_channels);

  auto client =
      registry_a
          .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
              kChannelAtoB);

  // Per-call state: the message that was sent (and is expected back) plus a
  // notification that the call's callback releases when it runs.
  struct ReceiverState {
    InlineString<kMaxMessageLength> message;
    sync::ThreadNotification done;
  };

  // Each message is twice the MTU of one of the transports, presumably so that
  // a single RPC packet has to be spread over several frames.
  ReceiverState receiver1;
  ReceiverState receiver2;
  receiver1.message.append(2 * transport_a_to_b.MaximumTransmissionUnit(), '*');
  receiver2.message.append(2 * transport_b_to_a.MaximumTransmissionUnit(), '>');

  // For both calls, the first callback checks the echoed response; the second
  // callback expects an OK status if it is ever invoked. Either one releases
  // the matching `done` notification.
  auto call1 = client.Echo(
      {.msg = receiver1.message},
      [&receiver1](const EchoMessage& response, Status status) {
        EXPECT_EQ(status, OkStatus());
        EXPECT_EQ(response.msg, receiver1.message);
        receiver1.done.release();
      },
      [&receiver1](Status status) {
        EXPECT_EQ(status, OkStatus());
        receiver1.done.release();
      });

  auto call2 = client.Echo(
      {.msg = receiver2.message},
      [&receiver2](const EchoMessage& response, Status status) {
        EXPECT_EQ(status, OkStatus());
        EXPECT_EQ(response.msg, receiver2.message);
        receiver2.done.release();
      },
      [&receiver2](Status status) {
        EXPECT_EQ(status, OkStatus());
        receiver2.done.release();
      });

  // Calling `ingress_b.ProcessIncomingData` reads all packets from the
  // transport and dispatches them according to the ingress configuration.
  // Dispatching a packet generates a reply message: we then read it back at the
  // sender by calling `ingress_a.ProcessIncomingData`.
  EXPECT_EQ(ingress_b.ProcessIncomingData(transport_a_to_b.buffer()),
            OkStatus());
  EXPECT_EQ(ingress_a.ProcessIncomingData(transport_b_to_a.buffer()),
            OkStatus());

  // Two requests reached B and two responses reached A.
  EXPECT_EQ(ingress_a.num_total_packets(), 2u);
  EXPECT_EQ(ingress_b.num_total_packets(), 2u);

  // Both calls must have run one of their callbacks by now.
  receiver1.done.acquire();
  receiver2.done.acquire();
}
294
// Pushes two arbitrary bytes through a simple-framing egress and verifies that
// the ingress still returns OK while counting the payload as one bad packet
// and leaving every other error counter at zero.
TEST(RpcEgressIngressTest, MalformedRpcPacket) {
  constexpr uint32_t kTestChannel = 1;
  constexpr size_t kMtu = 33;
  // Not a valid RPC packet, as the `num_bad_packets` expectation below shows.
  std::vector<std::byte> kMalformedPacket{std::byte{0x42}, std::byte{0x74}};

  TestTransport transport(kMtu);
  SimpleRpcEgress<kMaxPacketSize> egress("test", transport);

  // No registry is attached to this handler; the counters below expect that
  // the packet never produces an egress error.
  TestLocalEgress local_egress;
  std::array rx_channels = {
      ChannelEgress{kTestChannel, local_egress},
  };

  SimpleRpcIngress<kMaxPacketSize> ingress(rx_channels);

  const Status send_status = egress.Send(kMalformedPacket);
  EXPECT_EQ(send_status, OkStatus());

  const Status ingest_status = ingress.ProcessIncomingData(transport.buffer());
  EXPECT_EQ(ingest_status, OkStatus());

  EXPECT_EQ(ingress.num_total_packets(), 1u);
  EXPECT_EQ(ingress.num_bad_packets(), 1u);
  EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
  EXPECT_EQ(ingress.num_missing_egresses(), 0u);
  EXPECT_EQ(ingress.num_egress_errors(), 0u);
}
319
// Issues an Echo call on channel id 65 and verifies that an ingress with no
// configured channels counts the resulting packet as a channel id overflow
// and touches no other error counter.
TEST(RpcEgressIngressTest, ChannelIdOverflow) {
  using TestServiceClient =
      pw_rpc_transport::testing::pw_rpc::pwpb::TestService;

  // NOTE(review): presumably above the largest channel id the ingress can
  // index — confirm against the ingress implementation.
  constexpr uint32_t kInvalidChannelId = 65;
  constexpr size_t kMtu = 128;

  TestTransport transport(kMtu);
  SimpleRpcEgress<kMaxPacketSize> egress("test", transport);

  std::array sender_tx_channels = {
      rpc::Channel::Create<kInvalidChannelId>(&egress)};

  ServiceRegistry registry(sender_tx_channels);
  auto client = registry.CreateClient<TestServiceClient>(kInvalidChannelId);

  SimpleRpcIngress<kMaxPacketSize> ingress;

  // Starting the call places the request packet into `transport`.
  auto receiver = client.Echo({.msg = "test"});

  const Status ingest_status = ingress.ProcessIncomingData(transport.buffer());
  EXPECT_EQ(ingest_status, OkStatus());

  EXPECT_EQ(ingress.num_total_packets(), 1u);
  EXPECT_EQ(ingress.num_bad_packets(), 0u);
  EXPECT_EQ(ingress.num_overflow_channel_ids(), 1u);
  EXPECT_EQ(ingress.num_missing_egresses(), 0u);
  EXPECT_EQ(ingress.num_egress_errors(), 0u);
}
348
// Sends an Echo request on channel A to an ingress that only has an egress
// configured for channel B, and verifies that the packet is counted as
// having no egress while every other error counter stays at zero.
TEST(RpcEgressIngressTest, MissingEgressForIncomingPacket) {
  using TestServiceClient =
      pw_rpc_transport::testing::pw_rpc::pwpb::TestService;

  constexpr uint32_t kChannelA = 22;
  constexpr uint32_t kChannelB = 33;
  constexpr size_t kMtu = 128;

  TestTransport transport(kMtu);
  SimpleRpcEgress<kMaxPacketSize> egress("test", transport);

  std::array sender_tx_channels = {rpc::Channel::Create<kChannelA>(&egress)};

  ServiceRegistry registry(sender_tx_channels);
  auto client = registry.CreateClient<TestServiceClient>(kChannelA);

  // The ingress knows about channel B only, so packets for A have nowhere to
  // go.
  std::array ingress_channels = {ChannelEgress(kChannelB, egress)};
  SimpleRpcIngress<kMaxPacketSize> ingress(ingress_channels);

  // Starting the call places the request packet into `transport`.
  auto receiver = client.Echo({.msg = "test"});

  const Status ingest_status = ingress.ProcessIncomingData(transport.buffer());
  EXPECT_EQ(ingest_status, OkStatus());

  EXPECT_EQ(ingress.num_total_packets(), 1u);
  EXPECT_EQ(ingress.num_bad_packets(), 0u);
  EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
  EXPECT_EQ(ingress.num_missing_egresses(), 1u);
  EXPECT_EQ(ingress.num_egress_errors(), 0u);
}
378
// Sends an Echo request through a working transport, then routes it at the
// ingress to an egress whose transport always fails. Verifies that the
// ingress still returns OK and records exactly one egress error.
TEST(RpcEgressIngressTest, EgressSendFailureForIncomingPacket) {
  using TestServiceClient =
      pw_rpc_transport::testing::pw_rpc::pwpb::TestService;

  constexpr uint32_t kChannelId = 22;
  constexpr size_t kMtu = 128;

  TestTransport good_transport(kMtu, /*is_faulty=*/false);
  TestTransport bad_transport(kMtu, /*is_faulty=*/true);
  SimpleRpcEgress<kMaxPacketSize> good_egress("test", good_transport);
  SimpleRpcEgress<kMaxPacketSize> bad_egress("test", bad_transport);

  // The client sends through the working egress...
  std::array sender_tx_channels = {
      rpc::Channel::Create<kChannelId>(&good_egress)};

  ServiceRegistry registry(sender_tx_channels);
  auto client = registry.CreateClient<TestServiceClient>(kChannelId);

  // ...while the ingress forwards the same channel to the failing one.
  std::array ingress_channels = {ChannelEgress(kChannelId, bad_egress)};
  SimpleRpcIngress<kMaxPacketSize> ingress(ingress_channels);

  // Starting the call places the request packet into `good_transport`.
  auto receiver = client.Echo({.msg = "test"});

  const Status ingest_status =
      ingress.ProcessIncomingData(good_transport.buffer());
  EXPECT_EQ(ingest_status, OkStatus());

  EXPECT_EQ(ingress.num_total_packets(), 1u);
  EXPECT_EQ(ingress.num_bad_packets(), 0u);
  EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
  EXPECT_EQ(ingress.num_missing_egresses(), 0u);
  EXPECT_EQ(ingress.num_egress_errors(), 1u);
}
410
411 } // namespace
412 } // namespace pw::rpc
413