1*61c4878aSAndroid Build Coastguard Worker // Copyright 2024 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker // https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker
15*61c4878aSAndroid Build Coastguard Worker #include "pw_system/internal/async_packet_io.h"
16*61c4878aSAndroid Build Coastguard Worker
17*61c4878aSAndroid Build Coastguard Worker #include "pw_assert/check.h"
18*61c4878aSAndroid Build Coastguard Worker #include "pw_log/log.h"
19*61c4878aSAndroid Build Coastguard Worker #include "pw_system/config.h"
20*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/detached_thread.h"
21*61c4878aSAndroid Build Coastguard Worker
22*61c4878aSAndroid Build Coastguard Worker // Normal logging is not possible here. This code processes log messages, so
23*61c4878aSAndroid Build Coastguard Worker // must not produce logs for each log.
24*61c4878aSAndroid Build Coastguard Worker #define PACKET_IO_DEBUG_LOG(msg, ...) \
25*61c4878aSAndroid Build Coastguard Worker if (false) /* Set to true to enable printf debug logging. */ \
26*61c4878aSAndroid Build Coastguard Worker printf("DEBUG LOG: " msg "\n" __VA_OPT__(, ) __VA_ARGS__)
27*61c4878aSAndroid Build Coastguard Worker
28*61c4878aSAndroid Build Coastguard Worker namespace pw::system::internal {
29*61c4878aSAndroid Build Coastguard Worker
30*61c4878aSAndroid Build Coastguard Worker using ::pw::async2::Context;
31*61c4878aSAndroid Build Coastguard Worker using ::pw::async2::Dispatcher;
32*61c4878aSAndroid Build Coastguard Worker using ::pw::async2::Pending;
33*61c4878aSAndroid Build Coastguard Worker using ::pw::async2::Poll;
34*61c4878aSAndroid Build Coastguard Worker using ::pw::async2::Ready;
35*61c4878aSAndroid Build Coastguard Worker using ::pw::multibuf::MultiBuf;
36*61c4878aSAndroid Build Coastguard Worker
37*61c4878aSAndroid Build Coastguard Worker // With atomic head/tail reads/writes, this type of queue interaction could be
38*61c4878aSAndroid Build Coastguard Worker // lockless in single producer, single consumer scenarios.
39*61c4878aSAndroid Build Coastguard Worker
40*61c4878aSAndroid Build Coastguard Worker // TODO: b/349398108 - MultiBuf directly out of (and into) the ring buffer.
41*61c4878aSAndroid Build Coastguard Worker Poll<InlineVarLenEntryQueue<>::Entry>
PendOutgoingDatagram(Context & cx)42*61c4878aSAndroid Build Coastguard Worker RpcChannelOutputQueue::PendOutgoingDatagram(Context& cx) {
43*61c4878aSAndroid Build Coastguard Worker // The head pointer will not change until Pop is called.
44*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(mutex_);
45*61c4878aSAndroid Build Coastguard Worker if (queue_.empty()) {
46*61c4878aSAndroid Build Coastguard Worker PW_ASYNC_STORE_WAKER(
47*61c4878aSAndroid Build Coastguard Worker cx,
48*61c4878aSAndroid Build Coastguard Worker packet_ready_,
49*61c4878aSAndroid Build Coastguard Worker "RpcChannel is waiting for outgoing RPC datagrams to be enqueued");
50*61c4878aSAndroid Build Coastguard Worker return Pending();
51*61c4878aSAndroid Build Coastguard Worker }
52*61c4878aSAndroid Build Coastguard Worker return Ready(queue_.front());
53*61c4878aSAndroid Build Coastguard Worker }
54*61c4878aSAndroid Build Coastguard Worker
Send(ConstByteSpan datagram)55*61c4878aSAndroid Build Coastguard Worker Status RpcChannelOutputQueue::Send(ConstByteSpan datagram) {
56*61c4878aSAndroid Build Coastguard Worker PACKET_IO_DEBUG_LOG("Pushing %zu B packet into outbound queue",
57*61c4878aSAndroid Build Coastguard Worker datagram.size());
58*61c4878aSAndroid Build Coastguard Worker mutex_.lock();
59*61c4878aSAndroid Build Coastguard Worker if (queue_.try_push(datagram)) {
60*61c4878aSAndroid Build Coastguard Worker std::move(packet_ready_).Wake();
61*61c4878aSAndroid Build Coastguard Worker } else {
62*61c4878aSAndroid Build Coastguard Worker dropped_packets_ += 1;
63*61c4878aSAndroid Build Coastguard Worker }
64*61c4878aSAndroid Build Coastguard Worker mutex_.unlock();
65*61c4878aSAndroid Build Coastguard Worker return OkStatus();
66*61c4878aSAndroid Build Coastguard Worker }
67*61c4878aSAndroid Build Coastguard Worker
RpcServerThread(Allocator & allocator,rpc::Server & server)68*61c4878aSAndroid Build Coastguard Worker RpcServerThread::RpcServerThread(Allocator& allocator, rpc::Server& server)
69*61c4878aSAndroid Build Coastguard Worker : allocator_(allocator), rpc_server_(server) {
70*61c4878aSAndroid Build Coastguard Worker PW_CHECK_OK(rpc_server_.OpenChannel(1, rpc_packet_queue_));
71*61c4878aSAndroid Build Coastguard Worker }
72*61c4878aSAndroid Build Coastguard Worker
PushPacket(MultiBuf && packet)73*61c4878aSAndroid Build Coastguard Worker void RpcServerThread::PushPacket(MultiBuf&& packet) {
74*61c4878aSAndroid Build Coastguard Worker PACKET_IO_DEBUG_LOG("Received %zu B RPC packet", packet.size());
75*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(mutex_);
76*61c4878aSAndroid Build Coastguard Worker ready_for_packet_ = false;
77*61c4878aSAndroid Build Coastguard Worker packet_multibuf_ = std::move(packet);
78*61c4878aSAndroid Build Coastguard Worker new_packet_available_.release();
79*61c4878aSAndroid Build Coastguard Worker }
80*61c4878aSAndroid Build Coastguard Worker
RunOnce()81*61c4878aSAndroid Build Coastguard Worker void RpcServerThread::RunOnce() {
82*61c4878aSAndroid Build Coastguard Worker new_packet_available_.acquire();
83*61c4878aSAndroid Build Coastguard Worker
84*61c4878aSAndroid Build Coastguard Worker std::optional<ConstByteSpan> span = packet_multibuf_.ContiguousSpan();
85*61c4878aSAndroid Build Coastguard Worker if (span.has_value()) {
86*61c4878aSAndroid Build Coastguard Worker rpc_server_.ProcessPacket(*span).IgnoreError();
87*61c4878aSAndroid Build Coastguard Worker } else {
88*61c4878aSAndroid Build Coastguard Worker // Copy the packet into a contiguous buffer.
89*61c4878aSAndroid Build Coastguard Worker // TODO: b/349440355 - Consider a global buffer instead of repeated allocs.
90*61c4878aSAndroid Build Coastguard Worker const size_t packet_size = packet_multibuf_.size();
91*61c4878aSAndroid Build Coastguard Worker std::byte* buffer = static_cast<std::byte*>(
92*61c4878aSAndroid Build Coastguard Worker allocator_.Allocate({packet_size, alignof(std::byte)}));
93*61c4878aSAndroid Build Coastguard Worker
94*61c4878aSAndroid Build Coastguard Worker auto copy_result = packet_multibuf_.CopyTo({buffer, packet_size});
95*61c4878aSAndroid Build Coastguard Worker PW_DCHECK_OK(copy_result.status());
96*61c4878aSAndroid Build Coastguard Worker rpc_server_.ProcessPacket({buffer, packet_size}).IgnoreError();
97*61c4878aSAndroid Build Coastguard Worker
98*61c4878aSAndroid Build Coastguard Worker allocator_.Deallocate(buffer);
99*61c4878aSAndroid Build Coastguard Worker }
100*61c4878aSAndroid Build Coastguard Worker
101*61c4878aSAndroid Build Coastguard Worker packet_multibuf_.Release();
102*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(mutex_);
103*61c4878aSAndroid Build Coastguard Worker ready_for_packet_ = true;
104*61c4878aSAndroid Build Coastguard Worker std::move(ready_to_receive_packet_).Wake();
105*61c4878aSAndroid Build Coastguard Worker }
106*61c4878aSAndroid Build Coastguard Worker
PacketIO(channel::ByteReaderWriter & io_channel,ByteSpan buffer,Allocator & allocator,rpc::Server & rpc_server)107*61c4878aSAndroid Build Coastguard Worker PacketIO::PacketIO(channel::ByteReaderWriter& io_channel,
108*61c4878aSAndroid Build Coastguard Worker ByteSpan buffer,
109*61c4878aSAndroid Build Coastguard Worker Allocator& allocator,
110*61c4878aSAndroid Build Coastguard Worker rpc::Server& rpc_server)
111*61c4878aSAndroid Build Coastguard Worker : allocator_(allocator),
112*61c4878aSAndroid Build Coastguard Worker mb_allocator_1_(mb_allocator_buffer_1_, allocator_),
113*61c4878aSAndroid Build Coastguard Worker mb_allocator_2_(mb_allocator_buffer_2_, allocator_),
114*61c4878aSAndroid Build Coastguard Worker channels_(mb_allocator_1_, mb_allocator_2_),
115*61c4878aSAndroid Build Coastguard Worker router_(io_channel, buffer),
116*61c4878aSAndroid Build Coastguard Worker rpc_server_thread_(allocator_, rpc_server),
117*61c4878aSAndroid Build Coastguard Worker packet_reader_(*this),
118*61c4878aSAndroid Build Coastguard Worker packet_writer_(*this) {
119*61c4878aSAndroid Build Coastguard Worker PW_CHECK_OK(router_.AddChannel(channels_.second(),
120*61c4878aSAndroid Build Coastguard Worker PW_SYSTEM_DEFAULT_RPC_HDLC_ADDRESS,
121*61c4878aSAndroid Build Coastguard Worker PW_SYSTEM_DEFAULT_RPC_HDLC_ADDRESS));
122*61c4878aSAndroid Build Coastguard Worker }
123*61c4878aSAndroid Build Coastguard Worker
Start(Dispatcher & dispatcher,const thread::Options & thread_options)124*61c4878aSAndroid Build Coastguard Worker void PacketIO::Start(Dispatcher& dispatcher,
125*61c4878aSAndroid Build Coastguard Worker const thread::Options& thread_options) {
126*61c4878aSAndroid Build Coastguard Worker dispatcher.Post(packet_reader_);
127*61c4878aSAndroid Build Coastguard Worker dispatcher.Post(packet_writer_);
128*61c4878aSAndroid Build Coastguard Worker
129*61c4878aSAndroid Build Coastguard Worker thread::DetachedThread(thread_options, [this] {
130*61c4878aSAndroid Build Coastguard Worker while (true) {
131*61c4878aSAndroid Build Coastguard Worker rpc_server_thread_.RunOnce();
132*61c4878aSAndroid Build Coastguard Worker }
133*61c4878aSAndroid Build Coastguard Worker });
134*61c4878aSAndroid Build Coastguard Worker }
135*61c4878aSAndroid Build Coastguard Worker
DoPend(Context & cx)136*61c4878aSAndroid Build Coastguard Worker Poll<> PacketIO::PacketReader::DoPend(Context& cx) {
137*61c4878aSAndroid Build Coastguard Worker // Let the router do its work.
138*61c4878aSAndroid Build Coastguard Worker if (io_.router_.Pend(cx).IsReady()) {
139*61c4878aSAndroid Build Coastguard Worker return Ready(); // channel is closed, we're done here
140*61c4878aSAndroid Build Coastguard Worker }
141*61c4878aSAndroid Build Coastguard Worker
142*61c4878aSAndroid Build Coastguard Worker // If the dispatcher isn't ready for another packet, wait.
143*61c4878aSAndroid Build Coastguard Worker if (io_.rpc_server_thread_.PendReadyForPacket(cx).IsPending()) {
144*61c4878aSAndroid Build Coastguard Worker return Pending(); // Nothing else to do for now
145*61c4878aSAndroid Build Coastguard Worker }
146*61c4878aSAndroid Build Coastguard Worker
147*61c4878aSAndroid Build Coastguard Worker // Read a packet from the router and provide it to the RPC thread.
148*61c4878aSAndroid Build Coastguard Worker auto read = io_.channel().PendRead(cx);
149*61c4878aSAndroid Build Coastguard Worker if (read.IsPending()) {
150*61c4878aSAndroid Build Coastguard Worker return Pending(); // Nothing else to do for now
151*61c4878aSAndroid Build Coastguard Worker }
152*61c4878aSAndroid Build Coastguard Worker if (!read->ok()) {
153*61c4878aSAndroid Build Coastguard Worker PW_LOG_ERROR("Channel::PendRead() returned status %s",
154*61c4878aSAndroid Build Coastguard Worker read->status().str());
155*61c4878aSAndroid Build Coastguard Worker return Ready(); // Channel is broken
156*61c4878aSAndroid Build Coastguard Worker }
157*61c4878aSAndroid Build Coastguard Worker // Push the packet into the RPC thread.
158*61c4878aSAndroid Build Coastguard Worker io_.rpc_server_thread_.PushPacket(*std::move(*read));
159*61c4878aSAndroid Build Coastguard Worker return Pending(); // Nothing else to do for now
160*61c4878aSAndroid Build Coastguard Worker }
161*61c4878aSAndroid Build Coastguard Worker
DoPend(Context & cx)162*61c4878aSAndroid Build Coastguard Worker Poll<> PacketIO::PacketWriter::DoPend(Context& cx) {
163*61c4878aSAndroid Build Coastguard Worker // Do the work of writing any existing packets.
164*61c4878aSAndroid Build Coastguard Worker // We ignore Pending because we want to continue trying to make
165*61c4878aSAndroid Build Coastguard Worker // progress sending new packets regardless of whether the write was
166*61c4878aSAndroid Build Coastguard Worker // able to complete.
167*61c4878aSAndroid Build Coastguard Worker Poll<Status> write_status = io_.channel().PendWrite(cx);
168*61c4878aSAndroid Build Coastguard Worker if (write_status.IsReady() && !write_status->ok()) {
169*61c4878aSAndroid Build Coastguard Worker PW_LOG_ERROR("Channel::PendWrite() returned non-OK status %s",
170*61c4878aSAndroid Build Coastguard Worker write_status->str());
171*61c4878aSAndroid Build Coastguard Worker return Ready();
172*61c4878aSAndroid Build Coastguard Worker }
173*61c4878aSAndroid Build Coastguard Worker
174*61c4878aSAndroid Build Coastguard Worker // Get the next packet to send, if any.
175*61c4878aSAndroid Build Coastguard Worker if (outbound_packet_.IsPending()) {
176*61c4878aSAndroid Build Coastguard Worker outbound_packet_ = io_.rpc_server_thread_.PendOutgoingDatagram(cx);
177*61c4878aSAndroid Build Coastguard Worker }
178*61c4878aSAndroid Build Coastguard Worker
179*61c4878aSAndroid Build Coastguard Worker if (outbound_packet_.IsPending()) {
180*61c4878aSAndroid Build Coastguard Worker return Pending();
181*61c4878aSAndroid Build Coastguard Worker }
182*61c4878aSAndroid Build Coastguard Worker
183*61c4878aSAndroid Build Coastguard Worker PACKET_IO_DEBUG_LOG("Sending %u B outbound packet",
184*61c4878aSAndroid Build Coastguard Worker static_cast<unsigned>(outbound_packet_->size()));
185*61c4878aSAndroid Build Coastguard Worker
186*61c4878aSAndroid Build Coastguard Worker // There is a packet -- check if we can write.
187*61c4878aSAndroid Build Coastguard Worker auto writable = io_.channel().PendReadyToWrite(cx);
188*61c4878aSAndroid Build Coastguard Worker if (writable.IsPending()) {
189*61c4878aSAndroid Build Coastguard Worker return Pending();
190*61c4878aSAndroid Build Coastguard Worker }
191*61c4878aSAndroid Build Coastguard Worker
192*61c4878aSAndroid Build Coastguard Worker if (!writable->ok()) {
193*61c4878aSAndroid Build Coastguard Worker PW_LOG_ERROR("Channel::PendReadyToWrite() returned status %s",
194*61c4878aSAndroid Build Coastguard Worker writable->str());
195*61c4878aSAndroid Build Coastguard Worker return Ready();
196*61c4878aSAndroid Build Coastguard Worker }
197*61c4878aSAndroid Build Coastguard Worker
198*61c4878aSAndroid Build Coastguard Worker // Allocate a multibuf to send the packet.
199*61c4878aSAndroid Build Coastguard Worker // TODO: b/349398108 - Instead, get a MultiBuf that refers to the queue entry.
200*61c4878aSAndroid Build Coastguard Worker auto mb = io_.channel().PendAllocateWriteBuffer(cx, outbound_packet_->size());
201*61c4878aSAndroid Build Coastguard Worker if (mb.IsPending()) {
202*61c4878aSAndroid Build Coastguard Worker return Pending();
203*61c4878aSAndroid Build Coastguard Worker }
204*61c4878aSAndroid Build Coastguard Worker
205*61c4878aSAndroid Build Coastguard Worker if (!mb->has_value()) {
206*61c4878aSAndroid Build Coastguard Worker PW_LOG_ERROR("Async MultiBuf allocation of %u B failed",
207*61c4878aSAndroid Build Coastguard Worker static_cast<unsigned>(outbound_packet_->size()));
208*61c4878aSAndroid Build Coastguard Worker return Ready(); // Could not allocate mb
209*61c4878aSAndroid Build Coastguard Worker }
210*61c4878aSAndroid Build Coastguard Worker
211*61c4878aSAndroid Build Coastguard Worker // Copy the packet into the multibuf.
212*61c4878aSAndroid Build Coastguard Worker auto [first, second] = outbound_packet_->contiguous_data();
213*61c4878aSAndroid Build Coastguard Worker PW_CHECK_OK((*mb)->CopyFrom(first).status());
214*61c4878aSAndroid Build Coastguard Worker PW_CHECK_OK((*mb)->CopyFromAndTruncate(second, first.size()).status());
215*61c4878aSAndroid Build Coastguard Worker io_.rpc_server_thread_.PopOutboundPacket();
216*61c4878aSAndroid Build Coastguard Worker
217*61c4878aSAndroid Build Coastguard Worker PACKET_IO_DEBUG_LOG("Writing %zu B outbound packet", (**mb).size());
218*61c4878aSAndroid Build Coastguard Worker auto write_result = io_.channel().StageWrite(**std::move(mb));
219*61c4878aSAndroid Build Coastguard Worker if (!write_result.ok()) {
220*61c4878aSAndroid Build Coastguard Worker return Ready(); // Write failed, but should not have
221*61c4878aSAndroid Build Coastguard Worker }
222*61c4878aSAndroid Build Coastguard Worker
223*61c4878aSAndroid Build Coastguard Worker // Write was accepted, so set up for the next packet
224*61c4878aSAndroid Build Coastguard Worker outbound_packet_ = Pending();
225*61c4878aSAndroid Build Coastguard Worker
226*61c4878aSAndroid Build Coastguard Worker // Sent one packet, let other tasks run.
227*61c4878aSAndroid Build Coastguard Worker cx.ReEnqueue();
228*61c4878aSAndroid Build Coastguard Worker return Pending();
229*61c4878aSAndroid Build Coastguard Worker }
230*61c4878aSAndroid Build Coastguard Worker
231*61c4878aSAndroid Build Coastguard Worker } // namespace pw::system::internal
232