xref: /aosp_15_r20/external/pigweed/pw_rpc_transport/public/pw_rpc_transport/egress_ingress.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
// Copyright 2023 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
14 #pragma once
15 
16 #include <sys/types.h>
17 
18 #include <mutex>
19 
20 #include "pw_bytes/span.h"
21 #include "pw_metric/metric.h"
22 #include "pw_rpc/channel.h"
23 #include "pw_rpc/packet_meta.h"
24 #include "pw_rpc_transport/hdlc_framing.h"
25 #include "pw_rpc_transport/rpc_transport.h"
26 #include "pw_rpc_transport/simple_framing.h"
27 #include "pw_status/status.h"
28 #include "pw_sync/lock_annotations.h"
29 #include "pw_sync/mutex.h"
30 
31 namespace pw::rpc {
32 namespace internal {
33 
34 void LogBadPacket();
35 void LogChannelIdOverflow(uint32_t channel_id, uint32_t max_channel_id);
36 void LogMissingEgressForChannel(uint32_t channel_id);
37 void LogIngressSendFailure(uint32_t channel_id, Status status);
38 
39 }  // namespace internal
40 
41 // Ties RPC transport and RPC frame encoder together.
42 template <typename Encoder>
43 class RpcEgress : public RpcEgressHandler, public ChannelOutput {
44  public:
RpcEgress(std::string_view channel_name,RpcFrameSender & transport)45   RpcEgress(std::string_view channel_name, RpcFrameSender& transport)
46       : ChannelOutput(channel_name.data()), transport_(transport) {}
47 
48   // Implements both rpc::ChannelOutput and RpcEgressHandler. Encodes the
49   // provided packet using the target transport's MTU as max frame size and
50   // sends it over that transport.
51   //
52   // Sending a packet may result in multiple RpcTransport::Write calls which
53   // must not be interleaved in order for the packet to be successfully
54   // reassembled from the transport-level frames by the receiver. RpcEgress
55   // is using a mutex to ensure this. Technically we could just rely on pw_rpc
56   // global lock but that would unnecessarily couple transport logic to pw_rpc
57   // internals.
SendRpcPacket(ConstByteSpan rpc_packet)58   Status SendRpcPacket(ConstByteSpan rpc_packet) override {
59     std::lock_guard lock(mutex_);
60     return encoder_.Encode(rpc_packet,
61                            transport_.MaximumTransmissionUnit(),
62                            [this](RpcFrame& frame) {
63                              // Encoders must call this callback inline so that
64                              // we're still holding `mutex_` here. Unfortunately
65                              // the lock annotations cannot be used on
66                              // `transport_` to enforce this.
67                              return transport_.Send(frame);
68                            });
69   }
70 
71   // Implements ChannelOutput.
Send(ConstByteSpan buffer)72   Status Send(ConstByteSpan buffer) override { return SendRpcPacket(buffer); }
73 
74  private:
75   sync::Mutex mutex_;
76   RpcFrameSender& transport_;
77   Encoder encoder_ PW_GUARDED_BY(mutex_);
78 };
79 
80 // Ties a channel id and the egress that packets on that channel should be sent
81 // to.
82 struct ChannelEgress {
ChannelEgressChannelEgress83   ChannelEgress(uint32_t id, RpcEgressHandler& egress_handler)
84       : channel_id(id), egress(&egress_handler) {}
85 
86   const uint32_t channel_id;
87   RpcEgressHandler* const egress = nullptr;
88 };
89 
90 // Handler for incoming RPC packets. RpcIngress is not thread-safe and must be
91 // accessed from a single thread (typically the RPC RX thread).
92 template <typename Decoder>
93 class RpcIngress : public RpcIngressHandler {
94  public:
95   static constexpr size_t kMaxChannelId = 64;
96   RpcIngress() = default;
97 
RpcIngress(span<ChannelEgress> channel_egresses)98   explicit RpcIngress(span<ChannelEgress> channel_egresses) {
99     for (auto& channel : channel_egresses) {
100       PW_ASSERT(channel.channel_id <= kMaxChannelId);
101       channel_egresses_[channel.channel_id] = channel.egress;
102     }
103   }
104 
metrics()105   const metric::Group& metrics() const { return metrics_; }
106 
num_total_packets()107   uint32_t num_total_packets() const { return total_packets_.value(); }
108 
num_bad_packets()109   uint32_t num_bad_packets() const { return bad_packets_.value(); }
110 
num_overflow_channel_ids()111   uint32_t num_overflow_channel_ids() const {
112     return overflow_channel_ids_.value();
113   }
114 
num_missing_egresses()115   uint32_t num_missing_egresses() const { return missing_egresses_.value(); }
116 
num_egress_errors()117   uint32_t num_egress_errors() const { return egress_errors_.value(); }
118 
119   // Finds RPC packets in `buffer`, extracts pw_rpc channel ID from each
120   // packet and sends the packet to the egress registered for that channel.
ProcessIncomingData(ConstByteSpan buffer)121   Status ProcessIncomingData(ConstByteSpan buffer) override {
122     return decoder_.Decode(buffer, [this](ConstByteSpan packet) {
123       const auto packet_meta = rpc::PacketMeta::FromBuffer(packet);
124       total_packets_.Increment();
125       if (!packet_meta.ok()) {
126         bad_packets_.Increment();
127         internal::LogBadPacket();
128         return;
129       }
130       if (packet_meta->channel_id() > kMaxChannelId) {
131         overflow_channel_ids_.Increment();
132         internal::LogChannelIdOverflow(packet_meta->channel_id(),
133                                        kMaxChannelId);
134         return;
135       }
136       auto* egress = channel_egresses_[packet_meta->channel_id()];
137       if (egress == nullptr) {
138         missing_egresses_.Increment();
139         internal::LogMissingEgressForChannel(packet_meta->channel_id());
140         return;
141       }
142       const auto status = egress->SendRpcPacket(packet);
143       if (!status.ok()) {
144         egress_errors_.Increment();
145         internal::LogIngressSendFailure(packet_meta->channel_id(), status);
146       }
147     });
148   }
149 
150  private:
151   std::array<RpcEgressHandler*, kMaxChannelId + 1> channel_egresses_{};
152   Decoder decoder_;
153   PW_METRIC_GROUP(metrics_, "pw_rpc_transport");
154   PW_METRIC(metrics_, total_packets_, "total_packets", 0u);
155   PW_METRIC(metrics_, bad_packets_, "bad_packets", 0u);
156   PW_METRIC(metrics_, overflow_channel_ids_, "overflow_channel_ids", 0u);
157   PW_METRIC(metrics_, missing_egresses_, "missing_egresses", 0u);
158   PW_METRIC(metrics_, egress_errors_, "egress_errors", 0u);
159 };
160 
161 template <size_t kMaxPacketSize>
162 using HdlcRpcEgress = RpcEgress<HdlcRpcPacketEncoder<kMaxPacketSize>>;
163 
164 template <size_t kMaxPacketSize>
165 using HdlcRpcIngress = RpcIngress<HdlcRpcPacketDecoder<kMaxPacketSize>>;
166 
167 template <size_t kMaxPacketSize>
168 using SimpleRpcEgress = RpcEgress<SimpleRpcPacketEncoder<kMaxPacketSize>>;
169 
170 template <size_t kMaxPacketSize>
171 using SimpleRpcIngress = RpcIngress<SimpleRpcPacketDecoder<kMaxPacketSize>>;
172 
173 }  // namespace pw::rpc
174