1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_channel/stream_channel.h"
16
17 #include "pw_async2/dispatcher_base.h"
18 #include "pw_log/log.h"
19 #include "pw_multibuf/multibuf.h"
20 #include "pw_status/status.h"
21 #include "pw_status/try.h"
22 #include "pw_thread/detached_thread.h"
23
24 namespace pw::channel {
25
26 using pw::OkStatus;
27 using pw::Result;
28 using pw::Status;
29 using pw::async2::Context;
30 using pw::async2::Pending;
31 using pw::async2::Poll;
32 using pw::channel::ByteReaderWriter;
33 using pw::multibuf::MultiBuf;
34 using pw::multibuf::MultiBufAllocator;
35 using pw::multibuf::OwnedChunk;
36
37 namespace internal {
38
HasBufferToFill()39 bool StreamChannelReadState::HasBufferToFill() {
40 std::lock_guard lock(buffer_lock_);
41 return !buffer_to_fill_.empty();
42 }
43
ProvideBufferToFill(MultiBuf && buf)44 void StreamChannelReadState::ProvideBufferToFill(MultiBuf&& buf) {
45 {
46 std::lock_guard lock(buffer_lock_);
47 buffer_to_fill_.PushSuffix(std::move(buf));
48 }
49 buffer_to_fill_available_.release();
50 }
51
PendFilledBuffer(Context & cx)52 Poll<Result<MultiBuf>> StreamChannelReadState::PendFilledBuffer(Context& cx) {
53 std::lock_guard lock(buffer_lock_);
54 if (!filled_buffer_.empty()) {
55 return std::move(filled_buffer_);
56 }
57 // Return an error status only after pulling all the data.
58 if (!status_.ok()) {
59 return status_;
60 }
61 PW_ASYNC_STORE_WAKER(
62 cx, on_buffer_filled_, "StreamChannel is waiting on a `Stream::Read`");
63 return Pending();
64 }
65
ReadLoop(pw::stream::Reader & reader)66 void StreamChannelReadState::ReadLoop(pw::stream::Reader& reader) {
67 while (true) {
68 OwnedChunk buffer = WaitForBufferToFillAndTakeFrontChunk();
69 Result<pw::ByteSpan> read = reader.Read(buffer);
70 if (!read.ok()) {
71 SetReadError(read.status());
72
73 if (!read.status().IsOutOfRange()) {
74 PW_LOG_ERROR("Failed to read from stream in StreamChannel: %s",
75 read.status().str());
76 }
77 return;
78 }
79 buffer->Truncate(read->size());
80 ProvideFilledBuffer(MultiBuf::FromChunk(std::move(buffer)));
81 }
82 }
83
WaitForBufferToFillAndTakeFrontChunk()84 OwnedChunk StreamChannelReadState::WaitForBufferToFillAndTakeFrontChunk() {
85 while (true) {
86 {
87 std::lock_guard lock(buffer_lock_);
88 if (!buffer_to_fill_.empty()) {
89 return buffer_to_fill_.TakeFrontChunk();
90 }
91 }
92 buffer_to_fill_available_.acquire();
93 }
94 PW_UNREACHABLE;
95 }
96
ProvideFilledBuffer(MultiBuf && filled_buffer)97 void StreamChannelReadState::ProvideFilledBuffer(MultiBuf&& filled_buffer) {
98 std::lock_guard lock(buffer_lock_);
99 filled_buffer_.PushSuffix(std::move(filled_buffer));
100 std::move(on_buffer_filled_).Wake();
101 }
102
SetReadError(Status status)103 void StreamChannelReadState::SetReadError(Status status) {
104 std::lock_guard lock(buffer_lock_);
105 status_ = status;
106 std::move(on_buffer_filled_).Wake();
107 }
108
SendData(MultiBuf && buf)109 Status StreamChannelWriteState::SendData(MultiBuf&& buf) {
110 {
111 std::lock_guard lock(buffer_lock_);
112 if (!status_.ok()) {
113 return status_;
114 }
115 buffer_to_write_.PushSuffix(std::move(buf));
116 }
117 data_available_.release();
118 return OkStatus();
119 }
120
WriteLoop(pw::stream::Writer & writer)121 void StreamChannelWriteState::WriteLoop(pw::stream::Writer& writer) {
122 while (true) {
123 data_available_.acquire();
124 MultiBuf buffer;
125 {
126 std::lock_guard lock(buffer_lock_);
127 if (buffer_to_write_.empty()) {
128 continue;
129 }
130 buffer = std::move(buffer_to_write_);
131 }
132 for (const auto& chunk : buffer.Chunks()) {
133 if (Status status = writer.Write(chunk); !status.ok()) {
134 PW_LOG_ERROR("Failed to write to stream in StreamChannel: %s",
135 status.str());
136 std::lock_guard lock(buffer_lock_);
137 status_ = status;
138 return;
139 }
140 }
141 }
142 }
143
144 } // namespace internal
145
namespace {

// Minimum size passed to `SetDesiredSizes` when allocating a read buffer.
constexpr size_t kMinimumReadSize{64};

// Desired size passed to `SetDesiredSizes` when allocating a read buffer.
constexpr size_t kDesiredReadSize{1024};

}  // namespace
148
StreamChannel(stream::Reader & reader,const thread::Options & read_thread_options,MultiBufAllocator & read_allocator,stream::Writer & writer,const thread::Options & write_thread_options,MultiBufAllocator & write_allocator)149 StreamChannel::StreamChannel(stream::Reader& reader,
150 const thread::Options& read_thread_options,
151 MultiBufAllocator& read_allocator,
152 stream::Writer& writer,
153 const thread::Options& write_thread_options,
154 MultiBufAllocator& write_allocator)
155 : reader_(reader),
156 writer_(writer),
157 read_state_(),
158 write_state_(),
159 read_allocation_future_(read_allocator),
160 write_allocation_future_(write_allocator) {
161 pw::thread::DetachedThread(read_thread_options,
162 [this]() { read_state_.ReadLoop(reader_); });
163 pw::thread::DetachedThread(write_thread_options,
164 [this]() { write_state_.WriteLoop(writer_); });
165 }
166
ProvideBufferIfAvailable(Context & cx)167 Status StreamChannel::ProvideBufferIfAvailable(Context& cx) {
168 if (read_state_.HasBufferToFill()) {
169 return OkStatus();
170 }
171
172 read_allocation_future_.SetDesiredSizes(
173 kMinimumReadSize, kDesiredReadSize, pw::multibuf::kNeedsContiguous);
174 Poll<std::optional<MultiBuf>> maybe_multibuf =
175 read_allocation_future_.Pend(cx);
176
177 // If this is pending, we'll be awoken and this function will be re-run
178 // when a buffer becomes available, allowing us to provide a buffer.
179 if (maybe_multibuf.IsPending()) {
180 return OkStatus();
181 }
182
183 if (!maybe_multibuf->has_value()) {
184 PW_LOG_ERROR("Failed to allocate multibuf for reading");
185 return Status::ResourceExhausted();
186 }
187
188 read_state_.ProvideBufferToFill(std::move(**maybe_multibuf));
189 return OkStatus();
190 }
191
DoPendRead(Context & cx)192 Poll<Result<MultiBuf>> StreamChannel::DoPendRead(Context& cx) {
193 PW_TRY(ProvideBufferIfAvailable(cx));
194 return read_state_.PendFilledBuffer(cx);
195 }
196
DoPendReadyToWrite(Context &)197 Poll<Status> StreamChannel::DoPendReadyToWrite(Context&) { return OkStatus(); }
198
DoStageWrite(pw::multibuf::MultiBuf && data)199 pw::Status StreamChannel::DoStageWrite(pw::multibuf::MultiBuf&& data) {
200 PW_TRY(write_state_.SendData(std::move(data)));
201 return OkStatus();
202 }
203
204 } // namespace pw::channel
205