xref: /aosp_15_r20/external/pigweed/pw_multisink/public/pw_multisink/multisink.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <limits>
17 #include <mutex>
18 
19 #include "pw_assert/assert.h"
20 #include "pw_bytes/span.h"
21 #include "pw_function/function.h"
22 #include "pw_multisink/config.h"
23 #include "pw_result/result.h"
24 #include "pw_ring_buffer/prefixed_entry_ring_buffer.h"
25 #include "pw_status/status.h"
26 #include "pw_sync/lock_annotations.h"
27 
28 namespace pw {
29 namespace multisink {
30 
31 // An asynchronous single-writer multi-reader queue that ensures readers can
32 // poll for dropped message counts, which is useful for logging or similar
33 // scenarios where readers need to be aware of the input message sequence.
34 //
35 // This class is thread-safe but NOT IRQ-safe when
36 // PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled.
37 class MultiSink {
38  public:
39   // An asynchronous reader which is attached to a MultiSink via AttachDrain.
40   // Each Drain holds a PrefixedEntryRingBufferMulti::Reader and abstracts away
41   // entry sequence information for clients when popping.
42   class Drain {
43    public:
44     // Holds the context for a peeked entry, tha the user may pass to `PopEntry`
45     // to advance the drain.
46     class PeekedEntry {
47      public:
48       // Provides access to the peeked entry's data.
entry()49       ConstByteSpan entry() const { return entry_; }
50 
51      private:
52       friend MultiSink;
53       friend MultiSink::Drain;
54 
PeekedEntry(ConstByteSpan entry,uint32_t sequence_id)55       constexpr PeekedEntry(ConstByteSpan entry, uint32_t sequence_id)
56           : entry_(entry), sequence_id_(sequence_id) {}
57 
sequence_id()58       uint32_t sequence_id() const { return sequence_id_; }
59 
60       const ConstByteSpan entry_;
61       const uint32_t sequence_id_;
62     };
63 
Drain()64     constexpr Drain()
65         : last_handled_sequence_id_(0),
66           last_peek_sequence_id_(0),
67           last_handled_ingress_drop_count_(0),
68           multisink_(nullptr) {}
69 
70     // Returns the next available entry if it exists and acquires the latest
71     // drop count in parallel.
72     //
73     // If the read operation was successful or returned OutOfRange (i.e. no
74     // entries to read) then the `drain_drop_count_out` is set to the number of
75     // entries that were dropped since the last call to PopEntry due to
76     // advancing the drain, and `ingress_drop_count_out` is set to the number of
77     // logs that were dropped before being added to the MultiSink. Otherwise,
78     // the drop counts are set to zero, so should always be processed.
79     //
80     // Drop counts are internally maintained with a 32-bit counter. If
81     // UINT32_MAX entries have been handled by the attached multisink between
82     // subsequent calls to PopEntry, the drop count will overflow and will
83     // report a lower count erroneously. Users should ensure that sinks call
84     // PopEntry at least once every UINT32_MAX entries.
85     //
86     // Example Usage:
87     //
88     // void ProcessEntriesFromDrain(Drain& drain) {
89     //   std::array<std::byte, kEntryBufferSize> buffer;
90     //   uint32_t drop_count = 0;
91     //
92     //   // Example#1: Request the drain for a new entry.
93     //   {
94     //     const Result<ConstByteSpan> result = drain.PopEntry(buffer,
95     //                                                         drop_count);
96     //
97     //     // If a non-zero drop count is received, process them.
98     //     if (drop_count > 0) {
99     //       ProcessDropCount(drop_count);
100     //     }
101     //
102     //     // If the call was successful, process the entry that was received.
103     //     if (result.ok()) {
104     //       ProcessEntry(result.value());
105     //     }
106     //   }
107     //
108     //   // Example#2: Drain out all messages.
109     //   {
110     //     Result<ConstByteSpan> result = Status::OutOfRange();
111     //     do {
112     //       result = drain.PopEntry(buffer, drop_count);
113     //
114     //       if (drop_count > 0) {
115     //         ProcessDropCount(drop_count);
116     //       }
117     //
118     //       if (result.ok()) {
119     //         ProcessEntry(result.value());
120     //       }
121     //
122     //       // Keep trying until we hit OutOfRange. Note that a new entry may
123     //       // have arrived after the PopEntry call.
124     //     } while (!result.IsOutOfRange());
125     //   }
126     // }
127     // Precondition: the buffer data must not be corrupt, otherwise there will
128     // be a crash.
129     //
130     // Return values:
131     // OK - An entry was successfully read from the multisink.
132     // OUT_OF_RANGE - No entries were available.
133     // FAILED_PRECONDITION - The drain must be attached to a sink.
134     // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
135     // the next available entry, which was discarded.
136     Result<ConstByteSpan> PopEntry(ByteSpan buffer,
137                                    uint32_t& drain_drop_count_out,
138                                    uint32_t& ingress_drop_count_out)
139         PW_LOCKS_EXCLUDED(multisink_->lock_);
140     // Overload that combines drop counts.
141     // TODO(cachinchilla): remove when downstream projects migrated to new API.
142     [[deprecated("Use PopEntry with different drop count outputs")]] Result<
143         ConstByteSpan>
PopEntry(ByteSpan buffer,uint32_t & drop_count_out)144     PopEntry(ByteSpan buffer, uint32_t& drop_count_out)
145         PW_LOCKS_EXCLUDED(multisink_->lock_) {
146       uint32_t ingress_drop_count = 0;
147       Result<ConstByteSpan> result =
148           PopEntry(buffer, drop_count_out, ingress_drop_count);
149       drop_count_out += ingress_drop_count;
150       return result;
151     }
152 
153     // Removes the previously peeked entry from the multisink.
154     //
155     // Example Usage:
156     //
157     //  // Peek entry to send it, and remove entry from multisink on success.
158     //  uint32_t drop_count;
159     //  const Result<PeekedEntry> peek_result =
160     //      PeekEntry(out_buffer, drop_count);
161     //  if (!peek_result.ok()) {
162     //    return peek_result.status();
163     //  }
164     //  Status send_status = UserSendFunction(peek_result.value().entry())
165     //  if (!send_status.ok()) {
166     //    return send_status;
167     //  }
168     //  PW_CHECK_OK(PopEntry(peek_result.value());
169     //
170     // Precondition: the buffer data must not be corrupt, otherwise there will
171     // be a crash.
172     //
173     // Return values:
174     // OK - the entry or entries were removed from the multisink successfully.
175     // FAILED_PRECONDITION - The drain must be attached to a sink.
176     Status PopEntry(const PeekedEntry& entry)
177         PW_LOCKS_EXCLUDED(multisink_->lock_);
178 
179     // Returns a copy of the next available entry if it exists and acquires the
180     // latest drop count if the drain was advanced, and the latest ingress drop
181     // count, without moving the drain forward, except if there is a
182     // RESOURCE_EXHAUSTED error when peeking, in which case the drain is
183     // automatically advanced.
184     // The `drain_drop_count_out` follows the same logic as `PopEntry`. The user
185     // must call `PopEntry` once the data in peek was used successfully.
186     //
187     // Precondition: the buffer data must not be corrupt, otherwise there will
188     // be a crash.
189     //
190     // Return values:
191     // OK - An entry was successfully read from the multisink.
192     // OUT_OF_RANGE - No entries were available.
193     // FAILED_PRECONDITION - The drain must be attached to a sink.
194     // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
195     // the next available entry, which was discarded.
196     Result<PeekedEntry> PeekEntry(ByteSpan buffer,
197                                   uint32_t& drain_drop_count_out,
198                                   uint32_t& ingress_drop_count_out)
199         PW_LOCKS_EXCLUDED(multisink_->lock_);
200 
201     // Drains are not copyable or movable.
202     Drain(const Drain&) = delete;
203     Drain& operator=(const Drain&) = delete;
204     Drain(Drain&&) = delete;
205     Drain& operator=(Drain&&) = delete;
206 
207     // Returns size of unread entries in the sink for this drain. This is a
208     // thread-safe version and must not be used inside a
209     // Listener::OnNewEntryAvailable() to avoid deadlocks. Use
210     // UnsafeGetUnreadEntriesSize() instead in such case.
GetUnreadEntriesSize()211     size_t GetUnreadEntriesSize() const PW_LOCKS_EXCLUDED(multisink_->lock_) {
212       std::lock_guard lock(multisink_->lock_);
213       return UnsafeGetUnreadEntriesSize();
214     }
215 
216     // Returns size of unread entries in the sink for this drain.
217     // Marked unsafe because it requires external synchronization.
UnsafeGetUnreadEntriesSize()218     size_t UnsafeGetUnreadEntriesSize() const PW_NO_LOCK_SAFETY_ANALYSIS {
219       return reader_.EntriesSize();
220     }
221 
222     // Return number of unread entries in the sink for this drain. This is a
223     // thread-safe version and must not be used inside a
224     // Listener::OnNewEntryAvailable() to avoid deadlocks.
GetUnreadEntriesCount()225     size_t GetUnreadEntriesCount() const PW_LOCKS_EXCLUDED(multisink_->lock_) {
226       std::lock_guard lock(multisink_->lock_);
227       return reader_.EntryCount();
228     }
229 
230    protected:
231     friend MultiSink;
232 
233     // The `reader_` and `last_handled_sequence_id_` are managed by attached
234     // multisink and are guarded by `multisink_->lock_` when used.
235     ring_buffer::PrefixedEntryRingBufferMulti::Reader reader_;
236     uint32_t last_handled_sequence_id_;
237     uint32_t last_peek_sequence_id_;
238     uint32_t last_handled_ingress_drop_count_;
239     MultiSink* multisink_;
240   };
241 
242   // A pure-virtual listener of a MultiSink, attached via AttachListener.
243   // MultiSink's invoke listeners when new data arrives, allowing them to
244   // schedule the draining of messages out of the MultiSink.
245   class Listener : public IntrusiveList<Listener>::Item {
246    public:
Listener()247     constexpr Listener() {}
248     virtual ~Listener() = default;
249 
250     // Listeners are not copyable or movable.
251     Listener(const Listener&) = delete;
252     Listener& operator=(const Drain&) = delete;
253     Listener(Listener&&) = delete;
254     Listener& operator=(Drain&&) = delete;
255 
256    protected:
257     friend MultiSink;
258 
259     // Invoked by the attached multisink when a new entry or drop count is
260     // available. The multisink lock is held during this call, so neither the
261     // multisink nor it's drains can be used during this callback.
262     virtual void OnNewEntryAvailable() = 0;
263   };
264 
265   class iterator {
266    public:
267     iterator& operator++() {
268       it_++;
269       return *this;
270     }
271     iterator operator++(int) {
272       iterator original = *this;
273       ++*this;
274       return original;
275     }
276 
277     iterator& operator--() {
278       it_--;
279       return *this;
280     }
281     iterator operator--(int) {
282       iterator original = *this;
283       --*this;
284       return original;
285     }
286 
287     ConstByteSpan& operator*() {
288       entry_ = (*it_).buffer;
289       return entry_;
290     }
291     ConstByteSpan* operator->() { return &operator*(); }
292 
293     constexpr bool operator==(const iterator& rhs) const {
294       return it_ == rhs.it_;
295     }
296 
297     constexpr bool operator!=(const iterator& rhs) const {
298       return it_ != rhs.it_;
299     }
300 
301     // Returns the status of the last iteration operation. If the iterator
302     // fails to read an entry, it will move to iterator::end() and indicate
303     // the failure reason here.
304     //
305     // Return values:
306     // OK - iteration is successful and iterator points to the next entry.
307     // DATA_LOSS - Failed to read the metadata at this location.
status()308     Status status() const { return it_.status(); }
309 
310    private:
311     friend class MultiSink;
312 
iterator(ring_buffer::PrefixedEntryRingBufferMulti::Reader & reader)313     iterator(ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader)
314         : it_(reader) {}
iterator()315     iterator() {}
316 
317     ring_buffer::PrefixedEntryRingBufferMulti::iterator it_;
318     ConstByteSpan entry_;
319   };
320 
321   class UnsafeIterationWrapper {
322    public:
323     using element_type = ConstByteSpan;
324     using value_type = std::remove_cv_t<ConstByteSpan>;
325     using pointer = ConstByteSpan*;
326     using reference = ConstByteSpan&;
327     using const_iterator = iterator;  // Standard alias for iterable types.
328 
begin()329     iterator begin() const { return iterator(*reader_); }
end()330     iterator end() const { return iterator(); }
cbegin()331     const_iterator cbegin() const { return begin(); }
cend()332     const_iterator cend() const { return end(); }
333 
334    private:
335     friend class MultiSink;
UnsafeIterationWrapper(ring_buffer::PrefixedEntryRingBufferMulti::Reader & reader)336     UnsafeIterationWrapper(
337         ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader)
338         : reader_(&reader) {}
339     ring_buffer::PrefixedEntryRingBufferMulti::Reader* reader_;
340   };
341 
UnsafeIteration()342   UnsafeIterationWrapper UnsafeIteration() PW_NO_LOCK_SAFETY_ANALYSIS {
343     return UnsafeIterationWrapper(oldest_entry_drain_.reader_);
344   }
345 
346   // Constructs a multisink using a ring buffer backed by the provided buffer.
347   // If we're using a virtual lock, then the lock needs to be passed in.
348 #if PW_MULTISINK_CONFIG_LOCK_TYPE == PW_MULTISINK_VIRTUAL_LOCK
MultiSink(ByteSpan buffer,LockType lock)349   MultiSink(ByteSpan buffer, LockType lock)
350       : lock_(lock),
351 #else
352   MultiSink(ByteSpan buffer)
353       :
354 #endif
355         ring_buffer_(true),
356         sequence_id_(0),
357         total_ingress_drops_(0) {
358     PW_ASSERT(ring_buffer_.SetBuffer(buffer).ok());
359     AttachDrain(oldest_entry_drain_);
360   }
361 
362   // Write an entry to the multisink. If available space is less than the
363   // size of the entry, the internal ring buffer will push the oldest entries
364   // out to make space, so long as the entry is not larger than the buffer.
365   // The sequence ID of the multisink will always increment as a result of
366   // calling HandleEntry, regardless of whether pushing the entry succeeds.
367   //
368   // Precondition: If PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled, this
369   // function must not be called from an interrupt context.
370   // Precondition: entry.size() <= `ring_buffer_` size
371   void HandleEntry(ConstByteSpan entry) PW_LOCKS_EXCLUDED(lock_);
372 
373   // Notifies the multisink of messages dropped before ingress. The writer
374   // may use this to signal to readers that an entry (or entries) failed
375   // before being sent to the multisink (e.g. the writer failed to encode
376   // the message). This API increments the sequence ID of the multisink by
377   // the provided `drop_count`.
378   void HandleDropped(uint32_t drop_count = 1) PW_LOCKS_EXCLUDED(lock_);
379 
380   // Attach a drain to the multisink. Drains may not be associated with more
381   // than one multisink at a time. Drains can consume entries pushed before
382   // the drain was attached, so long as they have not yet been evicted from
383   // the underlying ring buffer.
384   //
385   // Precondition: The drain must not be attached to a multisink.
386   void AttachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
387 
388   // Detaches a drain from the multisink. Drains may only be detached if they
389   // were previously attached to this multisink.
390   //
391   // Precondition: The drain must be attached to this multisink.
392   void DetachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
393 
394   // Attach a listener to the multisink. The listener will be notified
395   // immediately when attached, to allow late drain users to consume existing
396   // entries. If draining in response to the notification, ensure that the drain
397   // is attached prior to registering the listener; attempting to drain when
398   // unattached will crash. Once attached, listeners are invoked on all new
399   // messages.
400   //
401   // Precondition: The listener must not be attached to a multisink.
402   void AttachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_);
403 
404   // Detaches a listener from the multisink.
405   //
406   // Precondition: The listener must be attached to this multisink.
407   void DetachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_);
408 
409   // Removes all data from the internal buffer. The multisink's sequence ID is
410   // not modified, so readers may interpret this event as droppping entries.
411   void Clear() PW_LOCKS_EXCLUDED(lock_);
412 
413   // Uses MultiSink's unsafe iteration to dump the contents to a user-provided
414   // callback. max_num_entries can be used to limit the dump to the N most
415   // recent entries.
416   //
417   // Returns:
418   //   OK - Successfully dumped entire multisink.
419   //   DATA_LOSS - Corruption detected, some entries may have been lost.
420   Status UnsafeForEachEntry(
421       const Function<void(ConstByteSpan)>& callback,
422       size_t max_num_entries = std::numeric_limits<size_t>::max());
423 
424   // Uses MultiSink's unsafe iteration to dump the contents to a user-provided
425   // callback. UnsafeForEachEntryFromEnd dumps the latest entries, up to the
426   // aggregate element size of max_size_bytes.
427   //
428   // Returns:
429   //   OK - Successfully dumped entire multisink.
430   //   DATA_LOSS - Corruption detected, some entries may have been lost.
431   Status UnsafeForEachEntryFromEnd(
432       const Function<void(ConstByteSpan)>& callback, size_t max_size_bytes);
433 
434  protected:
435   friend Drain;
436 
437   enum class Request { kPop, kPeek };
438   // Removes the previously peeked entry from the front of the multisink.
439   Status PopEntry(Drain& drain, const Drain::PeekedEntry& entry)
440       PW_LOCKS_EXCLUDED(lock_);
441 
442   // Gets a copy of the entry from the provided drain and unpacks sequence ID
443   // information. The entry is removed from the multisink when `request` is set
444   // to `Request::kPop`. Drains use this API to strip away sequence ID
445   // information for drop calculation.
446   //
447   // Precondition: the buffer data must not be corrupt, otherwise there will
448   // be a crash.
449   //
450   // Returns:
451   // OK - An entry was successfully read from the multisink. The
452   // `drain_drop_count_out` is set to the difference between the current
453   // sequence ID and the last handled ID.
454   // FAILED_PRECONDITION - The drain is not attached to
455   // a multisink.
456   // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
457   // the next available entry, which was discarded.
458   Result<ConstByteSpan> PeekOrPopEntry(Drain& drain,
459                                        ByteSpan buffer,
460                                        Request request,
461                                        uint32_t& drain_drop_count_out,
462                                        uint32_t& ingress_drop_count_out,
463                                        uint32_t& entry_sequence_id_out)
464       PW_LOCKS_EXCLUDED(lock_);
465 
466  private:
467   // Notifies attached listeners of new entries or an updated drop count.
468   void NotifyListeners() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_);
469 
470   LockType lock_;
471   IntrusiveList<Listener> listeners_ PW_GUARDED_BY(lock_);
472   ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_ PW_GUARDED_BY(lock_);
473   Drain oldest_entry_drain_ PW_GUARDED_BY(lock_);
474   uint32_t sequence_id_ PW_GUARDED_BY(lock_);
475   uint32_t total_ingress_drops_ PW_GUARDED_BY(lock_);
476 };
477 
478 }  // namespace multisink
479 }  // namespace pw
480