xref: /aosp_15_r20/external/pigweed/pw_rpc_transport/egress_ingress_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc_transport/egress_ingress.h"
16 
17 #include <random>
18 
19 #include "public/pw_rpc_transport/rpc_transport.h"
20 #include "pw_bytes/span.h"
21 #include "pw_metric/metric.h"
22 #include "pw_rpc/client_server.h"
23 #include "pw_rpc/packet_meta.h"
24 #include "pw_rpc_transport/hdlc_framing.h"
25 #include "pw_rpc_transport/internal/test.rpc.pwpb.h"
26 #include "pw_rpc_transport/rpc_transport.h"
27 #include "pw_rpc_transport/service_registry.h"
28 #include "pw_rpc_transport/simple_framing.h"
29 #include "pw_status/status.h"
30 #include "pw_string/string.h"
31 #include "pw_sync/thread_notification.h"
32 #include "pw_unit_test/framework.h"
33 
34 namespace pw::rpc {
35 namespace {
36 
37 constexpr size_t kMaxPacketSize = 256;
38 
39 class TestService final
40     : public pw_rpc_transport::testing::pw_rpc::pwpb::TestService::Service<
41           TestService> {
42  public:
Echo(const pw_rpc_transport::testing::pwpb::EchoMessage::Message & request,pw_rpc_transport::testing::pwpb::EchoMessage::Message & response)43   Status Echo(
44       const pw_rpc_transport::testing::pwpb::EchoMessage::Message& request,
45       pw_rpc_transport::testing::pwpb::EchoMessage::Message& response) {
46     response.msg = request.msg;
47     return OkStatus();
48   }
49 };
50 
51 // A transport that stores all received frames so they can be manually retrieved
52 // by the ingress later.
53 class TestTransport : public RpcFrameSender {
54  public:
TestTransport(size_t mtu,bool is_faulty=false)55   explicit TestTransport(size_t mtu, bool is_faulty = false)
56       : mtu_(mtu), is_faulty_(is_faulty) {}
57 
MaximumTransmissionUnit() const58   size_t MaximumTransmissionUnit() const override { return mtu_; }
59 
Send(RpcFrame frame)60   Status Send(RpcFrame frame) override {
61     if (is_faulty_) {
62       return Status::Internal();
63     }
64     std::copy(
65         frame.header.begin(), frame.header.end(), std::back_inserter(buffer_));
66     std::copy(frame.payload.begin(),
67               frame.payload.end(),
68               std::back_inserter(buffer_));
69     return OkStatus();
70   }
71 
buffer()72   ByteSpan buffer() { return buffer_; }
73 
74  private:
75   size_t mtu_;
76   bool is_faulty_ = false;
77   std::vector<std::byte> buffer_;
78 };
79 
80 // An egress handler that passes the received RPC packet to the service
81 // registry.
82 class TestLocalEgress : public RpcEgressHandler {
83  public:
SendRpcPacket(ConstByteSpan packet)84   Status SendRpcPacket(ConstByteSpan packet) override {
85     if (!registry_) {
86       return Status::FailedPrecondition();
87     }
88     return registry_->ProcessRpcPacket(packet);
89   }
90 
set_registry(ServiceRegistry & registry)91   void set_registry(ServiceRegistry& registry) { registry_ = &registry; }
92 
93  private:
94   ServiceRegistry* registry_ = nullptr;
95 };
96 
TEST(RpcEgressIngressTest,SimpleFramingRoundtrip)97 TEST(RpcEgressIngressTest, SimpleFramingRoundtrip) {
98   constexpr uint32_t kChannelAtoB = 1;
99   constexpr size_t kMaxMessageLength = 200;
100   constexpr size_t kAtoBMtu = 33;
101   constexpr size_t kBtoAMtu = 72;
102 
103   TestTransport transport_a_to_b(kAtoBMtu);
104   TestTransport transport_b_to_a(kBtoAMtu);
105 
106   SimpleRpcEgress<kMaxPacketSize> egress_a_to_b("a->b", transport_a_to_b);
107   SimpleRpcEgress<kMaxPacketSize> egress_b_to_a("b->a", transport_b_to_a);
108 
109   std::array a_tx_channels = {
110       rpc::Channel::Create<kChannelAtoB>(&egress_a_to_b)};
111   std::array b_tx_channels = {
112       rpc::Channel::Create<kChannelAtoB>(&egress_b_to_a)};
113 
114   ServiceRegistry registry_a(a_tx_channels);
115   ServiceRegistry registry_b(b_tx_channels);
116 
117   TestService test_service;
118   registry_b.RegisterService(test_service);
119 
120   TestLocalEgress local_egress_a;
121   local_egress_a.set_registry(registry_a);
122 
123   TestLocalEgress local_egress_b;
124   local_egress_b.set_registry(registry_b);
125 
126   std::array a_rx_channels = {
127       ChannelEgress{kChannelAtoB, local_egress_a},
128   };
129   std::array b_rx_channels = {
130       ChannelEgress{kChannelAtoB, local_egress_b},
131   };
132 
133   SimpleRpcIngress<kMaxPacketSize> ingress_a(a_rx_channels);
134   SimpleRpcIngress<kMaxPacketSize> ingress_b(b_rx_channels);
135 
136   auto client =
137       registry_a
138           .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
139               kChannelAtoB);
140 
141   sync::ThreadNotification receiver1_done;
142   sync::ThreadNotification receiver2_done;
143 
144   struct ReceiverState {
145     InlineString<kMaxMessageLength> message;
146     sync::ThreadNotification done;
147   };
148 
149   ReceiverState receiver1;
150   ReceiverState receiver2;
151   receiver1.message.append(2 * transport_a_to_b.MaximumTransmissionUnit(), '*');
152   receiver2.message.append(2 * transport_b_to_a.MaximumTransmissionUnit(), '>');
153 
154   auto call1 = client.Echo(
155       {.msg = receiver1.message},
156       [&receiver1](
157           const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
158           Status status) {
159         EXPECT_EQ(status, OkStatus());
160         EXPECT_EQ(response.msg, receiver1.message);
161         receiver1.done.release();
162       },
163       [&receiver1](Status status) {
164         EXPECT_EQ(status, OkStatus());
165         receiver1.done.release();
166       });
167 
168   auto call2 = client.Echo(
169       {.msg = receiver2.message},
170       [&receiver2](
171           const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
172           Status status) {
173         EXPECT_EQ(status, OkStatus());
174         EXPECT_EQ(response.msg, receiver2.message);
175         receiver2.done.release();
176       },
177       [&receiver2](Status status) {
178         EXPECT_EQ(status, OkStatus());
179         receiver2.done.release();
180       });
181 
182   // Calling `ingress_b.ProcessIncomingData` reads all packets from the
183   // transport and dispatches them according to the ingress configuration.
184   // Dispatching a packet generates a reply message: we then read it back at the
185   // sender by calling `ingress_a.ProcessIncomingData`.
186   EXPECT_EQ(ingress_b.ProcessIncomingData(transport_a_to_b.buffer()),
187             OkStatus());
188   EXPECT_EQ(ingress_a.ProcessIncomingData(transport_b_to_a.buffer()),
189             OkStatus());
190 
191   receiver1.done.acquire();
192   receiver2.done.acquire();
193 }
194 
TEST(RpcEgressIngressTest,HdlcFramingRoundtrip)195 TEST(RpcEgressIngressTest, HdlcFramingRoundtrip) {
196   constexpr uint32_t kChannelAtoB = 1;
197   constexpr size_t kMaxMessageLength = 200;
198   constexpr size_t kAtoBMtu = 33;
199   constexpr size_t kBtoAMtu = 72;
200 
201   TestTransport transport_a_to_b(kAtoBMtu);
202   TestTransport transport_b_to_a(kBtoAMtu);
203 
204   HdlcRpcEgress<kMaxPacketSize> egress_a_to_b("a->b", transport_a_to_b);
205   HdlcRpcEgress<kMaxPacketSize> egress_b_to_a("b->a", transport_b_to_a);
206 
207   std::array a_tx_channels = {
208       rpc::Channel::Create<kChannelAtoB>(&egress_a_to_b)};
209   std::array b_tx_channels = {
210       rpc::Channel::Create<kChannelAtoB>(&egress_b_to_a)};
211 
212   ServiceRegistry registry_a(a_tx_channels);
213   ServiceRegistry registry_b(b_tx_channels);
214 
215   TestService test_service;
216   registry_b.RegisterService(test_service);
217 
218   TestLocalEgress local_egress_a;
219   local_egress_a.set_registry(registry_a);
220 
221   TestLocalEgress local_egress_b;
222   local_egress_b.set_registry(registry_b);
223 
224   std::array a_rx_channels = {
225       ChannelEgress{kChannelAtoB, local_egress_a},
226   };
227   std::array b_rx_channels = {
228       ChannelEgress{kChannelAtoB, local_egress_b},
229   };
230 
231   HdlcRpcIngress<kMaxPacketSize> ingress_a(a_rx_channels);
232   HdlcRpcIngress<kMaxPacketSize> ingress_b(b_rx_channels);
233 
234   auto client =
235       registry_a
236           .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
237               kChannelAtoB);
238 
239   sync::ThreadNotification receiver1_done;
240   sync::ThreadNotification receiver2_done;
241 
242   struct ReceiverState {
243     InlineString<kMaxMessageLength> message;
244     sync::ThreadNotification done;
245   };
246 
247   ReceiverState receiver1;
248   ReceiverState receiver2;
249   receiver1.message.append(2 * transport_a_to_b.MaximumTransmissionUnit(), '*');
250   receiver2.message.append(2 * transport_b_to_a.MaximumTransmissionUnit(), '>');
251 
252   auto call1 = client.Echo(
253       {.msg = receiver1.message},
254       [&receiver1](
255           const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
256           Status status) {
257         EXPECT_EQ(status, OkStatus());
258         EXPECT_EQ(response.msg, receiver1.message);
259         receiver1.done.release();
260       },
261       [&receiver1](Status status) {
262         EXPECT_EQ(status, OkStatus());
263         receiver1.done.release();
264       });
265 
266   auto call2 = client.Echo(
267       {.msg = receiver2.message},
268       [&receiver2](
269           const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
270           Status status) {
271         EXPECT_EQ(status, OkStatus());
272         EXPECT_EQ(response.msg, receiver2.message);
273         receiver2.done.release();
274       },
275       [&receiver2](Status status) {
276         EXPECT_EQ(status, OkStatus());
277         receiver2.done.release();
278       });
279 
280   // Calling `ingress_b.ProcessIncomingData` reads all packets from the
281   // transport and dispatches them according to the ingress configuration.
282   // Dispatching a packet generates a reply message: we then read it back at the
283   // sender by calling `ingress_a.ProcessIncomingData`.
284   EXPECT_EQ(ingress_b.ProcessIncomingData(transport_a_to_b.buffer()),
285             OkStatus());
286   EXPECT_EQ(ingress_a.ProcessIncomingData(transport_b_to_a.buffer()),
287             OkStatus());
288   EXPECT_EQ(ingress_a.num_total_packets(), 2u);
289   EXPECT_EQ(ingress_b.num_total_packets(), 2u);
290 
291   receiver1.done.acquire();
292   receiver2.done.acquire();
293 }
294 
TEST(RpcEgressIngressTest,MalformedRpcPacket)295 TEST(RpcEgressIngressTest, MalformedRpcPacket) {
296   constexpr uint32_t kTestChannel = 1;
297   constexpr size_t kMtu = 33;
298   std::vector<std::byte> kMalformedPacket = {std::byte{0x42}, std::byte{0x74}};
299 
300   TestTransport transport(kMtu);
301   SimpleRpcEgress<kMaxPacketSize> egress("test", transport);
302 
303   TestLocalEgress local_egress;
304   std::array rx_channels = {
305       ChannelEgress{kTestChannel, local_egress},
306   };
307 
308   SimpleRpcIngress<kMaxPacketSize> ingress(rx_channels);
309 
310   EXPECT_EQ(egress.Send(kMalformedPacket), OkStatus());
311   EXPECT_EQ(ingress.ProcessIncomingData(transport.buffer()), OkStatus());
312 
313   EXPECT_EQ(ingress.num_total_packets(), 1u);
314   EXPECT_EQ(ingress.num_bad_packets(), 1u);
315   EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
316   EXPECT_EQ(ingress.num_missing_egresses(), 0u);
317   EXPECT_EQ(ingress.num_egress_errors(), 0u);
318 }
319 
TEST(RpcEgressIngressTest,ChannelIdOverflow)320 TEST(RpcEgressIngressTest, ChannelIdOverflow) {
321   constexpr uint32_t kInvalidChannelId = 65;
322   constexpr size_t kMtu = 128;
323 
324   TestTransport transport(kMtu);
325   SimpleRpcEgress<kMaxPacketSize> egress("test", transport);
326 
327   std::array sender_tx_channels = {
328       rpc::Channel::Create<kInvalidChannelId>(&egress)};
329 
330   ServiceRegistry registry(sender_tx_channels);
331   auto client =
332       registry
333           .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
334               kInvalidChannelId);
335 
336   SimpleRpcIngress<kMaxPacketSize> ingress;
337 
338   auto receiver = client.Echo({.msg = "test"});
339 
340   EXPECT_EQ(ingress.ProcessIncomingData(transport.buffer()), OkStatus());
341 
342   EXPECT_EQ(ingress.num_total_packets(), 1u);
343   EXPECT_EQ(ingress.num_bad_packets(), 0u);
344   EXPECT_EQ(ingress.num_overflow_channel_ids(), 1u);
345   EXPECT_EQ(ingress.num_missing_egresses(), 0u);
346   EXPECT_EQ(ingress.num_egress_errors(), 0u);
347 }
348 
TEST(RpcEgressIngressTest,MissingEgressForIncomingPacket)349 TEST(RpcEgressIngressTest, MissingEgressForIncomingPacket) {
350   constexpr uint32_t kChannelA = 22;
351   constexpr uint32_t kChannelB = 33;
352   constexpr size_t kMtu = 128;
353 
354   TestTransport transport(kMtu);
355   SimpleRpcEgress<kMaxPacketSize> egress("test", transport);
356 
357   std::array sender_tx_channels = {rpc::Channel::Create<kChannelA>(&egress)};
358 
359   ServiceRegistry registry(sender_tx_channels);
360   auto client =
361       registry
362           .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
363               kChannelA);
364 
365   std::array ingress_channels = {ChannelEgress(kChannelB, egress)};
366   SimpleRpcIngress<kMaxPacketSize> ingress(ingress_channels);
367 
368   auto receiver = client.Echo({.msg = "test"});
369 
370   EXPECT_EQ(ingress.ProcessIncomingData(transport.buffer()), OkStatus());
371 
372   EXPECT_EQ(ingress.num_total_packets(), 1u);
373   EXPECT_EQ(ingress.num_bad_packets(), 0u);
374   EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
375   EXPECT_EQ(ingress.num_missing_egresses(), 1u);
376   EXPECT_EQ(ingress.num_egress_errors(), 0u);
377 }
378 
TEST(RpcEgressIngressTest,EgressSendFailureForIncomingPacket)379 TEST(RpcEgressIngressTest, EgressSendFailureForIncomingPacket) {
380   constexpr uint32_t kChannelId = 22;
381   constexpr size_t kMtu = 128;
382 
383   TestTransport good_transport(kMtu, /*is_faulty=*/false);
384   TestTransport bad_transport(kMtu, /*is_faulty=*/true);
385   SimpleRpcEgress<kMaxPacketSize> good_egress("test", good_transport);
386   SimpleRpcEgress<kMaxPacketSize> bad_egress("test", bad_transport);
387 
388   std::array sender_tx_channels = {
389       rpc::Channel::Create<kChannelId>(&good_egress)};
390 
391   ServiceRegistry registry(sender_tx_channels);
392   auto client =
393       registry
394           .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
395               kChannelId);
396 
397   std::array ingress_channels = {ChannelEgress(kChannelId, bad_egress)};
398   SimpleRpcIngress<kMaxPacketSize> ingress(ingress_channels);
399 
400   auto receiver = client.Echo({.msg = "test"});
401 
402   EXPECT_EQ(ingress.ProcessIncomingData(good_transport.buffer()), OkStatus());
403 
404   EXPECT_EQ(ingress.num_total_packets(), 1u);
405   EXPECT_EQ(ingress.num_bad_packets(), 0u);
406   EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
407   EXPECT_EQ(ingress.num_missing_egresses(), 0u);
408   EXPECT_EQ(ingress.num_egress_errors(), 1u);
409 }
410 
411 }  // namespace
412 }  // namespace pw::rpc
413