xref: /aosp_15_r20/external/pigweed/pw_stream/public/pw_stream/mpsc_stream.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 /// @file
17 /// This file defines types related to a multi-producer, single-consumer stream.
18 ///
19 /// The single reader must be constructed in place, while writers can be moved.
20 /// A reader and writer may be connected using `CreateMpscStream()`. Additional
21 /// writers may be connected by copying a previously connected writer.
22 ///
23 /// Example:
24 ///
25 /// @code{.cpp}
26 ///    ...
27 ///    MpscReader reader;
28 ///    MpscWriter writer;
29 ///    CreateMpscStream(reader, writer);
30 ///    pw::Thread t(MakeThreadOptions(), [&writer] {
31 ///        Status status = writer.Write(GenerateSomeData());
32 ///        ...
33 ///    });
34 ///    std::byte buffer[kBufSize];
35 ///    if (auto status = reader.Read(ByteSpan(buffer)); status.ok()) {
36 ///      ProcessSomeData(buffer);
37 ///    }
38 /// @endcode
39 ///
40 /// See `MpscReader::ReadAll()` for an additional example.
41 ///
42 /// The types in this file are designed to be used across different threads,
43 /// but are not completely thread-safe. Data must only be written by an
44 /// MpscWriter using a single thread, and data must only be read by an
45 /// MpscReader using a single thread. In other words, multiple calls to
46 /// `Write()` must not be made concurrently, and multiple calls to `Read()` and
47 /// `ReadAll()` must not be made concurrently. Calls to other methods, e.g.
48 /// `Close()`, are thread-safe and may be made from any thread.
49 
50 #include <cstddef>
51 
52 #include "pw_bytes/span.h"
53 #include "pw_chrono/system_clock.h"
54 #include "pw_containers/intrusive_list.h"
55 #include "pw_function/function.h"
56 #include "pw_status/status.h"
57 #include "pw_status/status_with_size.h"
58 #include "pw_stream/stream.h"
59 #include "pw_sync/lock_annotations.h"
60 #include "pw_sync/mutex.h"
61 #include "pw_sync/timed_thread_notification.h"
62 
63 namespace pw::stream {
64 
65 // Forward declarations.
66 class MpscReader;
67 class MpscWriter;
68 
69 /// Creates a multi-producer, single-consumer stream.
70 ///
71 /// This function creates a stream by associating a reader and writer. Both are
72 /// reset before being connected. This is the only way to connect a reader.
73 /// Additional writers may be connected by copying the given writer after it is
74 /// connected.
75 ///
76 /// This function is thread-safe with respect to other MpscReader and MpscWriter
77 /// methods. It is not thread-safe with respect to itself, i.e. callers must
78 /// not make concurrent calls to `CreateMpscStream()` from different threads
79 /// with the same objects.
80 ///
81 /// @param[out]   reader  The reader to connect.
82 /// @param[out]   writer  The writer to connect.
83 void CreateMpscStream(MpscReader& reader, MpscWriter& writer);
84 
85 /// Writer for a multi-producer, single-consumer stream.
86 ///
87 /// This class has a default constructor that only produces disconnected
88 /// writers. To connect writers, use `CreateMpscStream()`. Additional connected
89 /// writers can be created by copying an existing one.
90 ///
91 /// Each thread should have its own dedicated writer. This class is thread-safe
92 /// with respect to the reader, but not with respect to itself. In particular,
93 /// attempting to call `Write()` concurrently on different threads may result
94 /// in a failure.
95 class MpscWriter : public NonSeekableWriter,
96                    public IntrusiveList<MpscWriter>::Item {
97  public:
     /// Optional timeout type used by `timeout()` and `SetTimeout()`.
     ///
     /// NOTE(review): `std::optional` is used here but `<optional>` is not among
     /// the includes visible in this file; presumably it arrives transitively.
98   using duration = std::optional<chrono::SystemClock::duration>;
99 
100   /// A per-writer thread notification that can be added to a reader's list.
101   ///
102   /// The reader maintains a list of outstanding requests to write data. As
103   /// data is read, and space to write data becomes available, it uses these
104   /// requests to signal the waiting writers.
105   struct Request : public IntrusiveList<Request>::Item {
106     sync::TimedThreadNotification notification;
        // Exposes the list item's `unlisted` member through this struct.
107     using IntrusiveList<Request>::Item::unlisted;
108   };
109 
      // Default construction yields a disconnected writer; copying a connected
      // writer is how additional connected writers are created (see the class
      // description).
110   MpscWriter() = default;
111   MpscWriter(const MpscWriter& other);
112   MpscWriter& operator=(const MpscWriter& other);
113   MpscWriter(MpscWriter&& other);
114   MpscWriter& operator=(MpscWriter&& other);
115   ~MpscWriter() override;
116 
117   /// Returns whether this object is connected to a reader.
118   bool connected() const PW_LOCKS_EXCLUDED(mutex_);
119 
120   /// Indicates how much data was sent in the last call to `Write()`.
121   size_t last_write() const PW_LOCKS_EXCLUDED(mutex_);
122 
123   /// Returns the optional maximum time elapsed before a `Write()` fails.
      ///
      /// NOTE(review): this returns a reference although `timeout_` is guarded by
      /// `mutex_`; confirm callers cannot race with `SetTimeout()`.
124   const duration& timeout() const PW_LOCKS_EXCLUDED(mutex_);
125 
126   /// Set the timeout for writing to this stream.
127   ///
128   /// After setting a timeout, if the given duration elapses while making a call
129   /// to `Write()`, @pw_status{RESOURCE_EXHAUSTED} will be returned. If desired,
130   /// a timeout should be set before calling `Write()`. Setting a timeout when a
131   /// writer is awaiting notification from a reader will not affect the duration
132   /// of that wait.
133   ///
134   /// Note that setting a write timeout makes partial writes possible. For
135   /// example, a call to `Write()` of some length that corresponds to 2 calls
136   /// to `Read()` of half that length, with a sufficient delay between them,
137   /// will result in the first half being written and read, but not the second.
138   /// This differs from `Stream::Write()` which stipulates that no data is
139   /// written on failure. If this happens, the length of the data written can be
140   /// retrieved using `last_write()`.
141   ///
142   /// Generally, callers should use one of three approaches:
143   ///  1. Do not set a write timeout, and let writers block arbitrarily long
144   ///     until space is available or the reader is disconnected.
145   ///  2. Use only a single writer, and use `last_write()` to resend data.
146   ///  3. Structure the data being sent so that the reader can always read
147   ///     complete messages and avoid blocking or performing complex work
148   ///     mid-message.
149   ///
150   /// @param[in]  timeout   The duration to wait before returning an error.
151   void SetTimeout(const duration& timeout) PW_LOCKS_EXCLUDED(mutex_);
152 
153   /// Sets the maximum amount that can be written by this writer.
154   ///
155   /// By default, writers can write an unlimited amount of data. This method can
156   /// be used to set a limit, or remove it by providing a value of
157   /// Stream::kUnlimited.
158   ///
159   /// If a limit is set, the writer will automatically close once it has written
160   /// that much data. The current number of bytes remaining until the limit is
161   /// reached can be retrieved using `ConservativeWriteLimit()`.
162   ///
163   /// @param[in]  limit   The maximum amount that can be written by this writer.
164   void SetLimit(size_t limit) PW_LOCKS_EXCLUDED(mutex_);
165 
166   /// Disconnects this writer from its reader.
167   ///
168   /// This method does nothing if the writer is not connected.
169   void Close() PW_LOCKS_EXCLUDED(mutex_);
170 
171  private:
172   // The factory method is allowed to directly modify a writer to connect it
173   // to the reader.
174   friend void CreateMpscStream(MpscReader&, MpscWriter&);
175 
176   /// @copydoc Stream::ConservativeLimit
177   size_t ConservativeLimit(LimitType type) const override;
178 
179   /// @copydoc Stream::DoWrite
180   ///
181   /// This method is *not* thread-safe with respect to itself. If multiple
182   /// threads attempt to write concurrently using the same writer, those calls
183   /// may fail. Instead, each thread should have its own writer.
184   ///
185   /// @pre No other thread has called `Write()` on this object.
186   Status DoWrite(ConstByteSpan data) override;
187 
188   /// Locked implementation of `Close()`.
189   void CloseLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
190 
      // `mutex_` guards every annotated member below; it is `mutable` so that
      // the `const` accessors above can lock it.
191   mutable sync::Mutex mutex_;
192   MpscReader* reader_ PW_GUARDED_BY(mutex_) = nullptr;
193   size_t limit_ PW_GUARDED_BY(mutex_) = kUnlimited;
      // This writer's entry for the reader's list of pending write requests
      // (see `Request`). Note that it carries no `PW_GUARDED_BY` annotation.
194   Request write_request_;
195   duration timeout_ PW_GUARDED_BY(mutex_);
196   size_t last_write_ PW_GUARDED_BY(mutex_) = 0;
197 };
198 
199 /// Reader of a multi-producer, single-consumer stream.
200 ///
201 /// The reader manages 3 aspects of the stream:
202 ///   * The storage used to hold written data that is to be read.
203 ///   * The list of connected writers.
204 ///   * Accounting for how much data has and can be written.
205 ///
206 /// This class has a default constructor that can only produce a disconnected
207 /// reader. To connect a reader, use `CreateMpscStream()`.
208 class MpscReader : public NonSeekableReader {
209  public:
      /// Optional timeout type used by `SetTimeout()`.
210   using duration = std::optional<chrono::SystemClock::duration>;
211 
212   MpscReader();
213   ~MpscReader() override;
214 
215   /// Returns whether this object has any connected writers.
216   bool connected() const PW_LOCKS_EXCLUDED(mutex_);
217 
218   /// Set the timeout for reading from this stream.
219   ///
220   /// After setting a timeout, if the given duration elapses while making a call
221   /// to `Read()`, RESOURCE_EXHAUSTED will be returned. If desired, a timeout
222   /// should be set before calling `Read()` or `ReadAll()`. Setting a timeout
223   /// when a reader is awaiting notification from a writer will not affect the
224   /// duration of that wait. `ReadUntilClose()` ignores timeouts entirely.
      ///
      /// NOTE(review): no `ReadUntilClose()` is declared in this class; this may
      /// be a stale name for `ReadAll()`, which documents a timeout error —
      /// confirm against the implementation.
225   ///
226   /// @param[in]  timeout   The duration to wait before returning an error.
227   void SetTimeout(const duration& timeout) PW_LOCKS_EXCLUDED(mutex_);
228 
229   /// Associates the reader with storage to buffer written data to be read.
230   ///
231   /// If desired, callers can use this method to buffer written data. This can
232   /// improve writer performance by allowing calls to `WriteData()` to avoid
233   /// waiting for the reader, albeit at the cost of increased memory. This can
234   /// be useful when the reader needs time to process the data it reads, or when
235   /// the volume of writes varies over time, i.e. is "bursty".
236   ///
237   /// The reader does not take ownership of the storage, which must be valid
238   /// until a call to the destructor or another call to `SetBuffer()`.
239   ///
240   /// @param[in]  buffer  A view to the storage.
241   void SetBuffer(ByteSpan buffer) PW_LOCKS_EXCLUDED(mutex_);
242 
243   /// @fn ReadAll
244   /// Reads data in a loop and passes it to a provided callback.
245   ///
246   /// This will read continuously until all connected writers close.
247   ///
248   /// Example usage:
249   ///
250   /// @code{.cpp}
251   ///    MpscReader reader;
252   ///    MpscWriter writer;
253   ///    CreateMpscStream(reader, writer);
254   ///    Thread t(MakeThreadOptions(), [] (void*arg) {
255   ///      auto *writer = static_cast<MpscWriter *>(arg);
256   ///      writer->Write(GenerateSomeData()).IgnoreError();
257   ///    }, &writer);
258   ///    auto status = reader.ReadAll([] (ConstByteSpan data) {
259   ///      return ProcessSomeData(data);
260   ///    });
261   ///    t.join();
262   /// @endcode
263   ///
264   /// @param[in]  callback  A callable object to invoke on data as it is read.
265   /// @retval     OK                  Successfully read until writers closed.
266   /// @retval     FAILED_PRECONDITION The object does not have a buffer.
267   /// @retval     RESOURCE_EXHAUSTED  Timed out when reading data. This can only
268   ///                                 occur if a timeout has been set.
269   /// @retval     Any other error as returned by the callback.
270   using ReadAllCallback = Function<Status(ConstByteSpan data)>;
271   Status ReadAll(ReadAllCallback callback) PW_LOCKS_EXCLUDED(mutex_);
272 
273   /// Disconnects all writers and drops any unread data.
274   void Close() PW_LOCKS_EXCLUDED(mutex_);
275 
276  private:
277   // The factory method is allowed to directly modify the reader to connect it
278   // to a writer.
279   friend void CreateMpscStream(MpscReader&, MpscWriter&);
280 
281   // The writer is allowed to call directly into the reader to:
282   //  * Add/remove itself to the reader's list of writers.
283   //  * Request space to write data, and to write to that space.
      //
      // NOTE(review): several comments below name `MpscWriter::WriteData()` and
      // `MpscWriter::RemoveWriter()` as callers, but `MpscWriter` declares
      // neither in this header; presumably they refer to `MpscWriter::DoWrite()`
      // and the writer's close path — confirm against the implementation.
284   friend class MpscWriter;
285 
286   /// @fn IncreaseLimit
287   /// @fn IncreaseLimitLocked
288   /// Increases the number of remaining bytes to be written.
289   ///
290   /// Used by `MpscWriter::SetLimit()` and `MpscWriter::WriteData()`.
291   ///
292   /// @param[in]  delta   How much to increase the number of remaining bytes.
293   void IncreaseLimit(size_t delta) PW_LOCKS_EXCLUDED(mutex_);
294   void IncreaseLimitLocked(size_t delta) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
295 
296   /// @fn DecreaseLimit
297   /// @fn DecreaseLimitLocked
298   /// Decreases the number of remaining bytes to be written.
299   ///
300   /// Used by `MpscWriter::SetLimit()` and `MpscWriter::RemoveWriter()`.
301   ///
302   /// @param[in]  delta   How much to decrease the number of remaining bytes.
303   void DecreaseLimit(size_t delta) PW_LOCKS_EXCLUDED(mutex_);
304   void DecreaseLimitLocked(size_t delta) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
305 
306   /// @copydoc Stream::ConservativeLimit
307   size_t ConservativeLimit(Stream::LimitType type) const override
308       PW_LOCKS_EXCLUDED(mutex_);
309 
310   /// Adds the write request to the reader's list of pending requests.
311   ///
312   /// Used by `MpscWriter::WriteData()`.
313   ///
314   /// @param[in]  write_request   A writer's request object.
315   void RequestWrite(MpscWriter::Request& write_request)
316       PW_LOCKS_EXCLUDED(mutex_);
317 
318   /// Checks if a writer can write data, and signals it if so.
319   ///
320   /// A reader may signal a writer because:
321   ///   * Space to write data has become available.
322   ///   * The queue of write requests has changed.
323   ///   * The reader is closing. `WriteData()` will return OUT_OF_RANGE.
324   void CheckWriteableLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
325 
326   /// Adds data from a writer to the buffer to be read.
327   ///
328   /// @param[in]  data            The data to be written.
329   /// @param[in]  limit           The writer's current write limit.
330   ///
331   /// @retval OK                  Data was written to the buffer.
332   /// @retval RESOURCE_EXHAUSTED  Buffer has insufficient space for data.
333   /// @retval OUT_OF_RANGE        Stream is shut down or closed.
334   StatusWithSize WriteData(ConstByteSpan data, size_t limit)
335       PW_LOCKS_EXCLUDED(mutex_);
336 
337   /// @fn CompleteWrite
338   /// @fn CompleteWriteLocked
339   /// Removes the write request from the reader's list of pending requests.
340   ///
341   /// Used by `MpscWriter::WriteData()` and `MpscWriter::CloseLocked()`.
342   ///
343   /// @param[in]  write_request   A writer's request object.
344   void CompleteWrite(MpscWriter::Request& write_request)
345       PW_LOCKS_EXCLUDED(mutex_);
346   void CompleteWriteLocked(MpscWriter::Request& write_request)
347       PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
348 
349   /// @copydoc Stream::DoRead
350   StatusWithSize DoRead(ByteSpan destination) override
351       PW_LOCKS_EXCLUDED(mutex_);
352 
353   // Locked implementations.
354 
      // `mutex_` guards every annotated member below; it is `mutable` so that
      // the `const` methods above can lock it.
355   mutable sync::Mutex mutex_;
356   IntrusiveList<MpscWriter> writers_ PW_GUARDED_BY(mutex_);
357   IntrusiveList<MpscWriter::Request> write_requests_ PW_GUARDED_BY(mutex_);
358   IntrusiveList<MpscWriter::Request>::iterator last_request_
359       PW_GUARDED_BY(mutex_);
360 
361   size_t num_unlimited_ PW_GUARDED_BY(mutex_) = 0;
362   size_t limit_ PW_GUARDED_BY(mutex_) = 0;
363 
364   bool reading_ PW_GUARDED_BY(mutex_) = false;
      // The two notification objects carry no `PW_GUARDED_BY` annotation.
365   sync::TimedThreadNotification readable_;
366   sync::ThreadNotification closeable_;
367   duration timeout_ PW_GUARDED_BY(mutex_);
368 
369   ByteSpan destination_ PW_GUARDED_BY(mutex_);
370   size_t written_ PW_GUARDED_BY(mutex_) = 0;
371 
      // View of the caller-provided storage set via `SetBuffer()`; not owned.
372   ByteSpan buffer_ PW_GUARDED_BY(mutex_);
373   size_t offset_ PW_GUARDED_BY(mutex_) = 0;
374   size_t length_ PW_GUARDED_BY(mutex_) = 0;
375 };
376 
377 /// Reader for a multi-producer, single-consumer stream.
378 ///
379 /// This class includes an explicitly-sized buffer. It has a default constructor
380 /// that can only produce a disconnected reader. To connect a reader, use
381 /// `CreateMpscStream()`.
    ///
    /// @tparam kCapacity  Number of bytes in the embedded buffer.
    ///
    /// NOTE(review): `std::array` is used below but `<array>` is not among the
    /// includes visible in this file; presumably it arrives transitively.
382 template <size_t kCapacity>
383 class BufferedMpscReader : public MpscReader {
384  public:
      /// Registers the embedded storage with the base class via `SetBuffer()`.
385   BufferedMpscReader() { SetBuffer(buffer_); }
386 
387  private:
      // Backing storage handed to `MpscReader::SetBuffer()` by the constructor.
388   std::array<std::byte, kCapacity> buffer_;
389 };
390 
391 }  // namespace pw::stream
392