1 // Copyright 2023 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <sys/types.h> 17 18 #include <mutex> 19 20 #include "pw_bytes/span.h" 21 #include "pw_metric/metric.h" 22 #include "pw_rpc/channel.h" 23 #include "pw_rpc/packet_meta.h" 24 #include "pw_rpc_transport/hdlc_framing.h" 25 #include "pw_rpc_transport/rpc_transport.h" 26 #include "pw_rpc_transport/simple_framing.h" 27 #include "pw_status/status.h" 28 #include "pw_sync/lock_annotations.h" 29 #include "pw_sync/mutex.h" 30 31 namespace pw::rpc { 32 namespace internal { 33 34 void LogBadPacket(); 35 void LogChannelIdOverflow(uint32_t channel_id, uint32_t max_channel_id); 36 void LogMissingEgressForChannel(uint32_t channel_id); 37 void LogIngressSendFailure(uint32_t channel_id, Status status); 38 39 } // namespace internal 40 41 // Ties RPC transport and RPC frame encoder together. 42 template <typename Encoder> 43 class RpcEgress : public RpcEgressHandler, public ChannelOutput { 44 public: RpcEgress(std::string_view channel_name,RpcFrameSender & transport)45 RpcEgress(std::string_view channel_name, RpcFrameSender& transport) 46 : ChannelOutput(channel_name.data()), transport_(transport) {} 47 48 // Implements both rpc::ChannelOutput and RpcEgressHandler. Encodes the 49 // provided packet using the target transport's MTU as max frame size and 50 // sends it over that transport. 51 // 52 // Sending a packet may result in multiple RpcTransport::Write calls which 53 // must not be interleaved in order for the packet to be successfully 54 // reassembled from the transport-level frames by the receiver. RpcEgress 55 // is using a mutex to ensure this. Technically we could just rely on pw_rpc 56 // global lock but that would unnecessarily couple transport logic to pw_rpc 57 // internals. SendRpcPacket(ConstByteSpan rpc_packet)58 Status SendRpcPacket(ConstByteSpan rpc_packet) override { 59 std::lock_guard lock(mutex_); 60 return encoder_.Encode(rpc_packet, 61 transport_.MaximumTransmissionUnit(), 62 [this](RpcFrame& frame) { 63 // Encoders must call this callback inline so that 64 // we're still holding `mutex_` here. Unfortunately 65 // the lock annotations cannot be used on 66 // `transport_` to enforce this. 67 return transport_.Send(frame); 68 }); 69 } 70 71 // Implements ChannelOutput. Send(ConstByteSpan buffer)72 Status Send(ConstByteSpan buffer) override { return SendRpcPacket(buffer); } 73 74 private: 75 sync::Mutex mutex_; 76 RpcFrameSender& transport_; 77 Encoder encoder_ PW_GUARDED_BY(mutex_); 78 }; 79 80 // Ties a channel id and the egress that packets on that channel should be sent 81 // to. 82 struct ChannelEgress { ChannelEgressChannelEgress83 ChannelEgress(uint32_t id, RpcEgressHandler& egress_handler) 84 : channel_id(id), egress(&egress_handler) {} 85 86 const uint32_t channel_id; 87 RpcEgressHandler* const egress = nullptr; 88 }; 89 90 // Handler for incoming RPC packets. RpcIngress is not thread-safe and must be 91 // accessed from a single thread (typically the RPC RX thread). 92 template <typename Decoder> 93 class RpcIngress : public RpcIngressHandler { 94 public: 95 static constexpr size_t kMaxChannelId = 64; 96 RpcIngress() = default; 97 RpcIngress(span<ChannelEgress> channel_egresses)98 explicit RpcIngress(span<ChannelEgress> channel_egresses) { 99 for (auto& channel : channel_egresses) { 100 PW_ASSERT(channel.channel_id <= kMaxChannelId); 101 channel_egresses_[channel.channel_id] = channel.egress; 102 } 103 } 104 metrics()105 const metric::Group& metrics() const { return metrics_; } 106 num_total_packets()107 uint32_t num_total_packets() const { return total_packets_.value(); } 108 num_bad_packets()109 uint32_t num_bad_packets() const { return bad_packets_.value(); } 110 num_overflow_channel_ids()111 uint32_t num_overflow_channel_ids() const { 112 return overflow_channel_ids_.value(); 113 } 114 num_missing_egresses()115 uint32_t num_missing_egresses() const { return missing_egresses_.value(); } 116 num_egress_errors()117 uint32_t num_egress_errors() const { return egress_errors_.value(); } 118 119 // Finds RPC packets in `buffer`, extracts pw_rpc channel ID from each 120 // packet and sends the packet to the egress registered for that channel. ProcessIncomingData(ConstByteSpan buffer)121 Status ProcessIncomingData(ConstByteSpan buffer) override { 122 return decoder_.Decode(buffer, [this](ConstByteSpan packet) { 123 const auto packet_meta = rpc::PacketMeta::FromBuffer(packet); 124 total_packets_.Increment(); 125 if (!packet_meta.ok()) { 126 bad_packets_.Increment(); 127 internal::LogBadPacket(); 128 return; 129 } 130 if (packet_meta->channel_id() > kMaxChannelId) { 131 overflow_channel_ids_.Increment(); 132 internal::LogChannelIdOverflow(packet_meta->channel_id(), 133 kMaxChannelId); 134 return; 135 } 136 auto* egress = channel_egresses_[packet_meta->channel_id()]; 137 if (egress == nullptr) { 138 missing_egresses_.Increment(); 139 internal::LogMissingEgressForChannel(packet_meta->channel_id()); 140 return; 141 } 142 const auto status = egress->SendRpcPacket(packet); 143 if (!status.ok()) { 144 egress_errors_.Increment(); 145 internal::LogIngressSendFailure(packet_meta->channel_id(), status); 146 } 147 }); 148 } 149 150 private: 151 std::array<RpcEgressHandler*, kMaxChannelId + 1> channel_egresses_{}; 152 Decoder decoder_; 153 PW_METRIC_GROUP(metrics_, "pw_rpc_transport"); 154 PW_METRIC(metrics_, total_packets_, "total_packets", 0u); 155 PW_METRIC(metrics_, bad_packets_, "bad_packets", 0u); 156 PW_METRIC(metrics_, overflow_channel_ids_, "overflow_channel_ids", 0u); 157 PW_METRIC(metrics_, missing_egresses_, "missing_egresses", 0u); 158 PW_METRIC(metrics_, egress_errors_, "egress_errors", 0u); 159 }; 160 161 template <size_t kMaxPacketSize> 162 using HdlcRpcEgress = RpcEgress<HdlcRpcPacketEncoder<kMaxPacketSize>>; 163 164 template <size_t kMaxPacketSize> 165 using HdlcRpcIngress = RpcIngress<HdlcRpcPacketDecoder<kMaxPacketSize>>; 166 167 template <size_t kMaxPacketSize> 168 using SimpleRpcEgress = RpcEgress<SimpleRpcPacketEncoder<kMaxPacketSize>>; 169 170 template <size_t kMaxPacketSize> 171 using SimpleRpcIngress = RpcIngress<SimpleRpcPacketDecoder<kMaxPacketSize>>; 172 173 } // namespace pw::rpc 174