1 // Copyright 2023 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 /// @file 17 /// This file defines types related to a multi-producer, single-consumer stream. 18 /// 19 /// The single readers must be constructed in place, while writers can be moved. 20 /// A reader and writer may be connected using `CreateMpscStream()`. Additional 21 /// writers may be connected by copying a previously connected writer. 22 /// 23 /// Example: 24 /// 25 /// @code{.cpp} 26 /// ... 27 /// MpscReader reader; 28 /// MpscWriter writer; 29 /// CreateMpscStream(reader, writer); 30 /// pw::Thread t(MakeThreadOptions(), [&writer] { 31 /// Status status = writer.Write(GenerateSomeData()); 32 /// ... 33 /// }); 34 /// std::byte buffer[kBufSize]; 35 /// if (auto status = reader.Read(ByteSpan(buffer)); status.ok()) { 36 /// ProcessSomeData(buffer); 37 /// } 38 /// @endcode 39 /// 40 /// See the `MpscReader::ReadAll()` for additional examples. 41 /// 42 /// The types in the files are designed to be used across different threads, 43 /// but are not completely thread-safe. Data must only be written by an 44 /// MpscWriter using a single thread, and data must only be read by an 45 /// MpscReader using a single thread. In other words, multiple calls to 46 /// `Write()` must not be made concurrently, and multiple calls to `Read()` and 47 /// `ReadAll()` must not be made concurrently. Calls to other methods, e.g. 48 /// `Close()`, are thread-safe and may be made from any thread. 49 50 #include <cstddef> 51 52 #include "pw_bytes/span.h" 53 #include "pw_chrono/system_clock.h" 54 #include "pw_containers/intrusive_list.h" 55 #include "pw_function/function.h" 56 #include "pw_status/status.h" 57 #include "pw_status/status_with_size.h" 58 #include "pw_stream/stream.h" 59 #include "pw_sync/lock_annotations.h" 60 #include "pw_sync/mutex.h" 61 #include "pw_sync/timed_thread_notification.h" 62 63 namespace pw::stream { 64 65 // Forward declaration. 66 class MpscReader; 67 class MpscWriter; 68 69 /// Creates a multi-producer, single consumer stream. 70 /// 71 /// This method creates a stream by associating a reader and writer. Both are 72 /// reset before being connected. This is the only way to connect a reader. 73 /// Additional writers may be connected by copying the given writer after it is 74 /// connected. 75 /// 76 /// This method is thread-safe with respect to other MpscReader and MpscWriter 77 /// methods. It is not thread-safe with respect to itself, i.e. callers must 78 /// not make concurrent calls to `CreateMpscStream()` from different threads 79 /// with the same objects. 80 /// 81 /// @param[out] reader The reader to connect. 82 /// @param[out] writer The writer to connect. 83 void CreateMpscStream(MpscReader& reader, MpscWriter& writer); 84 85 /// Writer for a multi-producer, single consumer stream. 86 /// 87 /// This class has a default constructor that only produces disconnected 88 /// writers. To connect writers, use `CreateMpscStream()`. Additional connected 89 /// writers can be created by copying an existing one. 90 /// 91 /// Each thread should have its own dedicated writer. This class is thread-safe 92 /// with respect to the reader, but not with respect to itself. In particular, 93 /// attempting to call `Write()` concurrently on different threads may cause 94 /// result in a failure. 95 class MpscWriter : public NonSeekableWriter, 96 public IntrusiveList<MpscWriter>::Item { 97 public: 98 using duration = std::optional<chrono::SystemClock::duration>; 99 100 /// A per-writer thread notification that can be added to a reader's list. 101 /// 102 /// The reader maintains a list of outstanding requests to write data. As 103 /// data is read, and space to write data becomes available, it uses these 104 /// requests to signal the waiting the writers. 105 struct Request : public IntrusiveList<Request>::Item { 106 sync::TimedThreadNotification notification; 107 using IntrusiveList<Request>::Item::unlisted; 108 }; 109 110 MpscWriter() = default; 111 MpscWriter(const MpscWriter& other); 112 MpscWriter& operator=(const MpscWriter& other); 113 MpscWriter(MpscWriter&& other); 114 MpscWriter& operator=(MpscWriter&& other); 115 ~MpscWriter() override; 116 117 /// Returns whether this object is connected to a reader. 118 bool connected() const PW_LOCKS_EXCLUDED(mutex_); 119 120 /// Indicates how much data was sent in the last call to `Write()`. 121 size_t last_write() const PW_LOCKS_EXCLUDED(mutex_); 122 123 /// Returns the optional maximum time elapsed before a `Write()` fails. 124 const duration& timeout() const PW_LOCKS_EXCLUDED(mutex_); 125 126 /// Set the timeout for writing to this stream. 127 /// 128 /// After setting a timeout, if the given duration elapses while making a call 129 /// to `Write()`, @pw_status{RESOURCE_EXHAUSTED} will be returned. If desired, 130 /// a timeout should be set before calling `Write()`. Setting a timeout when a 131 /// writer is awaiting notification from a reader will not affect the duration 132 /// of that wait. 133 /// 134 /// Note that setting a write timeout makes partial writes possible. For 135 /// example, if a call to `Write()` of some length corresponds to 2 calls to 136 /// `Read()` of half that length with an sufficient delay between the calls 137 /// will result in the first half being written and read, but not the second. 138 /// This differs from `Stream::Write()` which stipulates that no data is 139 /// written on failure. If this happens, the length of the data written can be 140 /// retrieved using `last_write()`. 141 /// 142 /// Generally, callers should use one of three approaches: 143 /// 1. Do not set a write timeout, and let writers block arbitrarily long 144 /// until space is available or the reader is disconnected. 145 /// 2. Use only a single writer, and use `last_write()` to resend data. 146 /// 3. Structure the data being sent so that the reader can always read 147 /// complete messages and avoid blocking or performing complex work 148 /// mid-message. 149 /// 150 /// @param[in] timeout The duration to wait before returning an error. 151 void SetTimeout(const duration& timeout) PW_LOCKS_EXCLUDED(mutex_); 152 153 /// Sets the maximum amount that can be written by this writer. 154 /// 155 /// By default, writers can write an unlimited amount of data. This method can 156 /// be used to set a limit, or remove it by providing a value of 157 /// Stream::kUnlimited. 158 /// 159 /// If a limit is set, the writer will automatically close once it has written 160 /// that much data. The current number of bytes remaining until the limit is 161 /// reached can be retrieved using `ConservativeWriteLimit()`. 162 /// 163 /// @param[in] limit The maximum amount that can be written by this writer. 164 void SetLimit(size_t limit) PW_LOCKS_EXCLUDED(mutex_); 165 166 /// Disconnects this writer from its reader. 167 /// 168 /// This method does nothing if the writer is not connected. 169 void Close() PW_LOCKS_EXCLUDED(mutex_); 170 171 private: 172 // The factory method is allowed to directly modify a writer to connect it 173 // to the reader. 174 friend void CreateMpscStream(MpscReader&, MpscWriter&); 175 176 /// @copydoc Stream::ConservativeLimit 177 size_t ConservativeLimit(LimitType type) const override; 178 179 /// @copydoc Stream::DoWrite 180 /// 181 /// This method is *not* thread-safe with respect to itself. If multiple 182 /// threads attempt to write concurrently using the same writer, those calls 183 /// may fail. Instead, each thread should have its own writer. 184 /// 185 /// @pre No other thread has called `Write()` on this object. 186 Status DoWrite(ConstByteSpan data) override; 187 188 /// Locked implementation of `Close()`. 189 void CloseLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); 190 191 mutable sync::Mutex mutex_; 192 MpscReader* reader_ PW_GUARDED_BY(mutex_) = nullptr; 193 size_t limit_ PW_GUARDED_BY(mutex_) = kUnlimited; 194 Request write_request_; 195 duration timeout_ PW_GUARDED_BY(mutex_); 196 size_t last_write_ PW_GUARDED_BY(mutex_) = 0; 197 }; 198 199 /// Reader of a multi-producer, single-consumer stream. 200 /// 201 /// The reader manages 3 aspects of the stream: 202 /// * The storage used to hold written data that is to be read. 203 /// * The list of connected writers. 204 /// * Accounting for how much data has and can be written. 205 /// 206 /// This class has a default constructor that can only produce a disconnected 207 /// reader. To connect a reader, use `CreateMpscStream()`. 208 class MpscReader : public NonSeekableReader { 209 public: 210 using duration = std::optional<chrono::SystemClock::duration>; 211 212 MpscReader(); 213 ~MpscReader() override; 214 215 /// Returns whether this object has any connected writers. 216 bool connected() const PW_LOCKS_EXCLUDED(mutex_); 217 218 /// Set the timeout for reading from this stream. 219 /// 220 /// After setting a timeout, if the given duration elapses while making a call 221 /// to `Read()`, RESOURCE_EXHAUSTED will be returned. If desired, a timeout 222 /// should be set before calling `Read()` or `ReadAll()`. Setting a timeout 223 /// when a reader is awaiting notification from a writer will not affect the 224 /// duration of that wait. `ReadUntilClose()` ignores timeouts entirely. 225 /// 226 /// @param[in] timeout The duration to wait before returning an error. 227 void SetTimeout(const duration& timeout) PW_LOCKS_EXCLUDED(mutex_); 228 229 /// Associates the reader with storage to buffer written data to be read. 230 /// 231 /// If desired, callers can use this method to buffer written data. This can 232 /// improve writer performance by allowing calls to `WriteData()` to avoid 233 /// waiting for the reader, albeit at the cost of increased memory. This can 234 /// be useful when the reader needs time to process the data it reads, or when 235 /// the volume of writes varies over time, i.e. is "bursty". 236 /// 237 /// The reader does not take ownership of the storage, which must be valid 238 /// until a call to the destructor or another call to `SetBuffer()`. 239 /// 240 /// @param[in] buffer A view to the storage. 241 void SetBuffer(ByteSpan buffer) PW_LOCKS_EXCLUDED(mutex_); 242 243 /// @fn ReadAll 244 /// Reads data in a loop and passes it to a provided callback. 245 /// 246 /// This will read continuously until all connected writers close. 247 /// 248 /// Example usage: 249 /// 250 /// @code(.cpp} 251 /// MpscReader reader; 252 /// MpscWriter writer; 253 /// MpscStreamCreate(reader, writer); 254 /// Thread t(MakeThreadOptions(), [] (void*arg) { 255 /// auto *writer = static_cast<MpscWriter *>(arg); 256 /// writer->Write(GenerateSomeData()).IgnoreError(); 257 /// }, &writer); 258 /// auto status = reader.ReadAll([] (ConstByteSpan data) { 259 /// return ProcessSomeData(); 260 /// }); 261 /// t.join(); 262 /// @endcode 263 /// 264 /// @param[in] callback A callable object to invoke on data as it is read. 265 /// @retval OK Successfully read until writers closed. 266 /// @retval FAILED_PRECONDITION The object does not have a buffer. 267 /// @retval RESOURCE_EXHAUSTED Timed out when reading data. This can only 268 /// occur if a timeout has been set. 269 /// @retval Any other error as returned by the callback. 270 using ReadAllCallback = Function<Status(ConstByteSpan data)>; 271 Status ReadAll(ReadAllCallback callback) PW_LOCKS_EXCLUDED(mutex_); 272 273 /// Disconnects all writers and drops any unread data. 274 void Close() PW_LOCKS_EXCLUDED(mutex_); 275 276 private: 277 // The factory method is allowed to directly modify the reader to connect it 278 // to a writer. 279 friend void CreateMpscStream(MpscReader&, MpscWriter&); 280 281 // The writer is allowed to call directly into the reader to: 282 // * Add/remove itself to the reader's list of writer. 283 // * Request space to write data, and to write to that space. 284 friend class MpscWriter; 285 286 /// @fn IncreaseLimit 287 /// @fn IncreaseLimitLocked 288 /// Increases the number of remaining bytes to be written. 289 /// 290 /// Used by `MpscWriter::SetLimit()` and `MpscWriter::WriteData()`. 291 /// 292 /// @param[in] delta How much to increase the number of remaining bytes. 293 void IncreaseLimit(size_t delta) PW_LOCKS_EXCLUDED(mutex_); 294 void IncreaseLimitLocked(size_t delta) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); 295 296 /// @fn DecreaseLimit 297 /// @fn DecreaseLimitLocked 298 /// Decreases the number of remaining bytes to be written. 299 /// 300 /// Used by `MpscWriter::SetLimit()` and `MpscWriter::RemoveWriter()`. 301 /// 302 /// @param[in] delta How much to decrease the number of remaining bytes. 303 void DecreaseLimit(size_t delta) PW_LOCKS_EXCLUDED(mutex_); 304 void DecreaseLimitLocked(size_t delta) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); 305 306 /// @copydoc Stream::ConservativeLimit 307 size_t ConservativeLimit(Stream::LimitType type) const override 308 PW_LOCKS_EXCLUDED(mutex_); 309 310 /// Adds the write request to the reader's list of pending requests. 311 /// 312 /// Used by `MpscWriter::WriteData()`. 313 /// 314 /// @param[in] write_request A writer's request object. 315 void RequestWrite(MpscWriter::Request& write_request) 316 PW_LOCKS_EXCLUDED(mutex_); 317 318 /// Checks if a writer can write data, and signals it if so. 319 /// 320 /// A reader may signal a writer because: 321 /// * Space to write data has become available. 322 /// * The queue of write requests has changed. 323 /// * The reader is closing. `WriteData()` will return OUT_OF_RANGE. 324 void CheckWriteableLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); 325 326 /// Adds data from a writer to the buffer to be read. 327 /// 328 /// @param[in] data The data to be written. 329 /// @param[in] limit The writer's current write limit. 330 /// 331 /// @retval OK Data was written to the buffer. 332 /// @retval RESOURCE_EXHAUSTED Buffer has insufficent space for data. 333 /// @retval OUT_OF_RANGE Stream is shut down or closed. 334 StatusWithSize WriteData(ConstByteSpan data, size_t limit) 335 PW_LOCKS_EXCLUDED(mutex_); 336 337 /// @fn CompleteWrite 338 /// @fn CompleteWriteLocked 339 /// Removes the write request from the reader's list of pending requests. 340 /// 341 /// Used by `MpscWriter::WriteData()` and `MpscWriter::CloseLocked()`. 342 /// 343 /// @param[in] write_request A writer's request object. 344 void CompleteWrite(MpscWriter::Request& write_request) 345 PW_LOCKS_EXCLUDED(mutex_); 346 void CompleteWriteLocked(MpscWriter::Request& write_request) 347 PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); 348 349 /// @copydoc Stream::DoRead 350 StatusWithSize DoRead(ByteSpan destination) override 351 PW_LOCKS_EXCLUDED(mutex_); 352 353 // Locked implementations. 354 355 mutable sync::Mutex mutex_; 356 IntrusiveList<MpscWriter> writers_ PW_GUARDED_BY(mutex_); 357 IntrusiveList<MpscWriter::Request> write_requests_ PW_GUARDED_BY(mutex_); 358 IntrusiveList<MpscWriter::Request>::iterator last_request_ 359 PW_GUARDED_BY(mutex_); 360 361 size_t num_unlimited_ PW_GUARDED_BY(mutex_) = 0; 362 size_t limit_ PW_GUARDED_BY(mutex_) = 0; 363 364 bool reading_ PW_GUARDED_BY(mutex_) = false; 365 sync::TimedThreadNotification readable_; 366 sync::ThreadNotification closeable_; 367 duration timeout_ PW_GUARDED_BY(mutex_); 368 369 ByteSpan destination_ PW_GUARDED_BY(mutex_); 370 size_t written_ PW_GUARDED_BY(mutex_) = 0; 371 372 ByteSpan buffer_ PW_GUARDED_BY(mutex_); 373 size_t offset_ PW_GUARDED_BY(mutex_) = 0; 374 size_t length_ PW_GUARDED_BY(mutex_) = 0; 375 }; 376 377 /// Reader for a multi-producer, single consumer stream. 378 /// 379 /// This class includes an explicitly-sized buffer. It has a default constructor 380 /// that can only produce a disconnected reader. To connect a reader, use 381 /// `CreateMpscStream()`. 382 template <size_t kCapacity> 383 class BufferedMpscReader : public MpscReader { 384 public: BufferedMpscReader()385 BufferedMpscReader() { SetBuffer(buffer_); } 386 387 private: 388 std::array<std::byte, kCapacity> buffer_; 389 }; 390 391 } // namespace pw::stream 392