1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <limits> 17 #include <mutex> 18 19 #include "pw_assert/assert.h" 20 #include "pw_bytes/span.h" 21 #include "pw_function/function.h" 22 #include "pw_multisink/config.h" 23 #include "pw_result/result.h" 24 #include "pw_ring_buffer/prefixed_entry_ring_buffer.h" 25 #include "pw_status/status.h" 26 #include "pw_sync/lock_annotations.h" 27 28 namespace pw { 29 namespace multisink { 30 31 // An asynchronous single-writer multi-reader queue that ensures readers can 32 // poll for dropped message counts, which is useful for logging or similar 33 // scenarios where readers need to be aware of the input message sequence. 34 // 35 // This class is thread-safe but NOT IRQ-safe when 36 // PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled. 37 class MultiSink { 38 public: 39 // An asynchronous reader which is attached to a MultiSink via AttachDrain. 40 // Each Drain holds a PrefixedEntryRingBufferMulti::Reader and abstracts away 41 // entry sequence information for clients when popping. 42 class Drain { 43 public: 44 // Holds the context for a peeked entry, tha the user may pass to `PopEntry` 45 // to advance the drain. 46 class PeekedEntry { 47 public: 48 // Provides access to the peeked entry's data. entry()49 ConstByteSpan entry() const { return entry_; } 50 51 private: 52 friend MultiSink; 53 friend MultiSink::Drain; 54 PeekedEntry(ConstByteSpan entry,uint32_t sequence_id)55 constexpr PeekedEntry(ConstByteSpan entry, uint32_t sequence_id) 56 : entry_(entry), sequence_id_(sequence_id) {} 57 sequence_id()58 uint32_t sequence_id() const { return sequence_id_; } 59 60 const ConstByteSpan entry_; 61 const uint32_t sequence_id_; 62 }; 63 Drain()64 constexpr Drain() 65 : last_handled_sequence_id_(0), 66 last_peek_sequence_id_(0), 67 last_handled_ingress_drop_count_(0), 68 multisink_(nullptr) {} 69 70 // Returns the next available entry if it exists and acquires the latest 71 // drop count in parallel. 72 // 73 // If the read operation was successful or returned OutOfRange (i.e. no 74 // entries to read) then the `drain_drop_count_out` is set to the number of 75 // entries that were dropped since the last call to PopEntry due to 76 // advancing the drain, and `ingress_drop_count_out` is set to the number of 77 // logs that were dropped before being added to the MultiSink. Otherwise, 78 // the drop counts are set to zero, so should always be processed. 79 // 80 // Drop counts are internally maintained with a 32-bit counter. If 81 // UINT32_MAX entries have been handled by the attached multisink between 82 // subsequent calls to PopEntry, the drop count will overflow and will 83 // report a lower count erroneously. Users should ensure that sinks call 84 // PopEntry at least once every UINT32_MAX entries. 85 // 86 // Example Usage: 87 // 88 // void ProcessEntriesFromDrain(Drain& drain) { 89 // std::array<std::byte, kEntryBufferSize> buffer; 90 // uint32_t drop_count = 0; 91 // 92 // // Example#1: Request the drain for a new entry. 93 // { 94 // const Result<ConstByteSpan> result = drain.PopEntry(buffer, 95 // drop_count); 96 // 97 // // If a non-zero drop count is received, process them. 98 // if (drop_count > 0) { 99 // ProcessDropCount(drop_count); 100 // } 101 // 102 // // If the call was successful, process the entry that was received. 103 // if (result.ok()) { 104 // ProcessEntry(result.value()); 105 // } 106 // } 107 // 108 // // Example#2: Drain out all messages. 109 // { 110 // Result<ConstByteSpan> result = Status::OutOfRange(); 111 // do { 112 // result = drain.PopEntry(buffer, drop_count); 113 // 114 // if (drop_count > 0) { 115 // ProcessDropCount(drop_count); 116 // } 117 // 118 // if (result.ok()) { 119 // ProcessEntry(result.value()); 120 // } 121 // 122 // // Keep trying until we hit OutOfRange. Note that a new entry may 123 // // have arrived after the PopEntry call. 124 // } while (!result.IsOutOfRange()); 125 // } 126 // } 127 // Precondition: the buffer data must not be corrupt, otherwise there will 128 // be a crash. 129 // 130 // Return values: 131 // OK - An entry was successfully read from the multisink. 132 // OUT_OF_RANGE - No entries were available. 133 // FAILED_PRECONDITION - The drain must be attached to a sink. 134 // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store 135 // the next available entry, which was discarded. 136 Result<ConstByteSpan> PopEntry(ByteSpan buffer, 137 uint32_t& drain_drop_count_out, 138 uint32_t& ingress_drop_count_out) 139 PW_LOCKS_EXCLUDED(multisink_->lock_); 140 // Overload that combines drop counts. 141 // TODO(cachinchilla): remove when downstream projects migrated to new API. 142 [[deprecated("Use PopEntry with different drop count outputs")]] Result< 143 ConstByteSpan> PopEntry(ByteSpan buffer,uint32_t & drop_count_out)144 PopEntry(ByteSpan buffer, uint32_t& drop_count_out) 145 PW_LOCKS_EXCLUDED(multisink_->lock_) { 146 uint32_t ingress_drop_count = 0; 147 Result<ConstByteSpan> result = 148 PopEntry(buffer, drop_count_out, ingress_drop_count); 149 drop_count_out += ingress_drop_count; 150 return result; 151 } 152 153 // Removes the previously peeked entry from the multisink. 154 // 155 // Example Usage: 156 // 157 // // Peek entry to send it, and remove entry from multisink on success. 158 // uint32_t drop_count; 159 // const Result<PeekedEntry> peek_result = 160 // PeekEntry(out_buffer, drop_count); 161 // if (!peek_result.ok()) { 162 // return peek_result.status(); 163 // } 164 // Status send_status = UserSendFunction(peek_result.value().entry()) 165 // if (!send_status.ok()) { 166 // return send_status; 167 // } 168 // PW_CHECK_OK(PopEntry(peek_result.value()); 169 // 170 // Precondition: the buffer data must not be corrupt, otherwise there will 171 // be a crash. 172 // 173 // Return values: 174 // OK - the entry or entries were removed from the multisink successfully. 175 // FAILED_PRECONDITION - The drain must be attached to a sink. 176 Status PopEntry(const PeekedEntry& entry) 177 PW_LOCKS_EXCLUDED(multisink_->lock_); 178 179 // Returns a copy of the next available entry if it exists and acquires the 180 // latest drop count if the drain was advanced, and the latest ingress drop 181 // count, without moving the drain forward, except if there is a 182 // RESOURCE_EXHAUSTED error when peeking, in which case the drain is 183 // automatically advanced. 184 // The `drain_drop_count_out` follows the same logic as `PopEntry`. The user 185 // must call `PopEntry` once the data in peek was used successfully. 186 // 187 // Precondition: the buffer data must not be corrupt, otherwise there will 188 // be a crash. 189 // 190 // Return values: 191 // OK - An entry was successfully read from the multisink. 192 // OUT_OF_RANGE - No entries were available. 193 // FAILED_PRECONDITION - The drain must be attached to a sink. 194 // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store 195 // the next available entry, which was discarded. 196 Result<PeekedEntry> PeekEntry(ByteSpan buffer, 197 uint32_t& drain_drop_count_out, 198 uint32_t& ingress_drop_count_out) 199 PW_LOCKS_EXCLUDED(multisink_->lock_); 200 201 // Drains are not copyable or movable. 202 Drain(const Drain&) = delete; 203 Drain& operator=(const Drain&) = delete; 204 Drain(Drain&&) = delete; 205 Drain& operator=(Drain&&) = delete; 206 207 // Returns size of unread entries in the sink for this drain. This is a 208 // thread-safe version and must not be used inside a 209 // Listener::OnNewEntryAvailable() to avoid deadlocks. Use 210 // UnsafeGetUnreadEntriesSize() instead in such case. GetUnreadEntriesSize()211 size_t GetUnreadEntriesSize() const PW_LOCKS_EXCLUDED(multisink_->lock_) { 212 std::lock_guard lock(multisink_->lock_); 213 return UnsafeGetUnreadEntriesSize(); 214 } 215 216 // Returns size of unread entries in the sink for this drain. 217 // Marked unsafe because it requires external synchronization. UnsafeGetUnreadEntriesSize()218 size_t UnsafeGetUnreadEntriesSize() const PW_NO_LOCK_SAFETY_ANALYSIS { 219 return reader_.EntriesSize(); 220 } 221 222 // Return number of unread entries in the sink for this drain. This is a 223 // thread-safe version and must not be used inside a 224 // Listener::OnNewEntryAvailable() to avoid deadlocks. GetUnreadEntriesCount()225 size_t GetUnreadEntriesCount() const PW_LOCKS_EXCLUDED(multisink_->lock_) { 226 std::lock_guard lock(multisink_->lock_); 227 return reader_.EntryCount(); 228 } 229 230 protected: 231 friend MultiSink; 232 233 // The `reader_` and `last_handled_sequence_id_` are managed by attached 234 // multisink and are guarded by `multisink_->lock_` when used. 235 ring_buffer::PrefixedEntryRingBufferMulti::Reader reader_; 236 uint32_t last_handled_sequence_id_; 237 uint32_t last_peek_sequence_id_; 238 uint32_t last_handled_ingress_drop_count_; 239 MultiSink* multisink_; 240 }; 241 242 // A pure-virtual listener of a MultiSink, attached via AttachListener. 243 // MultiSink's invoke listeners when new data arrives, allowing them to 244 // schedule the draining of messages out of the MultiSink. 245 class Listener : public IntrusiveList<Listener>::Item { 246 public: Listener()247 constexpr Listener() {} 248 virtual ~Listener() = default; 249 250 // Listeners are not copyable or movable. 251 Listener(const Listener&) = delete; 252 Listener& operator=(const Drain&) = delete; 253 Listener(Listener&&) = delete; 254 Listener& operator=(Drain&&) = delete; 255 256 protected: 257 friend MultiSink; 258 259 // Invoked by the attached multisink when a new entry or drop count is 260 // available. The multisink lock is held during this call, so neither the 261 // multisink nor it's drains can be used during this callback. 262 virtual void OnNewEntryAvailable() = 0; 263 }; 264 265 class iterator { 266 public: 267 iterator& operator++() { 268 it_++; 269 return *this; 270 } 271 iterator operator++(int) { 272 iterator original = *this; 273 ++*this; 274 return original; 275 } 276 277 iterator& operator--() { 278 it_--; 279 return *this; 280 } 281 iterator operator--(int) { 282 iterator original = *this; 283 --*this; 284 return original; 285 } 286 287 ConstByteSpan& operator*() { 288 entry_ = (*it_).buffer; 289 return entry_; 290 } 291 ConstByteSpan* operator->() { return &operator*(); } 292 293 constexpr bool operator==(const iterator& rhs) const { 294 return it_ == rhs.it_; 295 } 296 297 constexpr bool operator!=(const iterator& rhs) const { 298 return it_ != rhs.it_; 299 } 300 301 // Returns the status of the last iteration operation. If the iterator 302 // fails to read an entry, it will move to iterator::end() and indicate 303 // the failure reason here. 304 // 305 // Return values: 306 // OK - iteration is successful and iterator points to the next entry. 307 // DATA_LOSS - Failed to read the metadata at this location. status()308 Status status() const { return it_.status(); } 309 310 private: 311 friend class MultiSink; 312 iterator(ring_buffer::PrefixedEntryRingBufferMulti::Reader & reader)313 iterator(ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader) 314 : it_(reader) {} iterator()315 iterator() {} 316 317 ring_buffer::PrefixedEntryRingBufferMulti::iterator it_; 318 ConstByteSpan entry_; 319 }; 320 321 class UnsafeIterationWrapper { 322 public: 323 using element_type = ConstByteSpan; 324 using value_type = std::remove_cv_t<ConstByteSpan>; 325 using pointer = ConstByteSpan*; 326 using reference = ConstByteSpan&; 327 using const_iterator = iterator; // Standard alias for iterable types. 328 begin()329 iterator begin() const { return iterator(*reader_); } end()330 iterator end() const { return iterator(); } cbegin()331 const_iterator cbegin() const { return begin(); } cend()332 const_iterator cend() const { return end(); } 333 334 private: 335 friend class MultiSink; UnsafeIterationWrapper(ring_buffer::PrefixedEntryRingBufferMulti::Reader & reader)336 UnsafeIterationWrapper( 337 ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader) 338 : reader_(&reader) {} 339 ring_buffer::PrefixedEntryRingBufferMulti::Reader* reader_; 340 }; 341 UnsafeIteration()342 UnsafeIterationWrapper UnsafeIteration() PW_NO_LOCK_SAFETY_ANALYSIS { 343 return UnsafeIterationWrapper(oldest_entry_drain_.reader_); 344 } 345 346 // Constructs a multisink using a ring buffer backed by the provided buffer. 347 // If we're using a virtual lock, then the lock needs to be passed in. 348 #if PW_MULTISINK_CONFIG_LOCK_TYPE == PW_MULTISINK_VIRTUAL_LOCK MultiSink(ByteSpan buffer,LockType lock)349 MultiSink(ByteSpan buffer, LockType lock) 350 : lock_(lock), 351 #else 352 MultiSink(ByteSpan buffer) 353 : 354 #endif 355 ring_buffer_(true), 356 sequence_id_(0), 357 total_ingress_drops_(0) { 358 PW_ASSERT(ring_buffer_.SetBuffer(buffer).ok()); 359 AttachDrain(oldest_entry_drain_); 360 } 361 362 // Write an entry to the multisink. If available space is less than the 363 // size of the entry, the internal ring buffer will push the oldest entries 364 // out to make space, so long as the entry is not larger than the buffer. 365 // The sequence ID of the multisink will always increment as a result of 366 // calling HandleEntry, regardless of whether pushing the entry succeeds. 367 // 368 // Precondition: If PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled, this 369 // function must not be called from an interrupt context. 370 // Precondition: entry.size() <= `ring_buffer_` size 371 void HandleEntry(ConstByteSpan entry) PW_LOCKS_EXCLUDED(lock_); 372 373 // Notifies the multisink of messages dropped before ingress. The writer 374 // may use this to signal to readers that an entry (or entries) failed 375 // before being sent to the multisink (e.g. the writer failed to encode 376 // the message). This API increments the sequence ID of the multisink by 377 // the provided `drop_count`. 378 void HandleDropped(uint32_t drop_count = 1) PW_LOCKS_EXCLUDED(lock_); 379 380 // Attach a drain to the multisink. Drains may not be associated with more 381 // than one multisink at a time. Drains can consume entries pushed before 382 // the drain was attached, so long as they have not yet been evicted from 383 // the underlying ring buffer. 384 // 385 // Precondition: The drain must not be attached to a multisink. 386 void AttachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_); 387 388 // Detaches a drain from the multisink. Drains may only be detached if they 389 // were previously attached to this multisink. 390 // 391 // Precondition: The drain must be attached to this multisink. 392 void DetachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_); 393 394 // Attach a listener to the multisink. The listener will be notified 395 // immediately when attached, to allow late drain users to consume existing 396 // entries. If draining in response to the notification, ensure that the drain 397 // is attached prior to registering the listener; attempting to drain when 398 // unattached will crash. Once attached, listeners are invoked on all new 399 // messages. 400 // 401 // Precondition: The listener must not be attached to a multisink. 402 void AttachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_); 403 404 // Detaches a listener from the multisink. 405 // 406 // Precondition: The listener must be attached to this multisink. 407 void DetachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_); 408 409 // Removes all data from the internal buffer. The multisink's sequence ID is 410 // not modified, so readers may interpret this event as droppping entries. 411 void Clear() PW_LOCKS_EXCLUDED(lock_); 412 413 // Uses MultiSink's unsafe iteration to dump the contents to a user-provided 414 // callback. max_num_entries can be used to limit the dump to the N most 415 // recent entries. 416 // 417 // Returns: 418 // OK - Successfully dumped entire multisink. 419 // DATA_LOSS - Corruption detected, some entries may have been lost. 420 Status UnsafeForEachEntry( 421 const Function<void(ConstByteSpan)>& callback, 422 size_t max_num_entries = std::numeric_limits<size_t>::max()); 423 424 // Uses MultiSink's unsafe iteration to dump the contents to a user-provided 425 // callback. UnsafeForEachEntryFromEnd dumps the latest entries, up to the 426 // aggregate element size of max_size_bytes. 427 // 428 // Returns: 429 // OK - Successfully dumped entire multisink. 430 // DATA_LOSS - Corruption detected, some entries may have been lost. 431 Status UnsafeForEachEntryFromEnd( 432 const Function<void(ConstByteSpan)>& callback, size_t max_size_bytes); 433 434 protected: 435 friend Drain; 436 437 enum class Request { kPop, kPeek }; 438 // Removes the previously peeked entry from the front of the multisink. 439 Status PopEntry(Drain& drain, const Drain::PeekedEntry& entry) 440 PW_LOCKS_EXCLUDED(lock_); 441 442 // Gets a copy of the entry from the provided drain and unpacks sequence ID 443 // information. The entry is removed from the multisink when `request` is set 444 // to `Request::kPop`. Drains use this API to strip away sequence ID 445 // information for drop calculation. 446 // 447 // Precondition: the buffer data must not be corrupt, otherwise there will 448 // be a crash. 449 // 450 // Returns: 451 // OK - An entry was successfully read from the multisink. The 452 // `drain_drop_count_out` is set to the difference between the current 453 // sequence ID and the last handled ID. 454 // FAILED_PRECONDITION - The drain is not attached to 455 // a multisink. 456 // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store 457 // the next available entry, which was discarded. 458 Result<ConstByteSpan> PeekOrPopEntry(Drain& drain, 459 ByteSpan buffer, 460 Request request, 461 uint32_t& drain_drop_count_out, 462 uint32_t& ingress_drop_count_out, 463 uint32_t& entry_sequence_id_out) 464 PW_LOCKS_EXCLUDED(lock_); 465 466 private: 467 // Notifies attached listeners of new entries or an updated drop count. 468 void NotifyListeners() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_); 469 470 LockType lock_; 471 IntrusiveList<Listener> listeners_ PW_GUARDED_BY(lock_); 472 ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_ PW_GUARDED_BY(lock_); 473 Drain oldest_entry_drain_ PW_GUARDED_BY(lock_); 474 uint32_t sequence_id_ PW_GUARDED_BY(lock_); 475 uint32_t total_ingress_drops_ PW_GUARDED_BY(lock_); 476 }; 477 478 } // namespace multisink 479 } // namespace pw 480