xref: /aosp_15_r20/external/pigweed/pw_channel/stream_channel.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_channel/stream_channel.h"
16 
17 #include "pw_async2/dispatcher_base.h"
18 #include "pw_log/log.h"
19 #include "pw_multibuf/multibuf.h"
20 #include "pw_status/status.h"
21 #include "pw_status/try.h"
22 #include "pw_thread/detached_thread.h"
23 
24 namespace pw::channel {
25 
26 using pw::OkStatus;
27 using pw::Result;
28 using pw::Status;
29 using pw::async2::Context;
30 using pw::async2::Pending;
31 using pw::async2::Poll;
32 using pw::channel::ByteReaderWriter;
33 using pw::multibuf::MultiBuf;
34 using pw::multibuf::MultiBufAllocator;
35 using pw::multibuf::OwnedChunk;
36 
37 namespace internal {
38 
HasBufferToFill()39 bool StreamChannelReadState::HasBufferToFill() {
40   std::lock_guard lock(buffer_lock_);
41   return !buffer_to_fill_.empty();
42 }
43 
ProvideBufferToFill(MultiBuf && buf)44 void StreamChannelReadState::ProvideBufferToFill(MultiBuf&& buf) {
45   {
46     std::lock_guard lock(buffer_lock_);
47     buffer_to_fill_.PushSuffix(std::move(buf));
48   }
49   buffer_to_fill_available_.release();
50 }
51 
PendFilledBuffer(Context & cx)52 Poll<Result<MultiBuf>> StreamChannelReadState::PendFilledBuffer(Context& cx) {
53   std::lock_guard lock(buffer_lock_);
54   if (!filled_buffer_.empty()) {
55     return std::move(filled_buffer_);
56   }
57   // Return an error status only after pulling all the data.
58   if (!status_.ok()) {
59     return status_;
60   }
61   PW_ASYNC_STORE_WAKER(
62       cx, on_buffer_filled_, "StreamChannel is waiting on a `Stream::Read`");
63   return Pending();
64 }
65 
ReadLoop(pw::stream::Reader & reader)66 void StreamChannelReadState::ReadLoop(pw::stream::Reader& reader) {
67   while (true) {
68     OwnedChunk buffer = WaitForBufferToFillAndTakeFrontChunk();
69     Result<pw::ByteSpan> read = reader.Read(buffer);
70     if (!read.ok()) {
71       SetReadError(read.status());
72 
73       if (!read.status().IsOutOfRange()) {
74         PW_LOG_ERROR("Failed to read from stream in StreamChannel: %s",
75                      read.status().str());
76       }
77       return;
78     }
79     buffer->Truncate(read->size());
80     ProvideFilledBuffer(MultiBuf::FromChunk(std::move(buffer)));
81   }
82 }
83 
WaitForBufferToFillAndTakeFrontChunk()84 OwnedChunk StreamChannelReadState::WaitForBufferToFillAndTakeFrontChunk() {
85   while (true) {
86     {
87       std::lock_guard lock(buffer_lock_);
88       if (!buffer_to_fill_.empty()) {
89         return buffer_to_fill_.TakeFrontChunk();
90       }
91     }
92     buffer_to_fill_available_.acquire();
93   }
94   PW_UNREACHABLE;
95 }
96 
ProvideFilledBuffer(MultiBuf && filled_buffer)97 void StreamChannelReadState::ProvideFilledBuffer(MultiBuf&& filled_buffer) {
98   std::lock_guard lock(buffer_lock_);
99   filled_buffer_.PushSuffix(std::move(filled_buffer));
100   std::move(on_buffer_filled_).Wake();
101 }
102 
SetReadError(Status status)103 void StreamChannelReadState::SetReadError(Status status) {
104   std::lock_guard lock(buffer_lock_);
105   status_ = status;
106   std::move(on_buffer_filled_).Wake();
107 }
108 
SendData(MultiBuf && buf)109 Status StreamChannelWriteState::SendData(MultiBuf&& buf) {
110   {
111     std::lock_guard lock(buffer_lock_);
112     if (!status_.ok()) {
113       return status_;
114     }
115     buffer_to_write_.PushSuffix(std::move(buf));
116   }
117   data_available_.release();
118   return OkStatus();
119 }
120 
WriteLoop(pw::stream::Writer & writer)121 void StreamChannelWriteState::WriteLoop(pw::stream::Writer& writer) {
122   while (true) {
123     data_available_.acquire();
124     MultiBuf buffer;
125     {
126       std::lock_guard lock(buffer_lock_);
127       if (buffer_to_write_.empty()) {
128         continue;
129       }
130       buffer = std::move(buffer_to_write_);
131     }
132     for (const auto& chunk : buffer.Chunks()) {
133       if (Status status = writer.Write(chunk); !status.ok()) {
134         PW_LOG_ERROR("Failed to write to stream in StreamChannel: %s",
135                      status.str());
136         std::lock_guard lock(buffer_lock_);
137         status_ = status;
138         return;
139       }
140     }
141   }
142 }
143 
144 }  // namespace internal
145 
146 static constexpr size_t kMinimumReadSize = 64;
147 static constexpr size_t kDesiredReadSize = 1024;
148 
StreamChannel(stream::Reader & reader,const thread::Options & read_thread_options,MultiBufAllocator & read_allocator,stream::Writer & writer,const thread::Options & write_thread_options,MultiBufAllocator & write_allocator)149 StreamChannel::StreamChannel(stream::Reader& reader,
150                              const thread::Options& read_thread_options,
151                              MultiBufAllocator& read_allocator,
152                              stream::Writer& writer,
153                              const thread::Options& write_thread_options,
154                              MultiBufAllocator& write_allocator)
155     : reader_(reader),
156       writer_(writer),
157       read_state_(),
158       write_state_(),
159       read_allocation_future_(read_allocator),
160       write_allocation_future_(write_allocator) {
161   pw::thread::DetachedThread(read_thread_options,
162                              [this]() { read_state_.ReadLoop(reader_); });
163   pw::thread::DetachedThread(write_thread_options,
164                              [this]() { write_state_.WriteLoop(writer_); });
165 }
166 
ProvideBufferIfAvailable(Context & cx)167 Status StreamChannel::ProvideBufferIfAvailable(Context& cx) {
168   if (read_state_.HasBufferToFill()) {
169     return OkStatus();
170   }
171 
172   read_allocation_future_.SetDesiredSizes(
173       kMinimumReadSize, kDesiredReadSize, pw::multibuf::kNeedsContiguous);
174   Poll<std::optional<MultiBuf>> maybe_multibuf =
175       read_allocation_future_.Pend(cx);
176 
177   // If this is pending, we'll be awoken and this function will be re-run
178   // when a buffer becomes available, allowing us to provide a buffer.
179   if (maybe_multibuf.IsPending()) {
180     return OkStatus();
181   }
182 
183   if (!maybe_multibuf->has_value()) {
184     PW_LOG_ERROR("Failed to allocate multibuf for reading");
185     return Status::ResourceExhausted();
186   }
187 
188   read_state_.ProvideBufferToFill(std::move(**maybe_multibuf));
189   return OkStatus();
190 }
191 
DoPendRead(Context & cx)192 Poll<Result<MultiBuf>> StreamChannel::DoPendRead(Context& cx) {
193   PW_TRY(ProvideBufferIfAvailable(cx));
194   return read_state_.PendFilledBuffer(cx);
195 }
196 
DoPendReadyToWrite(Context &)197 Poll<Status> StreamChannel::DoPendReadyToWrite(Context&) { return OkStatus(); }
198 
DoStageWrite(pw::multibuf::MultiBuf && data)199 pw::Status StreamChannel::DoStageWrite(pw::multibuf::MultiBuf&& data) {
200   PW_TRY(write_state_.SendData(std::move(data)));
201   return OkStatus();
202 }
203 
204 }  // namespace pw::channel
205