xref: /aosp_15_r20/external/pigweed/pw_system/async_packet_io.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_system/internal/async_packet_io.h"
16 
17 #include "pw_assert/check.h"
18 #include "pw_log/log.h"
19 #include "pw_system/config.h"
20 #include "pw_thread/detached_thread.h"
21 
// Normal logging is not possible here. This code processes log messages, so
// must not produce logs for each log.
//
// The expansion is wrapped in do { ... } while (false) so that the macro acts
// as a single statement. Without the wrapper, using it as the body of an
// unbraced `if` that has an `else` would attach that `else` to the macro's
// internal `if (false)`.
//
// The arguments are always compiled (so format strings stay type-checked) but
// are only evaluated when the condition below is changed to true.
#define PACKET_IO_DEBUG_LOG(msg, ...)                              \
  do {                                                             \
    if (false) { /* Set to true to enable printf debug logging. */ \
      printf("DEBUG LOG: " msg "\n" __VA_OPT__(, ) __VA_ARGS__);   \
    }                                                              \
  } while (false)
27 
28 namespace pw::system::internal {
29 
30 using ::pw::async2::Context;
31 using ::pw::async2::Dispatcher;
32 using ::pw::async2::Pending;
33 using ::pw::async2::Poll;
34 using ::pw::async2::Ready;
35 using ::pw::multibuf::MultiBuf;
36 
37 // With atomic head/tail reads/writes, this type of queue interaction could be
38 // lockless in single producer, single consumer scenarios.
39 
40 // TODO: b/349398108 - MultiBuf directly out of (and into) the ring buffer.
41 Poll<InlineVarLenEntryQueue<>::Entry>
PendOutgoingDatagram(Context & cx)42 RpcChannelOutputQueue::PendOutgoingDatagram(Context& cx) {
43   // The head pointer will not change until Pop is called.
44   std::lock_guard lock(mutex_);
45   if (queue_.empty()) {
46     PW_ASYNC_STORE_WAKER(
47         cx,
48         packet_ready_,
49         "RpcChannel is waiting for outgoing RPC datagrams to be enqueued");
50     return Pending();
51   }
52   return Ready(queue_.front());
53 }
54 
Send(ConstByteSpan datagram)55 Status RpcChannelOutputQueue::Send(ConstByteSpan datagram) {
56   PACKET_IO_DEBUG_LOG("Pushing %zu B packet into outbound queue",
57                       datagram.size());
58   mutex_.lock();
59   if (queue_.try_push(datagram)) {
60     std::move(packet_ready_).Wake();
61   } else {
62     dropped_packets_ += 1;
63   }
64   mutex_.unlock();
65   return OkStatus();
66 }
67 
RpcServerThread(Allocator & allocator,rpc::Server & server)68 RpcServerThread::RpcServerThread(Allocator& allocator, rpc::Server& server)
69     : allocator_(allocator), rpc_server_(server) {
70   PW_CHECK_OK(rpc_server_.OpenChannel(1, rpc_packet_queue_));
71 }
72 
PushPacket(MultiBuf && packet)73 void RpcServerThread::PushPacket(MultiBuf&& packet) {
74   PACKET_IO_DEBUG_LOG("Received %zu B RPC packet", packet.size());
75   std::lock_guard lock(mutex_);
76   ready_for_packet_ = false;
77   packet_multibuf_ = std::move(packet);
78   new_packet_available_.release();
79 }
80 
RunOnce()81 void RpcServerThread::RunOnce() {
82   new_packet_available_.acquire();
83 
84   std::optional<ConstByteSpan> span = packet_multibuf_.ContiguousSpan();
85   if (span.has_value()) {
86     rpc_server_.ProcessPacket(*span).IgnoreError();
87   } else {
88     // Copy the packet into a contiguous buffer.
89     // TODO: b/349440355 - Consider a global buffer instead of repeated allocs.
90     const size_t packet_size = packet_multibuf_.size();
91     std::byte* buffer = static_cast<std::byte*>(
92         allocator_.Allocate({packet_size, alignof(std::byte)}));
93 
94     auto copy_result = packet_multibuf_.CopyTo({buffer, packet_size});
95     PW_DCHECK_OK(copy_result.status());
96     rpc_server_.ProcessPacket({buffer, packet_size}).IgnoreError();
97 
98     allocator_.Deallocate(buffer);
99   }
100 
101   packet_multibuf_.Release();
102   std::lock_guard lock(mutex_);
103   ready_for_packet_ = true;
104   std::move(ready_to_receive_packet_).Wake();
105 }
106 
PacketIO(channel::ByteReaderWriter & io_channel,ByteSpan buffer,Allocator & allocator,rpc::Server & rpc_server)107 PacketIO::PacketIO(channel::ByteReaderWriter& io_channel,
108                    ByteSpan buffer,
109                    Allocator& allocator,
110                    rpc::Server& rpc_server)
111     : allocator_(allocator),
112       mb_allocator_1_(mb_allocator_buffer_1_, allocator_),
113       mb_allocator_2_(mb_allocator_buffer_2_, allocator_),
114       channels_(mb_allocator_1_, mb_allocator_2_),
115       router_(io_channel, buffer),
116       rpc_server_thread_(allocator_, rpc_server),
117       packet_reader_(*this),
118       packet_writer_(*this) {
119   PW_CHECK_OK(router_.AddChannel(channels_.second(),
120                                  PW_SYSTEM_DEFAULT_RPC_HDLC_ADDRESS,
121                                  PW_SYSTEM_DEFAULT_RPC_HDLC_ADDRESS));
122 }
123 
Start(Dispatcher & dispatcher,const thread::Options & thread_options)124 void PacketIO::Start(Dispatcher& dispatcher,
125                      const thread::Options& thread_options) {
126   dispatcher.Post(packet_reader_);
127   dispatcher.Post(packet_writer_);
128 
129   thread::DetachedThread(thread_options, [this] {
130     while (true) {
131       rpc_server_thread_.RunOnce();
132     }
133   });
134 }
135 
DoPend(Context & cx)136 Poll<> PacketIO::PacketReader::DoPend(Context& cx) {
137   // Let the router do its work.
138   if (io_.router_.Pend(cx).IsReady()) {
139     return Ready();  // channel is closed, we're done here
140   }
141 
142   // If the dispatcher isn't ready for another packet, wait.
143   if (io_.rpc_server_thread_.PendReadyForPacket(cx).IsPending()) {
144     return Pending();  // Nothing else to do for now
145   }
146 
147   // Read a packet from the router and provide it to the RPC thread.
148   auto read = io_.channel().PendRead(cx);
149   if (read.IsPending()) {
150     return Pending();  // Nothing else to do for now
151   }
152   if (!read->ok()) {
153     PW_LOG_ERROR("Channel::PendRead() returned status %s",
154                  read->status().str());
155     return Ready();  // Channel is broken
156   }
157   // Push the packet into the RPC thread.
158   io_.rpc_server_thread_.PushPacket(*std::move(*read));
159   return Pending();  // Nothing else to do for now
160 }
161 
DoPend(Context & cx)162 Poll<> PacketIO::PacketWriter::DoPend(Context& cx) {
163   // Do the work of writing any existing packets.
164   // We ignore Pending because we want to continue trying to make
165   // progress sending new packets regardless of whether the write was
166   // able to complete.
167   Poll<Status> write_status = io_.channel().PendWrite(cx);
168   if (write_status.IsReady() && !write_status->ok()) {
169     PW_LOG_ERROR("Channel::PendWrite() returned non-OK status %s",
170                  write_status->str());
171     return Ready();
172   }
173 
174   // Get the next packet to send, if any.
175   if (outbound_packet_.IsPending()) {
176     outbound_packet_ = io_.rpc_server_thread_.PendOutgoingDatagram(cx);
177   }
178 
179   if (outbound_packet_.IsPending()) {
180     return Pending();
181   }
182 
183   PACKET_IO_DEBUG_LOG("Sending %u B outbound packet",
184                       static_cast<unsigned>(outbound_packet_->size()));
185 
186   // There is a packet -- check if we can write.
187   auto writable = io_.channel().PendReadyToWrite(cx);
188   if (writable.IsPending()) {
189     return Pending();
190   }
191 
192   if (!writable->ok()) {
193     PW_LOG_ERROR("Channel::PendReadyToWrite() returned status %s",
194                  writable->str());
195     return Ready();
196   }
197 
198   // Allocate a multibuf to send the packet.
199   // TODO: b/349398108 - Instead, get a MultiBuf that refers to the queue entry.
200   auto mb = io_.channel().PendAllocateWriteBuffer(cx, outbound_packet_->size());
201   if (mb.IsPending()) {
202     return Pending();
203   }
204 
205   if (!mb->has_value()) {
206     PW_LOG_ERROR("Async MultiBuf allocation of %u B failed",
207                  static_cast<unsigned>(outbound_packet_->size()));
208     return Ready();  // Could not allocate mb
209   }
210 
211   // Copy the packet into the multibuf.
212   auto [first, second] = outbound_packet_->contiguous_data();
213   PW_CHECK_OK((*mb)->CopyFrom(first).status());
214   PW_CHECK_OK((*mb)->CopyFromAndTruncate(second, first.size()).status());
215   io_.rpc_server_thread_.PopOutboundPacket();
216 
217   PACKET_IO_DEBUG_LOG("Writing %zu B outbound packet", (**mb).size());
218   auto write_result = io_.channel().StageWrite(**std::move(mb));
219   if (!write_result.ok()) {
220     return Ready();  // Write failed, but should not have
221   }
222 
223   // Write was accepted, so set up for the next packet
224   outbound_packet_ = Pending();
225 
226   // Sent one packet, let other tasks run.
227   cx.ReEnqueue();
228   return Pending();
229 }
230 
231 }  // namespace pw::system::internal
232