.. _seed-0124:

===============================================================
0124: Interfaces for Retrieving Size Information from Multisink
===============================================================
.. seed::
   :number: 124
   :name: Interfaces for Retrieving Size Information from Multisink
   :status: open_for_comments
   :proposal_date: 2024-01-22
   :cl: 188670
   :authors: Jiacheng Lu
   :facilitator: Carlos Chinchilla

-------
Summary
-------
This SEED proposes adding interfaces to :ref:`module-pw_multisink` to retrieve
the capacity and filled size of its underlying buffer.

----------
Motivation
----------
Currently, ``pw_multisink`` provides ``Listener`` to help schedule draining of
entries. The interfaces of ``pw_multisink`` and its ``Listener`` provide no
information about the capacity or consumed size of the underlying buffer.

By adding interfaces to ``pw_multisink`` that provide size information about its
underlying buffer, scheduling policies can be implemented to trigger draining
based on, for example, the size of unread data in the buffer.

--------
Proposal
--------

Add ``UnsafeGetUnreadEntriesSize()`` and its thread-safe counterpart
``GetUnreadEntriesSize()`` to ``pw::multisink::MultiSink::Drain``. Both report
the size of unread data in the underlying buffer.

  .. code-block:: cpp

     class MultiSink {
      public:
       ...

       class Drain {
         ...

         // Both APIs return the size in bytes of the valid data in the
         // underlying buffer that has not been read by this drain.

         // Thread-unsafe version. It requires the multisink's `lock_` to be
         // held. For example, it can be used inside Listener methods, where
         // the lock is already held, to check the size when handling each
         // new entry.
         size_t UnsafeGetUnreadEntriesSize() PW_NO_LOCK_SAFETY_ANALYSIS;

         // Thread-safe version.
         size_t GetUnreadEntriesSize() PW_LOCKS_EXCLUDED(multisink_->lock_);

         ...

        protected:
         friend class MultiSink;

         MultiSink* multisink_;
       };

       ...

      private:
       LockType lock_;
     };


---------------------
Problem investigation
---------------------
``pw_multisink`` is a popular choice for buffering log data in Pigweed-based
software. An example is :ref:`module-pw_log_rpc`, where an instance of
``pw_multisink`` buffers logs before they are drained and transmitted.
Making sure that ``pw_multisink`` is drained in time is important to reduce the
chance of dropping logs because the buffer runs out of space.
However, draining and transmission behavior may be constrained by the
properties of the underlying physical channel. For certain physical channels,
there are tradeoffs between transmission frequency and cost.

PCI is one example of a physical channel with these tradeoffs. PCI
implementations normally have different levels of power states for power
efficiency. Transmitting data over PCI continuously may prevent it from
entering deep sleep states. Also, PCI normally has high transmission
bandwidth. Buffering data up to a certain threshold and then sending it all in
a single transmission fits better with its design.

The ``OnNewEntryAvailable`` method on the current ``Listener`` interface does
not provide enough information about the buffered data size. The underlying
buffer is currently implemented with :ref:`module-pw_ring_buffer`, which stores
additional variable-length data for its internal use with each entry.


Assuming the proposed interface is available, the imagined use case looks like:

  .. code-block:: cpp

     class DrainThread : public pw::thread::ThreadCore,
                         public pw::multisink::MultiSink::Listener {
      public:

       ...  // initialize with drain

       bool NeedFlush(size_t unread_entries_size) {
         ...
       }

       void Flush(pw::multisink::MultiSink::Drain& drain) {
         ...
       }

       // Called with the multisink's lock held, so the unsafe variant is used.
       void OnNewEntryAvailable() override {
         if (NeedFlush(drain_.UnsafeGetUnreadEntriesSize())) {
           flush_threshold_reached_notification_.release();
         }
       }

       void Run() override {
         multisink_.AttachListener(*this);

         while (true) {
           flush_threshold_reached_notification_.acquire();
           Flush(drain_);
         }
       }

      private:
       pw::multisink::MultiSink& multisink_;
       pw::multisink::MultiSink::Drain& drain_;
       pw::sync::ThreadNotification flush_threshold_reached_notification_;
     };


---------------
Detailed design
---------------

Implement ``EntriesSize()`` in
``pw::ring_buffer::PrefixedEntryRingBufferMulti::Reader`` to provide the number
of bytes between its read pointer and the ring buffer's write pointer.

  .. code-block:: cpp

     class PrefixedEntryRingBufferMulti {
      public:
       class Reader : public IntrusiveList<Reader>::Item {
        public:

         // Get the size of the unread entries currently in the ring buffer.
         // Return value:
         //   Number of bytes
         size_t EntriesSize() const {
           // Case: Not wrapped.
           if (read_idx_ < buffer_->write_idx_) {
             return buffer_->write_idx_ - read_idx_;
           }
           // Case: Wrapped.
           if (read_idx_ > buffer_->write_idx_) {
             return buffer_->buffer_bytes_ - (read_idx_ - buffer_->write_idx_);
           }

           // Indices are equal. No entries remaining.
           if (entry_count_ == 0) {
             return 0;
           }

           // Indices are equal and entries remain: the buffer is full.
           return buffer_->buffer_bytes_;
         }

        private:
         PrefixedEntryRingBufferMulti* buffer_;
         size_t read_idx_;
         size_t entry_count_;
       };

      private:
       size_t write_idx_;
       size_t buffer_bytes_;
     };


The unread data size of ``Drain`` is fetched directly from the ring buffer's
``Reader``.
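
As a worked illustration of the index arithmetic in ``EntriesSize()``, the
following is a minimal standalone sketch, not the ``pw_ring_buffer``
implementation. ``ReaderState`` and ``UnreadBytes`` are hypothetical names
introduced only for this example; the fields mirror the members used by
``EntriesSize()``, and the checks assume a 16-byte buffer.

```cpp
#include <cstddef>

// Hypothetical stand-in for the state that Reader::EntriesSize() reads.
// This struct is illustrative only and is not part of pw_ring_buffer.
struct ReaderState {
  std::size_t read_idx;      // Reader's position in the buffer.
  std::size_t write_idx;     // Ring buffer's writer position.
  std::size_t entry_count;   // Entries not yet read by this reader.
  std::size_t buffer_bytes;  // Total capacity of the ring buffer.
};

// Same branch structure as the proposed EntriesSize().
constexpr std::size_t UnreadBytes(const ReaderState& s) {
  if (s.read_idx < s.write_idx) {
    return s.write_idx - s.read_idx;  // Not wrapped.
  }
  if (s.read_idx > s.write_idx) {
    return s.buffer_bytes - (s.read_idx - s.write_idx);  // Wrapped.
  }
  // Equal indices: the reader is either caught up or a full buffer behind.
  return s.entry_count == 0 ? 0 : s.buffer_bytes;
}

// Not wrapped: 10 - 2 = 8.
static_assert(UnreadBytes({2, 10, 1, 16}) == 8, "not wrapped");
// Wrapped: 16 - (12 - 4) = 8.
static_assert(UnreadBytes({12, 4, 1, 16}) == 8, "wrapped");
// Equal indices, nothing unread.
static_assert(UnreadBytes({5, 5, 0, 16}) == 0, "caught up");
// Equal indices, entries pending: the whole buffer is unread.
static_assert(UnreadBytes({5, 5, 3, 16}) == 16, "full");
```

The equal-index case is why the entry count is needed: the two indices alone
cannot distinguish a reader that is fully caught up from one that is a full
buffer behind.
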
Because ``pw_multisink`` already uses ``lock_`` to protect all calls to
listeners' methods, supporting calls to the proposed interfaces from listeners
requires two versions of the API: a thread-safe version intended for use
outside of listeners, and a thread-unsafe version that requires the ``lock_``
of ``pw_multisink`` to be held when it is invoked.

  .. code-block:: cpp

     namespace pw {
     namespace multisink {

     class MultiSink {
      public:
       ...

       class Drain {
        public:

         // Both APIs return the size in bytes of the valid data in the
         // underlying buffer that has not been read by this drain.

         // Ideally this would be annotated with
         // PW_EXCLUSIVE_LOCKS_REQUIRED(multisink_->lock_).
         // However, the Listener interfaces, from which it is intended to be
         // called, cannot be annotated with the multisink's lock, so static
         // analysis is not possible.
         size_t UnsafeGetUnreadEntriesSize() PW_NO_LOCK_SAFETY_ANALYSIS {
           return reader_.EntriesSize();
         }

         size_t GetUnreadEntriesSize() PW_LOCKS_EXCLUDED(multisink_->lock_) {
           std::lock_guard lock(multisink_->lock_);
           return UnsafeGetUnreadEntriesSize();
         }

        protected:
         friend class MultiSink;

         MultiSink* multisink_;
         ring_buffer::PrefixedEntryRingBufferMulti::Reader reader_;
       };

       ...

      private:
       LockType lock_;
       ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_
           PW_GUARDED_BY(lock_);
     };

     }  // namespace multisink
     }  // namespace pw

------------
Alternatives
------------

Add on buffer size change interface to listener
===============================================
Add an ``OnBufferSizeChange`` interface to
``pw::multisink::MultiSink::Listener``. The new interface is invoked when the
available size of the underlying buffer changes.

Interface example:

  .. code-block:: cpp

     class MultiSink {
      public:
       class Listener {
        public:

         ...

         virtual void OnNewEntryAvailable() = 0;

         virtual void OnBufferSizeChange(size_t total_size, size_t used_size);
       };

       ...
     };


The following is an imagined implementation in which ``OnBufferSizeChange`` is
invoked after an entry push or pop. It uses existing interfaces of the
underlying :ref:`module-pw_ring_buffer`. In practice, this implementation does
not work well, as explained in the problem sections below.

  .. code-block:: cpp

     void MultiSink::HandleEntry(ConstByteSpan entry) {
       std::lock_guard lock(lock_);
       ...
       ring_buffer_.PushBack(entry);
       NotifyListenersBufferSizeChanged(
           ring_buffer_.TotalSizeBytes(),
           ring_buffer_.TotalUsedBytes());
       ...
     }

  .. code-block:: cpp

     void MultiSink::PopEntry(Drain& drain, ConstByteSpan entry) {
       std::lock_guard lock(lock_);
       ...
       const size_t used_size_before_pop = ring_buffer_.TotalUsedBytes();
       drain.reader_.PopFront();
       const size_t used_size_after_pop = ring_buffer_.TotalUsedBytes();
       // Only notify when the pop actually changed the used size.
       if (used_size_before_pop != used_size_after_pop) {
         NotifyListenersBufferSizeChanged(
             ring_buffer_.TotalSizeBytes(),
             used_size_after_pop);
       }
       ...
     }


Problem 1. Find the slowest drain
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
When multiple drains are attached to ``pw_multisink``, only the slowest
drain(s) free space in the underlying ``pw_ring_buffer`` when they pop.

``pw_multisink`` supports :ref:`module-pw_multisink-late_drain_attach`, which
attaches an internal drain that never pops. The ``TotalUsedBytes()`` reported by
the underlying ``pw_ring_buffer`` counts from the slowest drain and therefore
always reports the full capacity rather than the real used size.


Problem 2. Pushing an entry when the buffer is full may decrease used size
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
When pushing a new entry would exceed the remaining free space of the
underlying buffer, the push can still succeed by silently dropping entries from
the slowest drain. However, depending on the size of the dropped entries and
the size of the new entry, the used buffer size may increase, decrease, or stay
the same.

If the user of the proposed ``OnBufferSizeChange`` interface compares the
reported used bytes with a threshold value, the moment at which the underlying
buffer becomes full may not be caught.

Although it is possible to also trigger ``OnBufferSizeChange`` with
``used_size == total_size`` when this situation happens, the implementation
may also require exposing internal state of ``pw_ring_buffer``.

--------------
Open questions
--------------