xref: /aosp_15_r20/external/pigweed/pw_stream/mpsc_stream.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1*61c4878aSAndroid Build Coastguard Worker // Copyright 2023 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker //     https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker 
15*61c4878aSAndroid Build Coastguard Worker #include "pw_stream/mpsc_stream.h"
16*61c4878aSAndroid Build Coastguard Worker 
17*61c4878aSAndroid Build Coastguard Worker #include <cstring>
18*61c4878aSAndroid Build Coastguard Worker #include <mutex>
19*61c4878aSAndroid Build Coastguard Worker 
20*61c4878aSAndroid Build Coastguard Worker #include "pw_assert/check.h"
21*61c4878aSAndroid Build Coastguard Worker 
22*61c4878aSAndroid Build Coastguard Worker namespace pw::stream {
23*61c4878aSAndroid Build Coastguard Worker namespace {
24*61c4878aSAndroid Build Coastguard Worker 
25*61c4878aSAndroid Build Coastguard Worker // Wait to receive a thread notification with an optional timeout.
Await(sync::TimedThreadNotification & notification,const std::optional<chrono::SystemClock::duration> & timeout)26*61c4878aSAndroid Build Coastguard Worker bool Await(sync::TimedThreadNotification& notification,
27*61c4878aSAndroid Build Coastguard Worker            const std::optional<chrono::SystemClock::duration>& timeout) {
28*61c4878aSAndroid Build Coastguard Worker   if (timeout.has_value()) {
29*61c4878aSAndroid Build Coastguard Worker     return notification.try_acquire_for(*timeout);
30*61c4878aSAndroid Build Coastguard Worker   }
31*61c4878aSAndroid Build Coastguard Worker   // Block indefinitely.
32*61c4878aSAndroid Build Coastguard Worker   notification.acquire();
33*61c4878aSAndroid Build Coastguard Worker   return true;
34*61c4878aSAndroid Build Coastguard Worker }
35*61c4878aSAndroid Build Coastguard Worker 
36*61c4878aSAndroid Build Coastguard Worker }  // namespace
37*61c4878aSAndroid Build Coastguard Worker 
CreateMpscStream(MpscReader & reader,MpscWriter & writer)38*61c4878aSAndroid Build Coastguard Worker void CreateMpscStream(MpscReader& reader, MpscWriter& writer) {
39*61c4878aSAndroid Build Coastguard Worker   reader.Close();
40*61c4878aSAndroid Build Coastguard Worker   std::lock_guard rlock(reader.mutex_);
41*61c4878aSAndroid Build Coastguard Worker   PW_CHECK(reader.writers_.empty());
42*61c4878aSAndroid Build Coastguard Worker   std::lock_guard wlock(writer.mutex_);
43*61c4878aSAndroid Build Coastguard Worker   writer.CloseLocked();
44*61c4878aSAndroid Build Coastguard Worker   reader.writers_.push_front(writer);
45*61c4878aSAndroid Build Coastguard Worker   reader.IncreaseLimitLocked(Stream::kUnlimited);
46*61c4878aSAndroid Build Coastguard Worker   writer.reader_ = &reader;
47*61c4878aSAndroid Build Coastguard Worker }
48*61c4878aSAndroid Build Coastguard Worker 
49*61c4878aSAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
50*61c4878aSAndroid Build Coastguard Worker // MpscWriter methods.
51*61c4878aSAndroid Build Coastguard Worker 
// Copy constructor: begins as a default-constructed writer and then takes on
// `other`'s state through the copy-assignment operator.
MpscWriter::MpscWriter(const MpscWriter& other) : MpscWriter() {
  operator=(other);
}
55*61c4878aSAndroid Build Coastguard Worker 
// Copy assignment: disconnects this writer from its current reader, then
// copies `other`'s reader, timeout, limit and last-write count, and registers
// this writer with that reader.
//
// Self-assignment is a no-op. Without the guard, Close() would run first and
// the subsequent copy would read this object's own, already-cleared state,
// leaving the writer disconnected.
MpscWriter& MpscWriter::operator=(const MpscWriter& other) {
  if (this == &other) {
    return *this;
  }
  Close();

  // Snapshot the other object's state. Each lock is held only for its own
  // scope so that the two writer mutexes are never held at the same time.
  MpscReader* reader = nullptr;
  duration timeout;
  size_t limit = 0;
  size_t last_write = 0;
  {
    std::lock_guard other_lock(other.mutex_);
    reader = other.reader_;
    timeout = other.timeout_;
    limit = other.limit_;
    last_write = other.last_write_;
  }

  // Apply the snapshot to this object.
  {
    std::lock_guard self_lock(mutex_);
    reader_ = reader;
    timeout_ = timeout;
    limit_ = limit;
    last_write_ = last_write;
  }

  // Register with the reader after this writer's own lock has been dropped.
  // NOTE(review): nothing here re-checks that the reader is still open or
  // alive once `other`'s lock is released; confirm the expected behavior when
  // the reader closes concurrently.
  if (reader != nullptr) {
    std::lock_guard reader_lock(reader->mutex_);
    reader->writers_.push_front(*this);
    reader->IncreaseLimitLocked(limit);
  }
  return *this;
}
84*61c4878aSAndroid Build Coastguard Worker 
MpscWriter(MpscWriter && other)85*61c4878aSAndroid Build Coastguard Worker MpscWriter::MpscWriter(MpscWriter&& other) : MpscWriter() {
86*61c4878aSAndroid Build Coastguard Worker   *this = std::move(other);
87*61c4878aSAndroid Build Coastguard Worker }
88*61c4878aSAndroid Build Coastguard Worker 
operator =(MpscWriter && other)89*61c4878aSAndroid Build Coastguard Worker MpscWriter& MpscWriter::operator=(MpscWriter&& other) {
90*61c4878aSAndroid Build Coastguard Worker   *this = other;
91*61c4878aSAndroid Build Coastguard Worker   other.Close();
92*61c4878aSAndroid Build Coastguard Worker   return *this;
93*61c4878aSAndroid Build Coastguard Worker }
94*61c4878aSAndroid Build Coastguard Worker 
~MpscWriter()95*61c4878aSAndroid Build Coastguard Worker MpscWriter::~MpscWriter() { Close(); }
96*61c4878aSAndroid Build Coastguard Worker 
connected() const97*61c4878aSAndroid Build Coastguard Worker bool MpscWriter::connected() const {
98*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
99*61c4878aSAndroid Build Coastguard Worker   return reader_ != nullptr;
100*61c4878aSAndroid Build Coastguard Worker }
101*61c4878aSAndroid Build Coastguard Worker 
last_write() const102*61c4878aSAndroid Build Coastguard Worker size_t MpscWriter::last_write() const {
103*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
104*61c4878aSAndroid Build Coastguard Worker   return last_write_;
105*61c4878aSAndroid Build Coastguard Worker }
106*61c4878aSAndroid Build Coastguard Worker 
SetTimeout(const duration & timeout)107*61c4878aSAndroid Build Coastguard Worker void MpscWriter::SetTimeout(const duration& timeout) {
108*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
109*61c4878aSAndroid Build Coastguard Worker   timeout_ = timeout;
110*61c4878aSAndroid Build Coastguard Worker }
111*61c4878aSAndroid Build Coastguard Worker 
SetLimit(size_t limit)112*61c4878aSAndroid Build Coastguard Worker void MpscWriter::SetLimit(size_t limit) {
113*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
114*61c4878aSAndroid Build Coastguard Worker   if (reader_) {
115*61c4878aSAndroid Build Coastguard Worker     reader_->DecreaseLimit(limit_);
116*61c4878aSAndroid Build Coastguard Worker     reader_->IncreaseLimit(limit);
117*61c4878aSAndroid Build Coastguard Worker   }
118*61c4878aSAndroid Build Coastguard Worker   limit_ = limit;
119*61c4878aSAndroid Build Coastguard Worker   if (limit_ == 0) {
120*61c4878aSAndroid Build Coastguard Worker     CloseLocked();
121*61c4878aSAndroid Build Coastguard Worker   }
122*61c4878aSAndroid Build Coastguard Worker }
123*61c4878aSAndroid Build Coastguard Worker 
ConservativeLimit(LimitType type) const124*61c4878aSAndroid Build Coastguard Worker size_t MpscWriter::ConservativeLimit(LimitType type) const {
125*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
126*61c4878aSAndroid Build Coastguard Worker   return reader_ != nullptr && type == LimitType::kWrite ? limit_ : 0;
127*61c4878aSAndroid Build Coastguard Worker }
128*61c4878aSAndroid Build Coastguard Worker 
DoWrite(ConstByteSpan data)129*61c4878aSAndroid Build Coastguard Worker Status MpscWriter::DoWrite(ConstByteSpan data) {
130*61c4878aSAndroid Build Coastguard Worker   // Check some conditions to see if an early exit is possible.
131*61c4878aSAndroid Build Coastguard Worker   if (data.empty()) {
132*61c4878aSAndroid Build Coastguard Worker     return OkStatus();
133*61c4878aSAndroid Build Coastguard Worker   }
134*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
135*61c4878aSAndroid Build Coastguard Worker   if (reader_ == nullptr) {
136*61c4878aSAndroid Build Coastguard Worker     return Status::OutOfRange();
137*61c4878aSAndroid Build Coastguard Worker   }
138*61c4878aSAndroid Build Coastguard Worker   if (limit_ < data.size()) {
139*61c4878aSAndroid Build Coastguard Worker     return Status::ResourceExhausted();
140*61c4878aSAndroid Build Coastguard Worker   }
141*61c4878aSAndroid Build Coastguard Worker   if (!write_request_.unlisted()) {
142*61c4878aSAndroid Build Coastguard Worker     return Status::FailedPrecondition();
143*61c4878aSAndroid Build Coastguard Worker   }
144*61c4878aSAndroid Build Coastguard Worker   // Subscribe to the reader. This will enqueue this object's write request,
145*61c4878aSAndroid Build Coastguard Worker   // which will be used to notify the writer when the reader has space available
146*61c4878aSAndroid Build Coastguard Worker   // or has closed.
147*61c4878aSAndroid Build Coastguard Worker   reader_->RequestWrite(write_request_);
148*61c4878aSAndroid Build Coastguard Worker   last_write_ = 0;
149*61c4878aSAndroid Build Coastguard Worker 
150*61c4878aSAndroid Build Coastguard Worker   Status status;
151*61c4878aSAndroid Build Coastguard Worker   while (!data.empty()) {
152*61c4878aSAndroid Build Coastguard Worker     // Wait to be notified by the reader.
153*61c4878aSAndroid Build Coastguard Worker     // Note: This manually unlocks and relocks the mutex currently held by the
154*61c4878aSAndroid Build Coastguard Worker     // lock guard. It must not return while the mutex is not locked.
155*61c4878aSAndroid Build Coastguard Worker     duration timeout = timeout_;
156*61c4878aSAndroid Build Coastguard Worker     mutex_.unlock();
157*61c4878aSAndroid Build Coastguard Worker     bool writeable = Await(write_request_.notification, timeout);
158*61c4878aSAndroid Build Coastguard Worker     mutex_.lock();
159*61c4878aSAndroid Build Coastguard Worker 
160*61c4878aSAndroid Build Coastguard Worker     // Conditions may have changed while waiting; check again.
161*61c4878aSAndroid Build Coastguard Worker     if (reader_ == nullptr) {
162*61c4878aSAndroid Build Coastguard Worker       return Status::OutOfRange();
163*61c4878aSAndroid Build Coastguard Worker     }
164*61c4878aSAndroid Build Coastguard Worker     if (!writeable || limit_ < data.size()) {
165*61c4878aSAndroid Build Coastguard Worker       status = Status::ResourceExhausted();
166*61c4878aSAndroid Build Coastguard Worker       break;
167*61c4878aSAndroid Build Coastguard Worker     }
168*61c4878aSAndroid Build Coastguard Worker 
169*61c4878aSAndroid Build Coastguard Worker     // Attempt to write data.
170*61c4878aSAndroid Build Coastguard Worker     StatusWithSize result = reader_->WriteData(data, limit_);
171*61c4878aSAndroid Build Coastguard Worker     last_write_ += result.size();
172*61c4878aSAndroid Build Coastguard Worker     if (limit_ != kUnlimited) {
173*61c4878aSAndroid Build Coastguard Worker       limit_ -= result.size();
174*61c4878aSAndroid Build Coastguard Worker     }
175*61c4878aSAndroid Build Coastguard Worker 
176*61c4878aSAndroid Build Coastguard Worker     // WriteData() only returns an error if the reader is closed. In that case,
177*61c4878aSAndroid Build Coastguard Worker     // or if the writer has written all of its data, the writer should close.
178*61c4878aSAndroid Build Coastguard Worker     if (!result.ok() || limit_ == 0) {
179*61c4878aSAndroid Build Coastguard Worker       CloseLocked();
180*61c4878aSAndroid Build Coastguard Worker       return result.status();
181*61c4878aSAndroid Build Coastguard Worker     }
182*61c4878aSAndroid Build Coastguard Worker     data = data.subspan(result.size());
183*61c4878aSAndroid Build Coastguard Worker   }
184*61c4878aSAndroid Build Coastguard Worker 
185*61c4878aSAndroid Build Coastguard Worker   // Unsubscribe from the reader.
186*61c4878aSAndroid Build Coastguard Worker   reader_->CompleteWrite(write_request_);
187*61c4878aSAndroid Build Coastguard Worker   return status;
188*61c4878aSAndroid Build Coastguard Worker }
189*61c4878aSAndroid Build Coastguard Worker 
Close()190*61c4878aSAndroid Build Coastguard Worker void MpscWriter::Close() {
191*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
192*61c4878aSAndroid Build Coastguard Worker   CloseLocked();
193*61c4878aSAndroid Build Coastguard Worker }
194*61c4878aSAndroid Build Coastguard Worker 
CloseLocked()195*61c4878aSAndroid Build Coastguard Worker void MpscWriter::CloseLocked() {
196*61c4878aSAndroid Build Coastguard Worker   if (reader_ != nullptr) {
197*61c4878aSAndroid Build Coastguard Worker     std::lock_guard lock(reader_->mutex_);
198*61c4878aSAndroid Build Coastguard Worker     reader_->CompleteWriteLocked(write_request_);
199*61c4878aSAndroid Build Coastguard Worker     write_request_.notification.release();
200*61c4878aSAndroid Build Coastguard Worker     if (reader_->writers_.remove(*this)) {
201*61c4878aSAndroid Build Coastguard Worker       reader_->DecreaseLimitLocked(limit_);
202*61c4878aSAndroid Build Coastguard Worker     }
203*61c4878aSAndroid Build Coastguard Worker     if (reader_->writers_.empty()) {
204*61c4878aSAndroid Build Coastguard Worker       reader_->readable_.release();
205*61c4878aSAndroid Build Coastguard Worker     }
206*61c4878aSAndroid Build Coastguard Worker     reader_ = nullptr;
207*61c4878aSAndroid Build Coastguard Worker   }
208*61c4878aSAndroid Build Coastguard Worker   limit_ = kUnlimited;
209*61c4878aSAndroid Build Coastguard Worker }
210*61c4878aSAndroid Build Coastguard Worker 
211*61c4878aSAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
212*61c4878aSAndroid Build Coastguard Worker // MpscReader methods.
213*61c4878aSAndroid Build Coastguard Worker 
MpscReader()214*61c4878aSAndroid Build Coastguard Worker MpscReader::MpscReader() { last_request_ = write_requests_.begin(); }
215*61c4878aSAndroid Build Coastguard Worker 
~MpscReader()216*61c4878aSAndroid Build Coastguard Worker MpscReader::~MpscReader() { Close(); }
217*61c4878aSAndroid Build Coastguard Worker 
connected() const218*61c4878aSAndroid Build Coastguard Worker bool MpscReader::connected() const {
219*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
220*61c4878aSAndroid Build Coastguard Worker   return !writers_.empty();
221*61c4878aSAndroid Build Coastguard Worker }
222*61c4878aSAndroid Build Coastguard Worker 
SetBuffer(ByteSpan buffer)223*61c4878aSAndroid Build Coastguard Worker void MpscReader::SetBuffer(ByteSpan buffer) {
224*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
225*61c4878aSAndroid Build Coastguard Worker   PW_CHECK(length_ == 0);
226*61c4878aSAndroid Build Coastguard Worker   buffer_ = buffer;
227*61c4878aSAndroid Build Coastguard Worker   offset_ = 0;
228*61c4878aSAndroid Build Coastguard Worker }
229*61c4878aSAndroid Build Coastguard Worker 
SetTimeout(const duration & timeout)230*61c4878aSAndroid Build Coastguard Worker void MpscReader::SetTimeout(const duration& timeout) {
231*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
232*61c4878aSAndroid Build Coastguard Worker   timeout_ = timeout;
233*61c4878aSAndroid Build Coastguard Worker }
234*61c4878aSAndroid Build Coastguard Worker 
IncreaseLimit(size_t delta)235*61c4878aSAndroid Build Coastguard Worker void MpscReader::IncreaseLimit(size_t delta) {
236*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
237*61c4878aSAndroid Build Coastguard Worker   IncreaseLimitLocked(delta);
238*61c4878aSAndroid Build Coastguard Worker }
239*61c4878aSAndroid Build Coastguard Worker 
IncreaseLimitLocked(size_t delta)240*61c4878aSAndroid Build Coastguard Worker void MpscReader::IncreaseLimitLocked(size_t delta) {
241*61c4878aSAndroid Build Coastguard Worker   if (delta == kUnlimited) {
242*61c4878aSAndroid Build Coastguard Worker     ++num_unlimited_;
243*61c4878aSAndroid Build Coastguard Worker     PW_CHECK_UINT_NE(num_unlimited_, 0);
244*61c4878aSAndroid Build Coastguard Worker   } else if (limit_ != kUnlimited) {
245*61c4878aSAndroid Build Coastguard Worker     PW_CHECK_UINT_LT(limit_, kUnlimited - delta);
246*61c4878aSAndroid Build Coastguard Worker     limit_ += delta;
247*61c4878aSAndroid Build Coastguard Worker   }
248*61c4878aSAndroid Build Coastguard Worker }
249*61c4878aSAndroid Build Coastguard Worker 
DecreaseLimit(size_t delta)250*61c4878aSAndroid Build Coastguard Worker void MpscReader::DecreaseLimit(size_t delta) {
251*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
252*61c4878aSAndroid Build Coastguard Worker   DecreaseLimitLocked(delta);
253*61c4878aSAndroid Build Coastguard Worker }
254*61c4878aSAndroid Build Coastguard Worker 
DecreaseLimitLocked(size_t delta)255*61c4878aSAndroid Build Coastguard Worker void MpscReader::DecreaseLimitLocked(size_t delta) {
256*61c4878aSAndroid Build Coastguard Worker   if (delta == kUnlimited) {
257*61c4878aSAndroid Build Coastguard Worker     PW_CHECK_UINT_NE(num_unlimited_, 0);
258*61c4878aSAndroid Build Coastguard Worker     --num_unlimited_;
259*61c4878aSAndroid Build Coastguard Worker   } else if (limit_ != kUnlimited) {
260*61c4878aSAndroid Build Coastguard Worker     PW_CHECK_UINT_LE(delta, limit_);
261*61c4878aSAndroid Build Coastguard Worker     limit_ -= delta;
262*61c4878aSAndroid Build Coastguard Worker   }
263*61c4878aSAndroid Build Coastguard Worker }
264*61c4878aSAndroid Build Coastguard Worker 
ConservativeLimit(LimitType type) const265*61c4878aSAndroid Build Coastguard Worker size_t MpscReader::ConservativeLimit(LimitType type) const {
266*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
267*61c4878aSAndroid Build Coastguard Worker   if (type != LimitType::kRead) {
268*61c4878aSAndroid Build Coastguard Worker     return 0;
269*61c4878aSAndroid Build Coastguard Worker   }
270*61c4878aSAndroid Build Coastguard Worker   if (writers_.empty()) {
271*61c4878aSAndroid Build Coastguard Worker     return length_;
272*61c4878aSAndroid Build Coastguard Worker   }
273*61c4878aSAndroid Build Coastguard Worker   if (num_unlimited_ != 0) {
274*61c4878aSAndroid Build Coastguard Worker     return kUnlimited;
275*61c4878aSAndroid Build Coastguard Worker   }
276*61c4878aSAndroid Build Coastguard Worker   return limit_;
277*61c4878aSAndroid Build Coastguard Worker }
278*61c4878aSAndroid Build Coastguard Worker 
RequestWrite(MpscWriter::Request & write_request)279*61c4878aSAndroid Build Coastguard Worker void MpscReader::RequestWrite(MpscWriter::Request& write_request) {
280*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
281*61c4878aSAndroid Build Coastguard Worker   last_request_ = write_requests_.insert_after(last_request_, write_request);
282*61c4878aSAndroid Build Coastguard Worker   CheckWriteableLocked();
283*61c4878aSAndroid Build Coastguard Worker }
284*61c4878aSAndroid Build Coastguard Worker 
CheckWriteableLocked()285*61c4878aSAndroid Build Coastguard Worker void MpscReader::CheckWriteableLocked() {
286*61c4878aSAndroid Build Coastguard Worker   if (write_requests_.empty()) {
287*61c4878aSAndroid Build Coastguard Worker     return;
288*61c4878aSAndroid Build Coastguard Worker   }
289*61c4878aSAndroid Build Coastguard Worker   if (writers_.empty() || written_ < destination_.size() ||
290*61c4878aSAndroid Build Coastguard Worker       length_ < buffer_.size()) {
291*61c4878aSAndroid Build Coastguard Worker     MpscWriter::Request& write_request = write_requests_.front();
292*61c4878aSAndroid Build Coastguard Worker     write_request.notification.release();
293*61c4878aSAndroid Build Coastguard Worker   }
294*61c4878aSAndroid Build Coastguard Worker }
295*61c4878aSAndroid Build Coastguard Worker 
WriteData(ConstByteSpan data,size_t limit)296*61c4878aSAndroid Build Coastguard Worker StatusWithSize MpscReader::WriteData(ConstByteSpan data, size_t limit) {
297*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
298*61c4878aSAndroid Build Coastguard Worker   if (writers_.empty()) {
299*61c4878aSAndroid Build Coastguard Worker     return StatusWithSize::OutOfRange(0);
300*61c4878aSAndroid Build Coastguard Worker   }
301*61c4878aSAndroid Build Coastguard Worker   size_t length = 0;
302*61c4878aSAndroid Build Coastguard Worker   size_t available = buffer_.size() - length_;
303*61c4878aSAndroid Build Coastguard Worker   if (written_ < destination_.size()) {
304*61c4878aSAndroid Build Coastguard Worker     // A read is pending; copy directly into its buffer.
305*61c4878aSAndroid Build Coastguard Worker     // Note: this condition is only true when the buffer is empty, so data
306*61c4878aSAndroid Build Coastguard Worker     // order is preserved.
307*61c4878aSAndroid Build Coastguard Worker     length = std::min(destination_.size() - written_, data.size());
308*61c4878aSAndroid Build Coastguard Worker     memcpy(&destination_[written_], &data[0], length);
309*61c4878aSAndroid Build Coastguard Worker     written_ += length;
310*61c4878aSAndroid Build Coastguard Worker   } else if (available > 0) {
311*61c4878aSAndroid Build Coastguard Worker     // The buffer has space for more data.
312*61c4878aSAndroid Build Coastguard Worker     length = std::min(available, data.size());
313*61c4878aSAndroid Build Coastguard Worker     size_t offset = (offset_ + length_) % buffer_.size();
314*61c4878aSAndroid Build Coastguard Worker     size_t contiguous = buffer_.size() - offset;
315*61c4878aSAndroid Build Coastguard Worker     if (length <= contiguous) {
316*61c4878aSAndroid Build Coastguard Worker       memcpy(&buffer_[offset], &data[0], length);
317*61c4878aSAndroid Build Coastguard Worker     } else {
318*61c4878aSAndroid Build Coastguard Worker       memcpy(&buffer_[offset], &data[0], contiguous);
319*61c4878aSAndroid Build Coastguard Worker       memcpy(&buffer_[0], &data[contiguous], length - contiguous);
320*61c4878aSAndroid Build Coastguard Worker     }
321*61c4878aSAndroid Build Coastguard Worker     length_ += length;
322*61c4878aSAndroid Build Coastguard Worker   } else {
323*61c4878aSAndroid Build Coastguard Worker     // If there is no space available, a write request can only be notified when
324*61c4878aSAndroid Build Coastguard Worker     // its writer is closing. Do not notify the reader that data is available.
325*61c4878aSAndroid Build Coastguard Worker     return StatusWithSize(0);
326*61c4878aSAndroid Build Coastguard Worker   }
327*61c4878aSAndroid Build Coastguard Worker   data = data.subspan(length);
328*61c4878aSAndroid Build Coastguard Worker   // For unlimited writers, increase the read limit as needed.
329*61c4878aSAndroid Build Coastguard Worker   // Do this before waking the reader and releasing the lock.
330*61c4878aSAndroid Build Coastguard Worker   if (limit == kUnlimited) {
331*61c4878aSAndroid Build Coastguard Worker     IncreaseLimitLocked(length);
332*61c4878aSAndroid Build Coastguard Worker   }
333*61c4878aSAndroid Build Coastguard Worker   readable_.release();
334*61c4878aSAndroid Build Coastguard Worker   return StatusWithSize(length);
335*61c4878aSAndroid Build Coastguard Worker }
336*61c4878aSAndroid Build Coastguard Worker 
CompleteWrite(MpscWriter::Request & write_request)337*61c4878aSAndroid Build Coastguard Worker void MpscReader::CompleteWrite(MpscWriter::Request& write_request) {
338*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(mutex_);
339*61c4878aSAndroid Build Coastguard Worker   CompleteWriteLocked(write_request);
340*61c4878aSAndroid Build Coastguard Worker }
341*61c4878aSAndroid Build Coastguard Worker 
CompleteWriteLocked(MpscWriter::Request & write_request)342*61c4878aSAndroid Build Coastguard Worker void MpscReader::CompleteWriteLocked(MpscWriter::Request& write_request) {
343*61c4878aSAndroid Build Coastguard Worker   MpscWriter::Request& last_request = *last_request_;
344*61c4878aSAndroid Build Coastguard Worker   write_requests_.remove(write_request);
345*61c4878aSAndroid Build Coastguard Worker 
346*61c4878aSAndroid Build Coastguard Worker   // If the last request is removed, find the new last request. This is O(n),
347*61c4878aSAndroid Build Coastguard Worker   // but the oremoved element is first unless a request is being canceled due to
348*61c4878aSAndroid Build Coastguard Worker   // its writer closing. Thus in the typical case of a successful write, this is
349*61c4878aSAndroid Build Coastguard Worker   // O(1).
350*61c4878aSAndroid Build Coastguard Worker   if (&last_request == &write_request) {
351*61c4878aSAndroid Build Coastguard Worker     last_request_ = write_requests_.begin();
352*61c4878aSAndroid Build Coastguard Worker     for (size_t i = 1; i < write_requests_.size(); ++i) {
353*61c4878aSAndroid Build Coastguard Worker       ++last_request_;
354*61c4878aSAndroid Build Coastguard Worker     }
355*61c4878aSAndroid Build Coastguard Worker   }
356*61c4878aSAndroid Build Coastguard Worker 
357*61c4878aSAndroid Build Coastguard Worker   // The reader may have signaled this writer that it had space between the last
358*61c4878aSAndroid Build Coastguard Worker   // call to WriteData() and this call. Check if that signal should be forwarded
359*61c4878aSAndroid Build Coastguard Worker   // to the next write request.
360*61c4878aSAndroid Build Coastguard Worker   CheckWriteableLocked();
361*61c4878aSAndroid Build Coastguard Worker }
362*61c4878aSAndroid Build Coastguard Worker 
DoRead(ByteSpan destination)363*61c4878aSAndroid Build Coastguard Worker StatusWithSize MpscReader::DoRead(ByteSpan destination) {
364*61c4878aSAndroid Build Coastguard Worker   if (destination.empty()) {
365*61c4878aSAndroid Build Coastguard Worker     return StatusWithSize(0);
366*61c4878aSAndroid Build Coastguard Worker   }
367*61c4878aSAndroid Build Coastguard Worker   mutex_.lock();
368*61c4878aSAndroid Build Coastguard Worker   PW_CHECK(!reading_, "All reads must happen from the same thread.");
369*61c4878aSAndroid Build Coastguard Worker   reading_ = true;
370*61c4878aSAndroid Build Coastguard Worker   Status status = OkStatus();
371*61c4878aSAndroid Build Coastguard Worker   size_t length = 0;
372*61c4878aSAndroid Build Coastguard Worker 
373*61c4878aSAndroid Build Coastguard Worker   // Check for buffered data. Do this before checking if the reader is still
374*61c4878aSAndroid Build Coastguard Worker   // connected in order to deliver data sent from a now-closed writer.
375*61c4878aSAndroid Build Coastguard Worker   if (length_ != 0) {
376*61c4878aSAndroid Build Coastguard Worker     length = std::min(length_, destination.size());
377*61c4878aSAndroid Build Coastguard Worker     size_t contiguous = buffer_.size() - offset_;
378*61c4878aSAndroid Build Coastguard Worker     if (length < contiguous) {
379*61c4878aSAndroid Build Coastguard Worker       memcpy(&destination[0], &buffer_[offset_], length);
380*61c4878aSAndroid Build Coastguard Worker       offset_ += length;
381*61c4878aSAndroid Build Coastguard Worker     } else if (length == contiguous) {
382*61c4878aSAndroid Build Coastguard Worker       memcpy(&destination[0], &buffer_[offset_], length);
383*61c4878aSAndroid Build Coastguard Worker       offset_ = 0;
384*61c4878aSAndroid Build Coastguard Worker     } else {
385*61c4878aSAndroid Build Coastguard Worker       memcpy(&destination[0], &buffer_[offset_], contiguous);
386*61c4878aSAndroid Build Coastguard Worker       offset_ = length - contiguous;
387*61c4878aSAndroid Build Coastguard Worker       memcpy(&destination[contiguous], &buffer_[0], offset_);
388*61c4878aSAndroid Build Coastguard Worker     }
389*61c4878aSAndroid Build Coastguard Worker     length_ -= length;
390*61c4878aSAndroid Build Coastguard Worker     DecreaseLimitLocked(length);
391*61c4878aSAndroid Build Coastguard Worker     CheckWriteableLocked();
392*61c4878aSAndroid Build Coastguard Worker 
393*61c4878aSAndroid Build Coastguard Worker   } else {
394*61c4878aSAndroid Build Coastguard Worker     // Register the output buffer and wait for Write() to bypass the buffer
395*61c4878aSAndroid Build Coastguard Worker     // and write directly into it. Note that the buffer is only bypassed when
396*61c4878aSAndroid Build Coastguard Worker     // empty, so data order is preserved.
397*61c4878aSAndroid Build Coastguard Worker     PW_CHECK(written_ == 0);
398*61c4878aSAndroid Build Coastguard Worker     destination_ = destination;
399*61c4878aSAndroid Build Coastguard Worker     CheckWriteableLocked();
400*61c4878aSAndroid Build Coastguard Worker 
401*61c4878aSAndroid Build Coastguard Worker     // The reader state may change while waiting, or even between acquiring the
402*61c4878aSAndroid Build Coastguard Worker     // notification and acquiring the lock. As an example, the following
403*61c4878aSAndroid Build Coastguard Worker     // sequence of events is possible:
404*61c4878aSAndroid Build Coastguard Worker     //
405*61c4878aSAndroid Build Coastguard Worker     //   1. A writer partially fills the output buffer and releases the
406*61c4878aSAndroid Build Coastguard Worker     //      notification.
407*61c4878aSAndroid Build Coastguard Worker     //   2. The reader acquires the notification.
408*61c4878aSAndroid Build Coastguard Worker     //   3. Another writer fills the remainder of the buffer and releases the
409*61c4878aSAndroid Build Coastguard Worker     //      notification *again*.
410*61c4878aSAndroid Build Coastguard Worker     //   4. The reader acquires the lock.
411*61c4878aSAndroid Build Coastguard Worker     //
412*61c4878aSAndroid Build Coastguard Worker     // In this case, on the *next* read, the notification will be acquired
413*61c4878aSAndroid Build Coastguard Worker     // immediately even if no data is available. As a result, this code loops
414*61c4878aSAndroid Build Coastguard Worker     // until data is available.
415*61c4878aSAndroid Build Coastguard Worker     while (status.ok()) {
416*61c4878aSAndroid Build Coastguard Worker       bool readable = true;
417*61c4878aSAndroid Build Coastguard Worker       if (!writers_.empty()) {
418*61c4878aSAndroid Build Coastguard Worker         // Wait for a writer to provide data, or the reader to be closed.
419*61c4878aSAndroid Build Coastguard Worker         duration timeout = timeout_;
420*61c4878aSAndroid Build Coastguard Worker         mutex_.unlock();
421*61c4878aSAndroid Build Coastguard Worker         readable = Await(readable_, timeout);
422*61c4878aSAndroid Build Coastguard Worker         mutex_.lock();
423*61c4878aSAndroid Build Coastguard Worker       }
424*61c4878aSAndroid Build Coastguard Worker       if (!readable) {
425*61c4878aSAndroid Build Coastguard Worker         status = Status::ResourceExhausted();
426*61c4878aSAndroid Build Coastguard Worker       } else if (written_ != 0) {
427*61c4878aSAndroid Build Coastguard Worker         break;
428*61c4878aSAndroid Build Coastguard Worker       } else if (writers_.empty()) {
429*61c4878aSAndroid Build Coastguard Worker         status = Status::OutOfRange();
430*61c4878aSAndroid Build Coastguard Worker       }
431*61c4878aSAndroid Build Coastguard Worker     }
432*61c4878aSAndroid Build Coastguard Worker     destination_ = ByteSpan();
433*61c4878aSAndroid Build Coastguard Worker     length = written_;
434*61c4878aSAndroid Build Coastguard Worker     written_ = 0;
435*61c4878aSAndroid Build Coastguard Worker     DecreaseLimitLocked(length);
436*61c4878aSAndroid Build Coastguard Worker     CheckWriteableLocked();
437*61c4878aSAndroid Build Coastguard Worker   }
438*61c4878aSAndroid Build Coastguard Worker 
439*61c4878aSAndroid Build Coastguard Worker   reading_ = false;
440*61c4878aSAndroid Build Coastguard Worker   if (writers_.empty()) {
441*61c4878aSAndroid Build Coastguard Worker     closeable_.release();
442*61c4878aSAndroid Build Coastguard Worker   }
443*61c4878aSAndroid Build Coastguard Worker   mutex_.unlock();
444*61c4878aSAndroid Build Coastguard Worker   return StatusWithSize(status, length);
445*61c4878aSAndroid Build Coastguard Worker }
446*61c4878aSAndroid Build Coastguard Worker 
ReadAll(ReadAllCallback callback)447*61c4878aSAndroid Build Coastguard Worker Status MpscReader::ReadAll(ReadAllCallback callback) {
448*61c4878aSAndroid Build Coastguard Worker   mutex_.lock();
449*61c4878aSAndroid Build Coastguard Worker   if (buffer_.empty()) {
450*61c4878aSAndroid Build Coastguard Worker     mutex_.unlock();
451*61c4878aSAndroid Build Coastguard Worker     return Status::FailedPrecondition();
452*61c4878aSAndroid Build Coastguard Worker   }
453*61c4878aSAndroid Build Coastguard Worker   PW_CHECK(!reading_, "All reads must happen from the same thread.");
454*61c4878aSAndroid Build Coastguard Worker   reading_ = true;
455*61c4878aSAndroid Build Coastguard Worker 
456*61c4878aSAndroid Build Coastguard Worker   Status status = Status::OutOfRange();
457*61c4878aSAndroid Build Coastguard Worker   while (true) {
458*61c4878aSAndroid Build Coastguard Worker     // Check for buffered data. Do this before checking if the reader still has
459*61c4878aSAndroid Build Coastguard Worker     // writers in order to deliver data sent from a now-closed writer.
460*61c4878aSAndroid Build Coastguard Worker     if (length_ != 0) {
461*61c4878aSAndroid Build Coastguard Worker       size_t length = std::min(buffer_.size() - offset_, length_);
462*61c4878aSAndroid Build Coastguard Worker       ConstByteSpan data(&buffer_[offset_], length);
463*61c4878aSAndroid Build Coastguard Worker       offset_ = (offset_ + length_) % buffer_.size();
464*61c4878aSAndroid Build Coastguard Worker       length_ -= length;
465*61c4878aSAndroid Build Coastguard Worker       DecreaseLimitLocked(data.size());
466*61c4878aSAndroid Build Coastguard Worker       CheckWriteableLocked();
467*61c4878aSAndroid Build Coastguard Worker       status = callback(data);
468*61c4878aSAndroid Build Coastguard Worker       if (!status.ok()) {
469*61c4878aSAndroid Build Coastguard Worker         break;
470*61c4878aSAndroid Build Coastguard Worker       }
471*61c4878aSAndroid Build Coastguard Worker     }
472*61c4878aSAndroid Build Coastguard Worker     if (writers_.empty()) {
473*61c4878aSAndroid Build Coastguard Worker       break;
474*61c4878aSAndroid Build Coastguard Worker     }
475*61c4878aSAndroid Build Coastguard Worker     // Wait for a writer to provide data.
476*61c4878aSAndroid Build Coastguard Worker     duration timeout = timeout_;
477*61c4878aSAndroid Build Coastguard Worker     mutex_.unlock();
478*61c4878aSAndroid Build Coastguard Worker     bool readable = Await(readable_, timeout);
479*61c4878aSAndroid Build Coastguard Worker     mutex_.lock();
480*61c4878aSAndroid Build Coastguard Worker     if (!readable) {
481*61c4878aSAndroid Build Coastguard Worker       status = Status::ResourceExhausted();
482*61c4878aSAndroid Build Coastguard Worker       break;
483*61c4878aSAndroid Build Coastguard Worker     }
484*61c4878aSAndroid Build Coastguard Worker   }
485*61c4878aSAndroid Build Coastguard Worker   reading_ = false;
486*61c4878aSAndroid Build Coastguard Worker   if (writers_.empty()) {
487*61c4878aSAndroid Build Coastguard Worker     closeable_.release();
488*61c4878aSAndroid Build Coastguard Worker   }
489*61c4878aSAndroid Build Coastguard Worker   mutex_.unlock();
490*61c4878aSAndroid Build Coastguard Worker   return status;
491*61c4878aSAndroid Build Coastguard Worker }
492*61c4878aSAndroid Build Coastguard Worker 
Close()493*61c4878aSAndroid Build Coastguard Worker void MpscReader::Close() {
494*61c4878aSAndroid Build Coastguard Worker   mutex_.lock();
495*61c4878aSAndroid Build Coastguard Worker   if (writers_.empty()) {
496*61c4878aSAndroid Build Coastguard Worker     mutex_.unlock();
497*61c4878aSAndroid Build Coastguard Worker     return;
498*61c4878aSAndroid Build Coastguard Worker   }
499*61c4878aSAndroid Build Coastguard Worker   IntrusiveList<MpscWriter> writers;
500*61c4878aSAndroid Build Coastguard Worker   while (!writers_.empty()) {
501*61c4878aSAndroid Build Coastguard Worker     MpscWriter& writer = writers_.front();
502*61c4878aSAndroid Build Coastguard Worker     writers_.pop_front();
503*61c4878aSAndroid Build Coastguard Worker     writers.push_front(writer);
504*61c4878aSAndroid Build Coastguard Worker   }
505*61c4878aSAndroid Build Coastguard Worker 
506*61c4878aSAndroid Build Coastguard Worker   // Wait for any pending read to finish.
507*61c4878aSAndroid Build Coastguard Worker   if (reading_) {
508*61c4878aSAndroid Build Coastguard Worker     mutex_.unlock();
509*61c4878aSAndroid Build Coastguard Worker     readable_.release();
510*61c4878aSAndroid Build Coastguard Worker     closeable_.acquire();
511*61c4878aSAndroid Build Coastguard Worker     mutex_.lock();
512*61c4878aSAndroid Build Coastguard Worker   }
513*61c4878aSAndroid Build Coastguard Worker 
514*61c4878aSAndroid Build Coastguard Worker   num_unlimited_ = 0;
515*61c4878aSAndroid Build Coastguard Worker   limit_ = 0;
516*61c4878aSAndroid Build Coastguard Worker   written_ = 0;
517*61c4878aSAndroid Build Coastguard Worker   offset_ = 0;
518*61c4878aSAndroid Build Coastguard Worker   length_ = 0;
519*61c4878aSAndroid Build Coastguard Worker   mutex_.unlock();
520*61c4878aSAndroid Build Coastguard Worker 
521*61c4878aSAndroid Build Coastguard Worker   for (auto& writer : writers) {
522*61c4878aSAndroid Build Coastguard Worker     writer.Close();
523*61c4878aSAndroid Build Coastguard Worker   }
524*61c4878aSAndroid Build Coastguard Worker }
525*61c4878aSAndroid Build Coastguard Worker 
526*61c4878aSAndroid Build Coastguard Worker }  // namespace pw::stream
527