xref: /aosp_15_r20/external/pigweed/pw_channel/stream_channel.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1*61c4878aSAndroid Build Coastguard Worker // Copyright 2024 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker //     https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker 
15*61c4878aSAndroid Build Coastguard Worker #include "pw_channel/stream_channel.h"
16*61c4878aSAndroid Build Coastguard Worker 
17*61c4878aSAndroid Build Coastguard Worker #include "pw_async2/dispatcher_base.h"
18*61c4878aSAndroid Build Coastguard Worker #include "pw_log/log.h"
19*61c4878aSAndroid Build Coastguard Worker #include "pw_multibuf/multibuf.h"
20*61c4878aSAndroid Build Coastguard Worker #include "pw_status/status.h"
21*61c4878aSAndroid Build Coastguard Worker #include "pw_status/try.h"
22*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/detached_thread.h"
23*61c4878aSAndroid Build Coastguard Worker 
24*61c4878aSAndroid Build Coastguard Worker namespace pw::channel {
25*61c4878aSAndroid Build Coastguard Worker 
26*61c4878aSAndroid Build Coastguard Worker using pw::OkStatus;
27*61c4878aSAndroid Build Coastguard Worker using pw::Result;
28*61c4878aSAndroid Build Coastguard Worker using pw::Status;
29*61c4878aSAndroid Build Coastguard Worker using pw::async2::Context;
30*61c4878aSAndroid Build Coastguard Worker using pw::async2::Pending;
31*61c4878aSAndroid Build Coastguard Worker using pw::async2::Poll;
32*61c4878aSAndroid Build Coastguard Worker using pw::channel::ByteReaderWriter;
33*61c4878aSAndroid Build Coastguard Worker using pw::multibuf::MultiBuf;
34*61c4878aSAndroid Build Coastguard Worker using pw::multibuf::MultiBufAllocator;
35*61c4878aSAndroid Build Coastguard Worker using pw::multibuf::OwnedChunk;
36*61c4878aSAndroid Build Coastguard Worker 
37*61c4878aSAndroid Build Coastguard Worker namespace internal {
38*61c4878aSAndroid Build Coastguard Worker 
HasBufferToFill()39*61c4878aSAndroid Build Coastguard Worker bool StreamChannelReadState::HasBufferToFill() {
40*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(buffer_lock_);
41*61c4878aSAndroid Build Coastguard Worker   return !buffer_to_fill_.empty();
42*61c4878aSAndroid Build Coastguard Worker }
43*61c4878aSAndroid Build Coastguard Worker 
ProvideBufferToFill(MultiBuf && buf)44*61c4878aSAndroid Build Coastguard Worker void StreamChannelReadState::ProvideBufferToFill(MultiBuf&& buf) {
45*61c4878aSAndroid Build Coastguard Worker   {
46*61c4878aSAndroid Build Coastguard Worker     std::lock_guard lock(buffer_lock_);
47*61c4878aSAndroid Build Coastguard Worker     buffer_to_fill_.PushSuffix(std::move(buf));
48*61c4878aSAndroid Build Coastguard Worker   }
49*61c4878aSAndroid Build Coastguard Worker   buffer_to_fill_available_.release();
50*61c4878aSAndroid Build Coastguard Worker }
51*61c4878aSAndroid Build Coastguard Worker 
PendFilledBuffer(Context & cx)52*61c4878aSAndroid Build Coastguard Worker Poll<Result<MultiBuf>> StreamChannelReadState::PendFilledBuffer(Context& cx) {
53*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(buffer_lock_);
54*61c4878aSAndroid Build Coastguard Worker   if (!filled_buffer_.empty()) {
55*61c4878aSAndroid Build Coastguard Worker     return std::move(filled_buffer_);
56*61c4878aSAndroid Build Coastguard Worker   }
57*61c4878aSAndroid Build Coastguard Worker   // Return an error status only after pulling all the data.
58*61c4878aSAndroid Build Coastguard Worker   if (!status_.ok()) {
59*61c4878aSAndroid Build Coastguard Worker     return status_;
60*61c4878aSAndroid Build Coastguard Worker   }
61*61c4878aSAndroid Build Coastguard Worker   PW_ASYNC_STORE_WAKER(
62*61c4878aSAndroid Build Coastguard Worker       cx, on_buffer_filled_, "StreamChannel is waiting on a `Stream::Read`");
63*61c4878aSAndroid Build Coastguard Worker   return Pending();
64*61c4878aSAndroid Build Coastguard Worker }
65*61c4878aSAndroid Build Coastguard Worker 
ReadLoop(pw::stream::Reader & reader)66*61c4878aSAndroid Build Coastguard Worker void StreamChannelReadState::ReadLoop(pw::stream::Reader& reader) {
67*61c4878aSAndroid Build Coastguard Worker   while (true) {
68*61c4878aSAndroid Build Coastguard Worker     OwnedChunk buffer = WaitForBufferToFillAndTakeFrontChunk();
69*61c4878aSAndroid Build Coastguard Worker     Result<pw::ByteSpan> read = reader.Read(buffer);
70*61c4878aSAndroid Build Coastguard Worker     if (!read.ok()) {
71*61c4878aSAndroid Build Coastguard Worker       SetReadError(read.status());
72*61c4878aSAndroid Build Coastguard Worker 
73*61c4878aSAndroid Build Coastguard Worker       if (!read.status().IsOutOfRange()) {
74*61c4878aSAndroid Build Coastguard Worker         PW_LOG_ERROR("Failed to read from stream in StreamChannel: %s",
75*61c4878aSAndroid Build Coastguard Worker                      read.status().str());
76*61c4878aSAndroid Build Coastguard Worker       }
77*61c4878aSAndroid Build Coastguard Worker       return;
78*61c4878aSAndroid Build Coastguard Worker     }
79*61c4878aSAndroid Build Coastguard Worker     buffer->Truncate(read->size());
80*61c4878aSAndroid Build Coastguard Worker     ProvideFilledBuffer(MultiBuf::FromChunk(std::move(buffer)));
81*61c4878aSAndroid Build Coastguard Worker   }
82*61c4878aSAndroid Build Coastguard Worker }
83*61c4878aSAndroid Build Coastguard Worker 
WaitForBufferToFillAndTakeFrontChunk()84*61c4878aSAndroid Build Coastguard Worker OwnedChunk StreamChannelReadState::WaitForBufferToFillAndTakeFrontChunk() {
85*61c4878aSAndroid Build Coastguard Worker   while (true) {
86*61c4878aSAndroid Build Coastguard Worker     {
87*61c4878aSAndroid Build Coastguard Worker       std::lock_guard lock(buffer_lock_);
88*61c4878aSAndroid Build Coastguard Worker       if (!buffer_to_fill_.empty()) {
89*61c4878aSAndroid Build Coastguard Worker         return buffer_to_fill_.TakeFrontChunk();
90*61c4878aSAndroid Build Coastguard Worker       }
91*61c4878aSAndroid Build Coastguard Worker     }
92*61c4878aSAndroid Build Coastguard Worker     buffer_to_fill_available_.acquire();
93*61c4878aSAndroid Build Coastguard Worker   }
94*61c4878aSAndroid Build Coastguard Worker   PW_UNREACHABLE;
95*61c4878aSAndroid Build Coastguard Worker }
96*61c4878aSAndroid Build Coastguard Worker 
ProvideFilledBuffer(MultiBuf && filled_buffer)97*61c4878aSAndroid Build Coastguard Worker void StreamChannelReadState::ProvideFilledBuffer(MultiBuf&& filled_buffer) {
98*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(buffer_lock_);
99*61c4878aSAndroid Build Coastguard Worker   filled_buffer_.PushSuffix(std::move(filled_buffer));
100*61c4878aSAndroid Build Coastguard Worker   std::move(on_buffer_filled_).Wake();
101*61c4878aSAndroid Build Coastguard Worker }
102*61c4878aSAndroid Build Coastguard Worker 
SetReadError(Status status)103*61c4878aSAndroid Build Coastguard Worker void StreamChannelReadState::SetReadError(Status status) {
104*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(buffer_lock_);
105*61c4878aSAndroid Build Coastguard Worker   status_ = status;
106*61c4878aSAndroid Build Coastguard Worker   std::move(on_buffer_filled_).Wake();
107*61c4878aSAndroid Build Coastguard Worker }
108*61c4878aSAndroid Build Coastguard Worker 
SendData(MultiBuf && buf)109*61c4878aSAndroid Build Coastguard Worker Status StreamChannelWriteState::SendData(MultiBuf&& buf) {
110*61c4878aSAndroid Build Coastguard Worker   {
111*61c4878aSAndroid Build Coastguard Worker     std::lock_guard lock(buffer_lock_);
112*61c4878aSAndroid Build Coastguard Worker     if (!status_.ok()) {
113*61c4878aSAndroid Build Coastguard Worker       return status_;
114*61c4878aSAndroid Build Coastguard Worker     }
115*61c4878aSAndroid Build Coastguard Worker     buffer_to_write_.PushSuffix(std::move(buf));
116*61c4878aSAndroid Build Coastguard Worker   }
117*61c4878aSAndroid Build Coastguard Worker   data_available_.release();
118*61c4878aSAndroid Build Coastguard Worker   return OkStatus();
119*61c4878aSAndroid Build Coastguard Worker }
120*61c4878aSAndroid Build Coastguard Worker 
WriteLoop(pw::stream::Writer & writer)121*61c4878aSAndroid Build Coastguard Worker void StreamChannelWriteState::WriteLoop(pw::stream::Writer& writer) {
122*61c4878aSAndroid Build Coastguard Worker   while (true) {
123*61c4878aSAndroid Build Coastguard Worker     data_available_.acquire();
124*61c4878aSAndroid Build Coastguard Worker     MultiBuf buffer;
125*61c4878aSAndroid Build Coastguard Worker     {
126*61c4878aSAndroid Build Coastguard Worker       std::lock_guard lock(buffer_lock_);
127*61c4878aSAndroid Build Coastguard Worker       if (buffer_to_write_.empty()) {
128*61c4878aSAndroid Build Coastguard Worker         continue;
129*61c4878aSAndroid Build Coastguard Worker       }
130*61c4878aSAndroid Build Coastguard Worker       buffer = std::move(buffer_to_write_);
131*61c4878aSAndroid Build Coastguard Worker     }
132*61c4878aSAndroid Build Coastguard Worker     for (const auto& chunk : buffer.Chunks()) {
133*61c4878aSAndroid Build Coastguard Worker       if (Status status = writer.Write(chunk); !status.ok()) {
134*61c4878aSAndroid Build Coastguard Worker         PW_LOG_ERROR("Failed to write to stream in StreamChannel: %s",
135*61c4878aSAndroid Build Coastguard Worker                      status.str());
136*61c4878aSAndroid Build Coastguard Worker         std::lock_guard lock(buffer_lock_);
137*61c4878aSAndroid Build Coastguard Worker         status_ = status;
138*61c4878aSAndroid Build Coastguard Worker         return;
139*61c4878aSAndroid Build Coastguard Worker       }
140*61c4878aSAndroid Build Coastguard Worker     }
141*61c4878aSAndroid Build Coastguard Worker   }
142*61c4878aSAndroid Build Coastguard Worker }
143*61c4878aSAndroid Build Coastguard Worker 
144*61c4878aSAndroid Build Coastguard Worker }  // namespace internal
145*61c4878aSAndroid Build Coastguard Worker 
146*61c4878aSAndroid Build Coastguard Worker static constexpr size_t kMinimumReadSize = 64;
147*61c4878aSAndroid Build Coastguard Worker static constexpr size_t kDesiredReadSize = 1024;
148*61c4878aSAndroid Build Coastguard Worker 
StreamChannel(stream::Reader & reader,const thread::Options & read_thread_options,MultiBufAllocator & read_allocator,stream::Writer & writer,const thread::Options & write_thread_options,MultiBufAllocator & write_allocator)149*61c4878aSAndroid Build Coastguard Worker StreamChannel::StreamChannel(stream::Reader& reader,
150*61c4878aSAndroid Build Coastguard Worker                              const thread::Options& read_thread_options,
151*61c4878aSAndroid Build Coastguard Worker                              MultiBufAllocator& read_allocator,
152*61c4878aSAndroid Build Coastguard Worker                              stream::Writer& writer,
153*61c4878aSAndroid Build Coastguard Worker                              const thread::Options& write_thread_options,
154*61c4878aSAndroid Build Coastguard Worker                              MultiBufAllocator& write_allocator)
155*61c4878aSAndroid Build Coastguard Worker     : reader_(reader),
156*61c4878aSAndroid Build Coastguard Worker       writer_(writer),
157*61c4878aSAndroid Build Coastguard Worker       read_state_(),
158*61c4878aSAndroid Build Coastguard Worker       write_state_(),
159*61c4878aSAndroid Build Coastguard Worker       read_allocation_future_(read_allocator),
160*61c4878aSAndroid Build Coastguard Worker       write_allocation_future_(write_allocator) {
161*61c4878aSAndroid Build Coastguard Worker   pw::thread::DetachedThread(read_thread_options,
162*61c4878aSAndroid Build Coastguard Worker                              [this]() { read_state_.ReadLoop(reader_); });
163*61c4878aSAndroid Build Coastguard Worker   pw::thread::DetachedThread(write_thread_options,
164*61c4878aSAndroid Build Coastguard Worker                              [this]() { write_state_.WriteLoop(writer_); });
165*61c4878aSAndroid Build Coastguard Worker }
166*61c4878aSAndroid Build Coastguard Worker 
ProvideBufferIfAvailable(Context & cx)167*61c4878aSAndroid Build Coastguard Worker Status StreamChannel::ProvideBufferIfAvailable(Context& cx) {
168*61c4878aSAndroid Build Coastguard Worker   if (read_state_.HasBufferToFill()) {
169*61c4878aSAndroid Build Coastguard Worker     return OkStatus();
170*61c4878aSAndroid Build Coastguard Worker   }
171*61c4878aSAndroid Build Coastguard Worker 
172*61c4878aSAndroid Build Coastguard Worker   read_allocation_future_.SetDesiredSizes(
173*61c4878aSAndroid Build Coastguard Worker       kMinimumReadSize, kDesiredReadSize, pw::multibuf::kNeedsContiguous);
174*61c4878aSAndroid Build Coastguard Worker   Poll<std::optional<MultiBuf>> maybe_multibuf =
175*61c4878aSAndroid Build Coastguard Worker       read_allocation_future_.Pend(cx);
176*61c4878aSAndroid Build Coastguard Worker 
177*61c4878aSAndroid Build Coastguard Worker   // If this is pending, we'll be awoken and this function will be re-run
178*61c4878aSAndroid Build Coastguard Worker   // when a buffer becomes available, allowing us to provide a buffer.
179*61c4878aSAndroid Build Coastguard Worker   if (maybe_multibuf.IsPending()) {
180*61c4878aSAndroid Build Coastguard Worker     return OkStatus();
181*61c4878aSAndroid Build Coastguard Worker   }
182*61c4878aSAndroid Build Coastguard Worker 
183*61c4878aSAndroid Build Coastguard Worker   if (!maybe_multibuf->has_value()) {
184*61c4878aSAndroid Build Coastguard Worker     PW_LOG_ERROR("Failed to allocate multibuf for reading");
185*61c4878aSAndroid Build Coastguard Worker     return Status::ResourceExhausted();
186*61c4878aSAndroid Build Coastguard Worker   }
187*61c4878aSAndroid Build Coastguard Worker 
188*61c4878aSAndroid Build Coastguard Worker   read_state_.ProvideBufferToFill(std::move(**maybe_multibuf));
189*61c4878aSAndroid Build Coastguard Worker   return OkStatus();
190*61c4878aSAndroid Build Coastguard Worker }
191*61c4878aSAndroid Build Coastguard Worker 
DoPendRead(Context & cx)192*61c4878aSAndroid Build Coastguard Worker Poll<Result<MultiBuf>> StreamChannel::DoPendRead(Context& cx) {
193*61c4878aSAndroid Build Coastguard Worker   PW_TRY(ProvideBufferIfAvailable(cx));
194*61c4878aSAndroid Build Coastguard Worker   return read_state_.PendFilledBuffer(cx);
195*61c4878aSAndroid Build Coastguard Worker }
196*61c4878aSAndroid Build Coastguard Worker 
DoPendReadyToWrite(Context &)197*61c4878aSAndroid Build Coastguard Worker Poll<Status> StreamChannel::DoPendReadyToWrite(Context&) { return OkStatus(); }
198*61c4878aSAndroid Build Coastguard Worker 
DoStageWrite(pw::multibuf::MultiBuf && data)199*61c4878aSAndroid Build Coastguard Worker pw::Status StreamChannel::DoStageWrite(pw::multibuf::MultiBuf&& data) {
200*61c4878aSAndroid Build Coastguard Worker   PW_TRY(write_state_.SendData(std::move(data)));
201*61c4878aSAndroid Build Coastguard Worker   return OkStatus();
202*61c4878aSAndroid Build Coastguard Worker }
203*61c4878aSAndroid Build Coastguard Worker 
204*61c4878aSAndroid Build Coastguard Worker }  // namespace pw::channel
205