// Copyright 2023 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#pragma once

#include <algorithm>
#include <array>
#include <cstddef>
#include <mutex>

#include "pw_bytes/span.h"
#include "pw_containers/intrusive_list.h"
#include "pw_result/result.h"
#include "pw_span/span.h"
#include "pw_status/status.h"
#include "pw_sync/lock_annotations.h"
#include "pw_sync/mutex.h"

namespace pw::rpc::internal {

// A simple thread-safe FIFO for queueing packets. Used by LocalRpcEgress to
// decouple receiving locally-destined RPC packets from their processing.
30 template <size_t kMaxPacketSize> 31 class PacketBufferQueue { 32 public: 33 class PacketBuffer : public IntrusiveList<PacketBuffer>::Item { 34 public: CopyPacket(ConstByteSpan packet)35 Status CopyPacket(ConstByteSpan packet) { 36 if (packet.size() > buffer_.size()) { 37 return Status::ResourceExhausted(); 38 } 39 std::copy(packet.begin(), packet.end(), buffer_.begin()); 40 size_ = packet.size(); 41 return OkStatus(); 42 } 43 GetPacket()44 Result<ConstByteSpan> GetPacket() { 45 auto buffer_span = span(buffer_); 46 return buffer_span.first(size_); 47 } 48 49 protected: 50 friend PacketBufferQueue; 51 52 private: 53 std::array<std::byte, kMaxPacketSize> buffer_ = {}; 54 size_t size_ = 0; 55 }; 56 57 PacketBufferQueue() = default; PacketBufferQueue(span<PacketBuffer> packets)58 explicit PacketBufferQueue(span<PacketBuffer> packets) { 59 for (auto& packet : packets) { 60 packet_list_.push_back(packet); 61 } 62 } 63 64 // Push a packet to the end of the queue. Push(PacketBuffer & packet)65 void Push(PacketBuffer& packet) { 66 const LockGuard guard(lock_); 67 packet_list_.push_back(packet); 68 } 69 70 // Pop a packet from the head of the queue. 71 // Returns a pointer to the packet popped from the queue, or 72 // ResourceExhausted() if the queue is empty. Pop()73 Result<PacketBuffer*> Pop() { 74 const LockGuard lock(lock_); 75 76 if (packet_list_.empty()) { 77 return Status::ResourceExhausted(); 78 } 79 80 // Return the first available packet in the list. 81 PacketBufferQueue::PacketBuffer& front = packet_list_.front(); 82 packet_list_.pop_front(); 83 return &front; 84 } 85 86 private: 87 using LockGuard = ::std::lock_guard<sync::Mutex>; 88 sync::Mutex lock_; 89 IntrusiveList<PacketBuffer> packet_list_ PW_GUARDED_BY(lock_); 90 }; 91 92 } // namespace pw::rpc::internal 93