1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_system/internal/async_packet_io.h"
16
17 #include "pw_assert/check.h"
18 #include "pw_log/log.h"
19 #include "pw_system/config.h"
20 #include "pw_thread/detached_thread.h"
21
22 // Normal logging is not possible here. This code processes log messages, so
23 // must not produce logs for each log.
24 #define PACKET_IO_DEBUG_LOG(msg, ...) \
25 if (false) /* Set to true to enable printf debug logging. */ \
26 printf("DEBUG LOG: " msg "\n" __VA_OPT__(, ) __VA_ARGS__)
27
28 namespace pw::system::internal {
29
30 using ::pw::async2::Context;
31 using ::pw::async2::Dispatcher;
32 using ::pw::async2::Pending;
33 using ::pw::async2::Poll;
34 using ::pw::async2::Ready;
35 using ::pw::multibuf::MultiBuf;
36
37 // With atomic head/tail reads/writes, this type of queue interaction could be
38 // lockless in single producer, single consumer scenarios.
39
40 // TODO: b/349398108 - MultiBuf directly out of (and into) the ring buffer.
41 Poll<InlineVarLenEntryQueue<>::Entry>
PendOutgoingDatagram(Context & cx)42 RpcChannelOutputQueue::PendOutgoingDatagram(Context& cx) {
43 // The head pointer will not change until Pop is called.
44 std::lock_guard lock(mutex_);
45 if (queue_.empty()) {
46 PW_ASYNC_STORE_WAKER(
47 cx,
48 packet_ready_,
49 "RpcChannel is waiting for outgoing RPC datagrams to be enqueued");
50 return Pending();
51 }
52 return Ready(queue_.front());
53 }
54
Send(ConstByteSpan datagram)55 Status RpcChannelOutputQueue::Send(ConstByteSpan datagram) {
56 PACKET_IO_DEBUG_LOG("Pushing %zu B packet into outbound queue",
57 datagram.size());
58 mutex_.lock();
59 if (queue_.try_push(datagram)) {
60 std::move(packet_ready_).Wake();
61 } else {
62 dropped_packets_ += 1;
63 }
64 mutex_.unlock();
65 return OkStatus();
66 }
67
RpcServerThread(Allocator & allocator,rpc::Server & server)68 RpcServerThread::RpcServerThread(Allocator& allocator, rpc::Server& server)
69 : allocator_(allocator), rpc_server_(server) {
70 PW_CHECK_OK(rpc_server_.OpenChannel(1, rpc_packet_queue_));
71 }
72
PushPacket(MultiBuf && packet)73 void RpcServerThread::PushPacket(MultiBuf&& packet) {
74 PACKET_IO_DEBUG_LOG("Received %zu B RPC packet", packet.size());
75 std::lock_guard lock(mutex_);
76 ready_for_packet_ = false;
77 packet_multibuf_ = std::move(packet);
78 new_packet_available_.release();
79 }
80
RunOnce()81 void RpcServerThread::RunOnce() {
82 new_packet_available_.acquire();
83
84 std::optional<ConstByteSpan> span = packet_multibuf_.ContiguousSpan();
85 if (span.has_value()) {
86 rpc_server_.ProcessPacket(*span).IgnoreError();
87 } else {
88 // Copy the packet into a contiguous buffer.
89 // TODO: b/349440355 - Consider a global buffer instead of repeated allocs.
90 const size_t packet_size = packet_multibuf_.size();
91 std::byte* buffer = static_cast<std::byte*>(
92 allocator_.Allocate({packet_size, alignof(std::byte)}));
93
94 auto copy_result = packet_multibuf_.CopyTo({buffer, packet_size});
95 PW_DCHECK_OK(copy_result.status());
96 rpc_server_.ProcessPacket({buffer, packet_size}).IgnoreError();
97
98 allocator_.Deallocate(buffer);
99 }
100
101 packet_multibuf_.Release();
102 std::lock_guard lock(mutex_);
103 ready_for_packet_ = true;
104 std::move(ready_to_receive_packet_).Wake();
105 }
106
// Wires together the byte-stream router, the framed channel pair, and the
// RPC server thread. Member initialization order is significant: the
// MultiBuf allocators must exist before the channel pair that uses them,
// and the channels before the router registration below.
PacketIO::PacketIO(channel::ByteReaderWriter& io_channel,
                   ByteSpan buffer,
                   Allocator& allocator,
                   rpc::Server& rpc_server)
    : allocator_(allocator),
      // Two MultiBuf allocators over dedicated buffers, backed by allocator_.
      mb_allocator_1_(mb_allocator_buffer_1_, allocator_),
      mb_allocator_2_(mb_allocator_buffer_2_, allocator_),
      channels_(mb_allocator_1_, mb_allocator_2_),
      router_(io_channel, buffer),
      rpc_server_thread_(allocator_, rpc_server),
      packet_reader_(*this),
      packet_writer_(*this) {
  // Register the second channel with the router for RPC traffic; both the
  // receive and transmit addresses use the default RPC HDLC address.
  // Registration failure is fatal.
  PW_CHECK_OK(router_.AddChannel(channels_.second(),
                                 PW_SYSTEM_DEFAULT_RPC_HDLC_ADDRESS,
                                 PW_SYSTEM_DEFAULT_RPC_HDLC_ADDRESS));
}
123
Start(Dispatcher & dispatcher,const thread::Options & thread_options)124 void PacketIO::Start(Dispatcher& dispatcher,
125 const thread::Options& thread_options) {
126 dispatcher.Post(packet_reader_);
127 dispatcher.Post(packet_writer_);
128
129 thread::DetachedThread(thread_options, [this] {
130 while (true) {
131 rpc_server_thread_.RunOnce();
132 }
133 });
134 }
135
// Pumps inbound bytes: drives the router, then forwards one decoded packet
// per invocation to the RPC server thread. Poll order matters — the router
// must run first so frames are decoded, and the RPC thread must be ready
// before a packet is read (reading consumes it; there is nowhere to stash
// an unconsumed packet).
Poll<> PacketIO::PacketReader::DoPend(Context& cx) {
  // Let the router do its work.
  if (io_.router_.Pend(cx).IsReady()) {
    return Ready();  // channel is closed, we're done here
  }

  // If the dispatcher isn't ready for another packet, wait.
  // (PendReadyForPacket stores a waker; PushPacket below clears readiness
  // until RunOnce() finishes the packet.)
  if (io_.rpc_server_thread_.PendReadyForPacket(cx).IsPending()) {
    return Pending();  // Nothing else to do for now
  }

  // Read a packet from the router and provide it to the RPC thread.
  auto read = io_.channel().PendRead(cx);
  if (read.IsPending()) {
    return Pending();  // Nothing else to do for now
  }
  if (!read->ok()) {
    PW_LOG_ERROR("Channel::PendRead() returned status %s",
                 read->status().str());
    return Ready();  // Channel is broken
  }
  // Push the packet into the RPC thread.
  io_.rpc_server_thread_.PushPacket(*std::move(*read));
  return Pending();  // Nothing else to do for now
}
161
// Drains the outbound RPC datagram queue: stages at most one packet per
// invocation into the channel. The pending datagram is cached in
// outbound_packet_ across invocations so that a Pending result at any
// stage (writability, buffer allocation) resumes where it left off
// without popping or re-requesting the datagram.
Poll<> PacketIO::PacketWriter::DoPend(Context& cx) {
  // Do the work of writing any existing packets.
  // We ignore Pending because we want to continue trying to make
  // progress sending new packets regardless of whether the write was
  // able to complete.
  Poll<Status> write_status = io_.channel().PendWrite(cx);
  if (write_status.IsReady() && !write_status->ok()) {
    PW_LOG_ERROR("Channel::PendWrite() returned non-OK status %s",
                 write_status->str());
    return Ready();
  }

  // Get the next packet to send, if any.
  if (outbound_packet_.IsPending()) {
    outbound_packet_ = io_.rpc_server_thread_.PendOutgoingDatagram(cx);
  }

  if (outbound_packet_.IsPending()) {
    return Pending();
  }

  PACKET_IO_DEBUG_LOG("Sending %u B outbound packet",
                      static_cast<unsigned>(outbound_packet_->size()));

  // There is a packet -- check if we can write.
  auto writable = io_.channel().PendReadyToWrite(cx);
  if (writable.IsPending()) {
    return Pending();
  }

  if (!writable->ok()) {
    PW_LOG_ERROR("Channel::PendReadyToWrite() returned status %s",
                 writable->str());
    return Ready();
  }

  // Allocate a multibuf to send the packet.
  // TODO: b/349398108 - Instead, get a MultiBuf that refers to the queue entry.
  auto mb = io_.channel().PendAllocateWriteBuffer(cx, outbound_packet_->size());
  if (mb.IsPending()) {
    return Pending();
  }

  if (!mb->has_value()) {
    PW_LOG_ERROR("Async MultiBuf allocation of %u B failed",
                 static_cast<unsigned>(outbound_packet_->size()));
    return Ready();  // Could not allocate mb
  }

  // Copy the packet into the multibuf. The queue entry may wrap around the
  // ring buffer, so it is exposed as up to two contiguous segments.
  auto [first, second] = outbound_packet_->contiguous_data();
  PW_CHECK_OK((*mb)->CopyFrom(first).status());
  PW_CHECK_OK((*mb)->CopyFromAndTruncate(second, first.size()).status());
  // Safe to pop now that the data has been copied out of the queue.
  io_.rpc_server_thread_.PopOutboundPacket();

  PACKET_IO_DEBUG_LOG("Writing %zu B outbound packet", (**mb).size());
  auto write_result = io_.channel().StageWrite(**std::move(mb));
  if (!write_result.ok()) {
    return Ready();  // Write failed, but should not have
  }

  // Write was accepted, so set up for the next packet
  outbound_packet_ = Pending();

  // Sent one packet, let other tasks run.
  cx.ReEnqueue();
  return Pending();
}
230
231 } // namespace pw::system::internal
232