xref: /aosp_15_r20/external/pigweed/pw_system/async_packet_io.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1*61c4878aSAndroid Build Coastguard Worker // Copyright 2024 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker //     https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker 
15*61c4878aSAndroid Build Coastguard Worker #include "pw_system/internal/async_packet_io.h"
16*61c4878aSAndroid Build Coastguard Worker 
17*61c4878aSAndroid Build Coastguard Worker #include "pw_assert/check.h"
18*61c4878aSAndroid Build Coastguard Worker #include "pw_log/log.h"
19*61c4878aSAndroid Build Coastguard Worker #include "pw_system/config.h"
20*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/detached_thread.h"
21*61c4878aSAndroid Build Coastguard Worker 
22*61c4878aSAndroid Build Coastguard Worker // Normal logging is not possible here. This code processes log messages, so
23*61c4878aSAndroid Build Coastguard Worker // must not produce logs for each log.
24*61c4878aSAndroid Build Coastguard Worker #define PACKET_IO_DEBUG_LOG(msg, ...)                          \
25*61c4878aSAndroid Build Coastguard Worker   if (false) /* Set to true to enable printf debug logging. */ \
26*61c4878aSAndroid Build Coastguard Worker   printf("DEBUG LOG: " msg "\n" __VA_OPT__(, ) __VA_ARGS__)
27*61c4878aSAndroid Build Coastguard Worker 
28*61c4878aSAndroid Build Coastguard Worker namespace pw::system::internal {
29*61c4878aSAndroid Build Coastguard Worker 
30*61c4878aSAndroid Build Coastguard Worker using ::pw::async2::Context;
31*61c4878aSAndroid Build Coastguard Worker using ::pw::async2::Dispatcher;
32*61c4878aSAndroid Build Coastguard Worker using ::pw::async2::Pending;
33*61c4878aSAndroid Build Coastguard Worker using ::pw::async2::Poll;
34*61c4878aSAndroid Build Coastguard Worker using ::pw::async2::Ready;
35*61c4878aSAndroid Build Coastguard Worker using ::pw::multibuf::MultiBuf;
36*61c4878aSAndroid Build Coastguard Worker 
37*61c4878aSAndroid Build Coastguard Worker // With atomic head/tail reads/writes, this type of queue interaction could be
38*61c4878aSAndroid Build Coastguard Worker // lockless in single producer, single consumer scenarios.
39*61c4878aSAndroid Build Coastguard Worker 
40*61c4878aSAndroid Build Coastguard Worker // TODO: b/349398108 - MultiBuf directly out of (and into) the ring buffer.
41*61c4878aSAndroid Build Coastguard Worker Poll<InlineVarLenEntryQueue<>::Entry>
PendOutgoingDatagram(Context & cx)42*61c4878aSAndroid Build Coastguard Worker RpcChannelOutputQueue::PendOutgoingDatagram(Context& cx) {
43*61c4878aSAndroid Build Coastguard Worker   // The head pointer will not change until Pop is called.
44*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
45*61c4878aSAndroid Build Coastguard Worker   if (queue_.empty()) {
46*61c4878aSAndroid Build Coastguard Worker     PW_ASYNC_STORE_WAKER(
47*61c4878aSAndroid Build Coastguard Worker         cx,
48*61c4878aSAndroid Build Coastguard Worker         packet_ready_,
49*61c4878aSAndroid Build Coastguard Worker         "RpcChannel is waiting for outgoing RPC datagrams to be enqueued");
50*61c4878aSAndroid Build Coastguard Worker     return Pending();
51*61c4878aSAndroid Build Coastguard Worker   }
52*61c4878aSAndroid Build Coastguard Worker   return Ready(queue_.front());
53*61c4878aSAndroid Build Coastguard Worker }
54*61c4878aSAndroid Build Coastguard Worker 
Send(ConstByteSpan datagram)55*61c4878aSAndroid Build Coastguard Worker Status RpcChannelOutputQueue::Send(ConstByteSpan datagram) {
56*61c4878aSAndroid Build Coastguard Worker   PACKET_IO_DEBUG_LOG("Pushing %zu B packet into outbound queue",
57*61c4878aSAndroid Build Coastguard Worker                       datagram.size());
58*61c4878aSAndroid Build Coastguard Worker   mutex_.lock();
59*61c4878aSAndroid Build Coastguard Worker   if (queue_.try_push(datagram)) {
60*61c4878aSAndroid Build Coastguard Worker     std::move(packet_ready_).Wake();
61*61c4878aSAndroid Build Coastguard Worker   } else {
62*61c4878aSAndroid Build Coastguard Worker     dropped_packets_ += 1;
63*61c4878aSAndroid Build Coastguard Worker   }
64*61c4878aSAndroid Build Coastguard Worker   mutex_.unlock();
65*61c4878aSAndroid Build Coastguard Worker   return OkStatus();
66*61c4878aSAndroid Build Coastguard Worker }
67*61c4878aSAndroid Build Coastguard Worker 
RpcServerThread(Allocator & allocator,rpc::Server & server)68*61c4878aSAndroid Build Coastguard Worker RpcServerThread::RpcServerThread(Allocator& allocator, rpc::Server& server)
69*61c4878aSAndroid Build Coastguard Worker     : allocator_(allocator), rpc_server_(server) {
70*61c4878aSAndroid Build Coastguard Worker   PW_CHECK_OK(rpc_server_.OpenChannel(1, rpc_packet_queue_));
71*61c4878aSAndroid Build Coastguard Worker }
72*61c4878aSAndroid Build Coastguard Worker 
PushPacket(MultiBuf && packet)73*61c4878aSAndroid Build Coastguard Worker void RpcServerThread::PushPacket(MultiBuf&& packet) {
74*61c4878aSAndroid Build Coastguard Worker   PACKET_IO_DEBUG_LOG("Received %zu B RPC packet", packet.size());
75*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
76*61c4878aSAndroid Build Coastguard Worker   ready_for_packet_ = false;
77*61c4878aSAndroid Build Coastguard Worker   packet_multibuf_ = std::move(packet);
78*61c4878aSAndroid Build Coastguard Worker   new_packet_available_.release();
79*61c4878aSAndroid Build Coastguard Worker }
80*61c4878aSAndroid Build Coastguard Worker 
RunOnce()81*61c4878aSAndroid Build Coastguard Worker void RpcServerThread::RunOnce() {
82*61c4878aSAndroid Build Coastguard Worker   new_packet_available_.acquire();
83*61c4878aSAndroid Build Coastguard Worker 
84*61c4878aSAndroid Build Coastguard Worker   std::optional<ConstByteSpan> span = packet_multibuf_.ContiguousSpan();
85*61c4878aSAndroid Build Coastguard Worker   if (span.has_value()) {
86*61c4878aSAndroid Build Coastguard Worker     rpc_server_.ProcessPacket(*span).IgnoreError();
87*61c4878aSAndroid Build Coastguard Worker   } else {
88*61c4878aSAndroid Build Coastguard Worker     // Copy the packet into a contiguous buffer.
89*61c4878aSAndroid Build Coastguard Worker     // TODO: b/349440355 - Consider a global buffer instead of repeated allocs.
90*61c4878aSAndroid Build Coastguard Worker     const size_t packet_size = packet_multibuf_.size();
91*61c4878aSAndroid Build Coastguard Worker     std::byte* buffer = static_cast<std::byte*>(
92*61c4878aSAndroid Build Coastguard Worker         allocator_.Allocate({packet_size, alignof(std::byte)}));
93*61c4878aSAndroid Build Coastguard Worker 
94*61c4878aSAndroid Build Coastguard Worker     auto copy_result = packet_multibuf_.CopyTo({buffer, packet_size});
95*61c4878aSAndroid Build Coastguard Worker     PW_DCHECK_OK(copy_result.status());
96*61c4878aSAndroid Build Coastguard Worker     rpc_server_.ProcessPacket({buffer, packet_size}).IgnoreError();
97*61c4878aSAndroid Build Coastguard Worker 
98*61c4878aSAndroid Build Coastguard Worker     allocator_.Deallocate(buffer);
99*61c4878aSAndroid Build Coastguard Worker   }
100*61c4878aSAndroid Build Coastguard Worker 
101*61c4878aSAndroid Build Coastguard Worker   packet_multibuf_.Release();
102*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
103*61c4878aSAndroid Build Coastguard Worker   ready_for_packet_ = true;
104*61c4878aSAndroid Build Coastguard Worker   std::move(ready_to_receive_packet_).Wake();
105*61c4878aSAndroid Build Coastguard Worker }
106*61c4878aSAndroid Build Coastguard Worker 
PacketIO(channel::ByteReaderWriter & io_channel,ByteSpan buffer,Allocator & allocator,rpc::Server & rpc_server)107*61c4878aSAndroid Build Coastguard Worker PacketIO::PacketIO(channel::ByteReaderWriter& io_channel,
108*61c4878aSAndroid Build Coastguard Worker                    ByteSpan buffer,
109*61c4878aSAndroid Build Coastguard Worker                    Allocator& allocator,
110*61c4878aSAndroid Build Coastguard Worker                    rpc::Server& rpc_server)
111*61c4878aSAndroid Build Coastguard Worker     : allocator_(allocator),
112*61c4878aSAndroid Build Coastguard Worker       mb_allocator_1_(mb_allocator_buffer_1_, allocator_),
113*61c4878aSAndroid Build Coastguard Worker       mb_allocator_2_(mb_allocator_buffer_2_, allocator_),
114*61c4878aSAndroid Build Coastguard Worker       channels_(mb_allocator_1_, mb_allocator_2_),
115*61c4878aSAndroid Build Coastguard Worker       router_(io_channel, buffer),
116*61c4878aSAndroid Build Coastguard Worker       rpc_server_thread_(allocator_, rpc_server),
117*61c4878aSAndroid Build Coastguard Worker       packet_reader_(*this),
118*61c4878aSAndroid Build Coastguard Worker       packet_writer_(*this) {
119*61c4878aSAndroid Build Coastguard Worker   PW_CHECK_OK(router_.AddChannel(channels_.second(),
120*61c4878aSAndroid Build Coastguard Worker                                  PW_SYSTEM_DEFAULT_RPC_HDLC_ADDRESS,
121*61c4878aSAndroid Build Coastguard Worker                                  PW_SYSTEM_DEFAULT_RPC_HDLC_ADDRESS));
122*61c4878aSAndroid Build Coastguard Worker }
123*61c4878aSAndroid Build Coastguard Worker 
Start(Dispatcher & dispatcher,const thread::Options & thread_options)124*61c4878aSAndroid Build Coastguard Worker void PacketIO::Start(Dispatcher& dispatcher,
125*61c4878aSAndroid Build Coastguard Worker                      const thread::Options& thread_options) {
126*61c4878aSAndroid Build Coastguard Worker   dispatcher.Post(packet_reader_);
127*61c4878aSAndroid Build Coastguard Worker   dispatcher.Post(packet_writer_);
128*61c4878aSAndroid Build Coastguard Worker 
129*61c4878aSAndroid Build Coastguard Worker   thread::DetachedThread(thread_options, [this] {
130*61c4878aSAndroid Build Coastguard Worker     while (true) {
131*61c4878aSAndroid Build Coastguard Worker       rpc_server_thread_.RunOnce();
132*61c4878aSAndroid Build Coastguard Worker     }
133*61c4878aSAndroid Build Coastguard Worker   });
134*61c4878aSAndroid Build Coastguard Worker }
135*61c4878aSAndroid Build Coastguard Worker 
DoPend(Context & cx)136*61c4878aSAndroid Build Coastguard Worker Poll<> PacketIO::PacketReader::DoPend(Context& cx) {
137*61c4878aSAndroid Build Coastguard Worker   // Let the router do its work.
138*61c4878aSAndroid Build Coastguard Worker   if (io_.router_.Pend(cx).IsReady()) {
139*61c4878aSAndroid Build Coastguard Worker     return Ready();  // channel is closed, we're done here
140*61c4878aSAndroid Build Coastguard Worker   }
141*61c4878aSAndroid Build Coastguard Worker 
142*61c4878aSAndroid Build Coastguard Worker   // If the dispatcher isn't ready for another packet, wait.
143*61c4878aSAndroid Build Coastguard Worker   if (io_.rpc_server_thread_.PendReadyForPacket(cx).IsPending()) {
144*61c4878aSAndroid Build Coastguard Worker     return Pending();  // Nothing else to do for now
145*61c4878aSAndroid Build Coastguard Worker   }
146*61c4878aSAndroid Build Coastguard Worker 
147*61c4878aSAndroid Build Coastguard Worker   // Read a packet from the router and provide it to the RPC thread.
148*61c4878aSAndroid Build Coastguard Worker   auto read = io_.channel().PendRead(cx);
149*61c4878aSAndroid Build Coastguard Worker   if (read.IsPending()) {
150*61c4878aSAndroid Build Coastguard Worker     return Pending();  // Nothing else to do for now
151*61c4878aSAndroid Build Coastguard Worker   }
152*61c4878aSAndroid Build Coastguard Worker   if (!read->ok()) {
153*61c4878aSAndroid Build Coastguard Worker     PW_LOG_ERROR("Channel::PendRead() returned status %s",
154*61c4878aSAndroid Build Coastguard Worker                  read->status().str());
155*61c4878aSAndroid Build Coastguard Worker     return Ready();  // Channel is broken
156*61c4878aSAndroid Build Coastguard Worker   }
157*61c4878aSAndroid Build Coastguard Worker   // Push the packet into the RPC thread.
158*61c4878aSAndroid Build Coastguard Worker   io_.rpc_server_thread_.PushPacket(*std::move(*read));
159*61c4878aSAndroid Build Coastguard Worker   return Pending();  // Nothing else to do for now
160*61c4878aSAndroid Build Coastguard Worker }
161*61c4878aSAndroid Build Coastguard Worker 
DoPend(Context & cx)162*61c4878aSAndroid Build Coastguard Worker Poll<> PacketIO::PacketWriter::DoPend(Context& cx) {
163*61c4878aSAndroid Build Coastguard Worker   // Do the work of writing any existing packets.
164*61c4878aSAndroid Build Coastguard Worker   // We ignore Pending because we want to continue trying to make
165*61c4878aSAndroid Build Coastguard Worker   // progress sending new packets regardless of whether the write was
166*61c4878aSAndroid Build Coastguard Worker   // able to complete.
167*61c4878aSAndroid Build Coastguard Worker   Poll<Status> write_status = io_.channel().PendWrite(cx);
168*61c4878aSAndroid Build Coastguard Worker   if (write_status.IsReady() && !write_status->ok()) {
169*61c4878aSAndroid Build Coastguard Worker     PW_LOG_ERROR("Channel::PendWrite() returned non-OK status %s",
170*61c4878aSAndroid Build Coastguard Worker                  write_status->str());
171*61c4878aSAndroid Build Coastguard Worker     return Ready();
172*61c4878aSAndroid Build Coastguard Worker   }
173*61c4878aSAndroid Build Coastguard Worker 
174*61c4878aSAndroid Build Coastguard Worker   // Get the next packet to send, if any.
175*61c4878aSAndroid Build Coastguard Worker   if (outbound_packet_.IsPending()) {
176*61c4878aSAndroid Build Coastguard Worker     outbound_packet_ = io_.rpc_server_thread_.PendOutgoingDatagram(cx);
177*61c4878aSAndroid Build Coastguard Worker   }
178*61c4878aSAndroid Build Coastguard Worker 
179*61c4878aSAndroid Build Coastguard Worker   if (outbound_packet_.IsPending()) {
180*61c4878aSAndroid Build Coastguard Worker     return Pending();
181*61c4878aSAndroid Build Coastguard Worker   }
182*61c4878aSAndroid Build Coastguard Worker 
183*61c4878aSAndroid Build Coastguard Worker   PACKET_IO_DEBUG_LOG("Sending %u B outbound packet",
184*61c4878aSAndroid Build Coastguard Worker                       static_cast<unsigned>(outbound_packet_->size()));
185*61c4878aSAndroid Build Coastguard Worker 
186*61c4878aSAndroid Build Coastguard Worker   // There is a packet -- check if we can write.
187*61c4878aSAndroid Build Coastguard Worker   auto writable = io_.channel().PendReadyToWrite(cx);
188*61c4878aSAndroid Build Coastguard Worker   if (writable.IsPending()) {
189*61c4878aSAndroid Build Coastguard Worker     return Pending();
190*61c4878aSAndroid Build Coastguard Worker   }
191*61c4878aSAndroid Build Coastguard Worker 
192*61c4878aSAndroid Build Coastguard Worker   if (!writable->ok()) {
193*61c4878aSAndroid Build Coastguard Worker     PW_LOG_ERROR("Channel::PendReadyToWrite() returned status %s",
194*61c4878aSAndroid Build Coastguard Worker                  writable->str());
195*61c4878aSAndroid Build Coastguard Worker     return Ready();
196*61c4878aSAndroid Build Coastguard Worker   }
197*61c4878aSAndroid Build Coastguard Worker 
198*61c4878aSAndroid Build Coastguard Worker   // Allocate a multibuf to send the packet.
199*61c4878aSAndroid Build Coastguard Worker   // TODO: b/349398108 - Instead, get a MultiBuf that refers to the queue entry.
200*61c4878aSAndroid Build Coastguard Worker   auto mb = io_.channel().PendAllocateWriteBuffer(cx, outbound_packet_->size());
201*61c4878aSAndroid Build Coastguard Worker   if (mb.IsPending()) {
202*61c4878aSAndroid Build Coastguard Worker     return Pending();
203*61c4878aSAndroid Build Coastguard Worker   }
204*61c4878aSAndroid Build Coastguard Worker 
205*61c4878aSAndroid Build Coastguard Worker   if (!mb->has_value()) {
206*61c4878aSAndroid Build Coastguard Worker     PW_LOG_ERROR("Async MultiBuf allocation of %u B failed",
207*61c4878aSAndroid Build Coastguard Worker                  static_cast<unsigned>(outbound_packet_->size()));
208*61c4878aSAndroid Build Coastguard Worker     return Ready();  // Could not allocate mb
209*61c4878aSAndroid Build Coastguard Worker   }
210*61c4878aSAndroid Build Coastguard Worker 
211*61c4878aSAndroid Build Coastguard Worker   // Copy the packet into the multibuf.
212*61c4878aSAndroid Build Coastguard Worker   auto [first, second] = outbound_packet_->contiguous_data();
213*61c4878aSAndroid Build Coastguard Worker   PW_CHECK_OK((*mb)->CopyFrom(first).status());
214*61c4878aSAndroid Build Coastguard Worker   PW_CHECK_OK((*mb)->CopyFromAndTruncate(second, first.size()).status());
215*61c4878aSAndroid Build Coastguard Worker   io_.rpc_server_thread_.PopOutboundPacket();
216*61c4878aSAndroid Build Coastguard Worker 
217*61c4878aSAndroid Build Coastguard Worker   PACKET_IO_DEBUG_LOG("Writing %zu B outbound packet", (**mb).size());
218*61c4878aSAndroid Build Coastguard Worker   auto write_result = io_.channel().StageWrite(**std::move(mb));
219*61c4878aSAndroid Build Coastguard Worker   if (!write_result.ok()) {
220*61c4878aSAndroid Build Coastguard Worker     return Ready();  // Write failed, but should not have
221*61c4878aSAndroid Build Coastguard Worker   }
222*61c4878aSAndroid Build Coastguard Worker 
223*61c4878aSAndroid Build Coastguard Worker   // Write was accepted, so set up for the next packet
224*61c4878aSAndroid Build Coastguard Worker   outbound_packet_ = Pending();
225*61c4878aSAndroid Build Coastguard Worker 
226*61c4878aSAndroid Build Coastguard Worker   // Sent one packet, let other tasks run.
227*61c4878aSAndroid Build Coastguard Worker   cx.ReEnqueue();
228*61c4878aSAndroid Build Coastguard Worker   return Pending();
229*61c4878aSAndroid Build Coastguard Worker }
230*61c4878aSAndroid Build Coastguard Worker 
231*61c4878aSAndroid Build Coastguard Worker }  // namespace pw::system::internal
232