xref: /aosp_15_r20/external/pigweed/pw_multisink/multisink_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
#include "pw_multisink/multisink.h"

#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <limits>
#include <optional>
#include <string_view>

#include "pw_function/function.h"
#include "pw_span/span.h"
#include "pw_status/status.h"
#include "pw_unit_test/framework.h"
27 
28 namespace pw::multisink {
29 namespace {
30 
31 using Drain = MultiSink::Drain;
32 using Listener = MultiSink::Listener;
33 
34 class CountingListener : public Listener {
35  public:
OnNewEntryAvailable()36   void OnNewEntryAvailable() override { notification_count_++; }
37 
GetNotificationCount()38   size_t GetNotificationCount() { return notification_count_; }
39 
ResetNotificationCount()40   void ResetNotificationCount() { notification_count_ = 0; }
41 
42  private:
43   size_t notification_count_ = 0;
44 };
45 
46 class EntriesSizeMonitorListener : public Listener {
47  public:
EntriesSizeMonitorListener(pw::multisink::MultiSink::Drain & drain)48   EntriesSizeMonitorListener(pw::multisink::MultiSink::Drain& drain)
49       : last_seen_unread_size_(0u), drain_(drain) {}
OnNewEntryAvailable()50   void OnNewEntryAvailable() override {
51     last_seen_unread_size_ = drain_.UnsafeGetUnreadEntriesSize();
52   }
53 
GetLastSeenUnreadEntriesSize() const54   size_t GetLastSeenUnreadEntriesSize() const { return last_seen_unread_size_; }
55 
56  private:
57   size_t last_seen_unread_size_;
58   pw::multisink::MultiSink::Drain& drain_;
59 };
60 
61 class MultiSinkTest : public ::testing::Test {
62  protected:
63   static constexpr std::byte kMessage[] = {
64       (std::byte)0xDE, (std::byte)0xAD, (std::byte)0xBE, (std::byte)0xEF};
65   static constexpr std::byte kMessageOther[] = {
66       (std::byte)0x12, (std::byte)0x34, (std::byte)0x56, (std::byte)0x78};
67   static constexpr size_t kMaxDrains = 3;
68   static constexpr size_t kMaxListeners = 3;
69   static constexpr size_t kEntryBufferSize = 1024;
70   static constexpr size_t kBufferSize = 5 * kEntryBufferSize;
71 
MultiSinkTest()72   MultiSinkTest() : buffer_{}, multisink_(buffer_) {}
73 
74   // Expects the peeked or popped message to equal the provided non-empty
75   // message, and the drop count to match. If `expected_message` is empty, the
76   // Pop call status expected is OUT_OF_RANGE.
ExpectMessageAndDropCounts(Result<ConstByteSpan> & result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)77   void ExpectMessageAndDropCounts(Result<ConstByteSpan>& result,
78                                   uint32_t result_drop_count,
79                                   uint32_t result_ingress_drop_count,
80                                   std::optional<ConstByteSpan> expected_message,
81                                   uint32_t expected_drop_count,
82                                   uint32_t expected_ingress_drop_count) {
83     if (!expected_message.has_value()) {
84       EXPECT_EQ(Status::OutOfRange(), result.status());
85     } else {
86       ASSERT_EQ(result.status(), OkStatus());
87       if (!expected_message.value().empty()) {
88         ASSERT_FALSE(result.value().empty());
89         ASSERT_EQ(result.value().size_bytes(),
90                   expected_message.value().size_bytes());
91         EXPECT_EQ(memcmp(result.value().data(),
92                          expected_message.value().data(),
93                          expected_message.value().size_bytes()),
94                   0);
95       }
96     }
97     EXPECT_EQ(result_drop_count, expected_drop_count);
98     EXPECT_EQ(result_ingress_drop_count, expected_ingress_drop_count);
99   }
100 
VerifyPopEntry(Drain & drain,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)101   void VerifyPopEntry(Drain& drain,
102                       std::optional<ConstByteSpan> expected_message,
103                       uint32_t expected_drop_count,
104                       uint32_t expected_ingress_drop_count) {
105     uint32_t drop_count = 0;
106     uint32_t ingress_drop_count = 0;
107     Result<ConstByteSpan> result =
108         drain.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
109     ExpectMessageAndDropCounts(result,
110                                drop_count,
111                                ingress_drop_count,
112                                expected_message,
113                                expected_drop_count,
114                                expected_ingress_drop_count);
115   }
116 
VerifyPeekResult(const Result<Drain::PeekedEntry> & peek_result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)117   void VerifyPeekResult(const Result<Drain::PeekedEntry>& peek_result,
118                         uint32_t result_drop_count,
119                         uint32_t result_ingress_drop_count,
120                         std::optional<ConstByteSpan> expected_message,
121                         uint32_t expected_drop_count,
122                         uint32_t expected_ingress_drop_count) {
123     if (peek_result.ok()) {
124       ASSERT_FALSE(peek_result.value().entry().empty());
125       Result<ConstByteSpan> verify_result(peek_result.value().entry());
126       ExpectMessageAndDropCounts(verify_result,
127                                  result_drop_count,
128                                  result_ingress_drop_count,
129                                  expected_message,
130                                  expected_drop_count,
131                                  expected_ingress_drop_count);
132       return;
133     }
134     if (expected_message.has_value()) {
135       // Fail since we expected OkStatus.
136       ASSERT_EQ(peek_result.status(), OkStatus());
137     }
138     EXPECT_EQ(Status::OutOfRange(), peek_result.status());
139   }
140 
ExpectNotificationCount(CountingListener & listener,size_t expected_notification_count)141   void ExpectNotificationCount(CountingListener& listener,
142                                size_t expected_notification_count) {
143     EXPECT_EQ(listener.GetNotificationCount(), expected_notification_count);
144     listener.ResetNotificationCount();
145   }
146 
147   std::byte buffer_[kBufferSize];
148   std::byte entry_buffer_[kEntryBufferSize];
149   CountingListener listeners_[kMaxListeners];
150   Drain drains_[kMaxDrains];
151   MultiSink multisink_;
152 };
153 
// Drives one drain and one listener through regular entries, an empty entry,
// and drop reports, checking notifications and drop counters at each step.
TEST_F(MultiSinkTest, SingleDrain) {
  Drain& drain = drains_[0];
  CountingListener& listener = listeners_[0];

  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  // The test expects one notification right after the listener is attached.
  ExpectNotificationCount(listener, 1u);

  // Single entry push and pop.
  multisink_.HandleEntry(kMessage);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);

  // Single empty entry push and pop.
  multisink_.HandleEntry(ConstByteSpan());
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, ConstByteSpan(), 0u, 0u);

  // Multiple entries with a drop in between: every call notifies, and the
  // ingress drop is reported with the entry that follows it.
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);
  ExpectNotificationCount(listener, 3u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);

  // A drop with no entry behind it still notifies; the pop finds no entry but
  // reports the ingress drop.
  multisink_.HandleDropped();
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, std::nullopt, 0u, 1u);

  // Nothing left: no notifications, and the pop is out of range with no drops.
  ExpectNotificationCount(listener, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);
}
185 
// Two drains attached to the same multisink must each observe the complete
// sequence of entries and drops, independently of one another.
TEST_F(MultiSinkTest, MultipleDrain) {
  Drain& first_drain = drains_[0];
  Drain& second_drain = drains_[1];
  CountingListener& first_listener = listeners_[0];
  CountingListener& second_listener = listeners_[1];

  multisink_.AttachDrain(first_drain);
  multisink_.AttachDrain(second_drain);
  multisink_.AttachListener(first_listener);
  multisink_.AttachListener(second_listener);
  ExpectNotificationCount(first_listener, 1u);
  ExpectNotificationCount(second_listener, 1u);

  // Three entries and two drops: five notifications per listener.
  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();
  ExpectNotificationCount(first_listener, 5u);
  ExpectNotificationCount(second_listener, 5u);

  // Drain one drain entirely.
  VerifyPopEntry(first_drain, kMessage, 0u, 0u);
  VerifyPopEntry(first_drain, kMessage, 0u, 0u);
  VerifyPopEntry(first_drain, kMessage, 0u, 1u);
  VerifyPopEntry(first_drain, std::nullopt, 0u, 1u);
  VerifyPopEntry(first_drain, std::nullopt, 0u, 0u);

  // Popping produces no notifications, and the other drain can still be
  // drained separately with identical results.
  ExpectNotificationCount(first_listener, 0u);
  ExpectNotificationCount(second_listener, 0u);
  VerifyPopEntry(second_drain, kMessage, 0u, 0u);
  VerifyPopEntry(second_drain, kMessage, 0u, 0u);
  VerifyPopEntry(second_drain, kMessage, 0u, 1u);
  VerifyPopEntry(second_drain, std::nullopt, 0u, 1u);
  VerifyPopEntry(second_drain, std::nullopt, 0u, 0u);
}
218 
// A drain attached after an entry was pushed should still observe that entry
// if it has not been evicted from the ring buffer.
TEST_F(MultiSinkTest, LateDrainRegistration) {
  Drain& drain = drains_[0];
  CountingListener& listener = listeners_[0];

  // Push before anything is attached.
  multisink_.HandleEntry(kMessage);

  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);

  // Entries pushed after attachment behave as usual.
  multisink_.HandleEntry(kMessage);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);
}
235 
// Detaches a drain and listener mid-stream and re-attaches them, checking what
// the drain observes after it comes back.
TEST_F(MultiSinkTest, DynamicDrainRegistration) {
  Drain& drain = drains_[0];
  CountingListener& listener = listeners_[0];

  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  ExpectNotificationCount(listener, 1u);

  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);

  // Drain out one message and detach it.
  ExpectNotificationCount(listener, 4u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);
  multisink_.DetachDrain(drain);
  multisink_.DetachListener(listener);

  // Re-attaching the drain reproduces the message that was already popped,
  // followed by the one that was still pending; each pop reports one ingress
  // drop. Re-attaching the listener accounts for the single notification.
  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);

  // New entries after re-attachment are delivered normally.
  multisink_.HandleEntry(kMessage);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);
}
267 
// Popping into a buffer that cannot hold the entry must fail with
// RESOURCE_EXHAUSTED and still consume the entry.
TEST_F(MultiSinkTest, TooSmallBuffer) {
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // Insert a drop and an entry.
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);

  // Try to read the entry through a one-byte view of the scratch buffer.
  uint32_t drop_count = 0;
  uint32_t ingress_drop_count = 0;
  const Result<ConstByteSpan> undersized_pop =
      drain.PopEntry(span(entry_buffer_, 1), drop_count, ingress_drop_count);
  EXPECT_EQ(undersized_pop.status(), Status::ResourceExhausted());

  // The entry is gone; the follow-up pop reports it as one drain drop, along
  // with the one ingress drop.
  VerifyPopEntry(drain, std::nullopt, 1u, 1u);
}
285 
// Entries already consumed by a drain must still be visible through
// MultiSink::UnsafeIteration().
TEST_F(MultiSinkTest, Iterator) {
  multisink_.AttachDrain(drains_[0]);

  // Insert entries and consume them all.
  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessage);

  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);

  // Confirm that the iterator still observes the messages in the ring buffer.
  size_t iterated_entries = 0;
  for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
    // Check the length first so the byte comparison can never read past the
    // end of an unexpectedly short entry.
    EXPECT_EQ(entry.size(), sizeof(kMessage));
    if (entry.size() == sizeof(kMessage)) {
      EXPECT_EQ(std::memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
    }
    ++iterated_entries;
  }
  EXPECT_EQ(iterated_entries, 3u);
}
306 
// With no drains attached at all, MultiSink::UnsafeIteration() should still
// walk every entry starting from the oldest one.
TEST_F(MultiSinkTest, IteratorNoDrains) {
  // Insert entries with no consumers attached.
  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessage);

  // Confirm that the iterator still observes the messages in the ring buffer.
  size_t iterated_entries = 0;
  for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
    // Check the length first so the byte comparison can never read past the
    // end of an unexpectedly short entry.
    EXPECT_EQ(entry.size(), sizeof(kMessage));
    if (entry.size() == sizeof(kMessage)) {
      EXPECT_EQ(std::memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
    }
    ++iterated_entries;
  }
  EXPECT_EQ(iterated_entries, 3u);
}
322 
// Iterating a multisink that never received an entry yields an empty range.
TEST_F(MultiSinkTest, IteratorNoEntries) {
  // A drain is attached, but nothing is ever pushed.
  multisink_.AttachDrain(drains_[0]);

  // begin() and end() must compare equal, i.e. there is nothing to visit.
  MultiSink::UnsafeIterationWrapper iteration = multisink_.UnsafeIteration();
  EXPECT_EQ(iteration.begin(), iteration.end());
}
331 
// Peeking a drain whose multisink holds no entries must report OUT_OF_RANGE.
TEST_F(MultiSinkTest, PeekEntryNoEntries) {
  multisink_.AttachDrain(drains_[0]);

  // Peek empty multisink.
  uint32_t drop_count = 0;
  uint32_t ingress_drop_count = 0;
  auto peek_result =
      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  // Forward the counters written by PeekEntry() in the order the helper
  // declares them: drain drop count first, ingress drop count second.
  VerifyPeekResult(
      peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
}
342 
TEST_F(MultiSinkTest,PeekAndPop)343 TEST_F(MultiSinkTest, PeekAndPop) {
344   multisink_.AttachDrain(drains_[0]);
345   multisink_.AttachDrain(drains_[1]);
346 
347   // Peek entry after multisink has some entries.
348   multisink_.HandleEntry(kMessage);
349   multisink_.HandleEntry(kMessageOther);
350   uint32_t drop_count = 0;
351   uint32_t ingress_drop_count = 0;
352   auto first_peek_result =
353       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
354   VerifyPeekResult(
355       first_peek_result, drop_count, ingress_drop_count, kMessage, 0, 0);
356 
357   // Multiple peeks must return the front message.
358   auto peek_duplicate =
359       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
360   VerifyPeekResult(
361       peek_duplicate, drop_count, ingress_drop_count, kMessage, 0, 0);
362   // A second drain must peek the front message.
363   auto peek_other_drain =
364       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
365   VerifyPeekResult(
366       peek_other_drain, drop_count, ingress_drop_count, kMessage, 0, 0);
367 
368   // After a drain pops a peeked entry, the next peek call must return the next
369   // message.
370   ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
371   auto second_peek_result =
372       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
373   VerifyPeekResult(
374       second_peek_result, drop_count, ingress_drop_count, kMessageOther, 0, 0);
375   // Slower readers must be unchanged.
376   auto peek_other_drain_duplicate =
377       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
378   VerifyPeekResult(peek_other_drain_duplicate,
379                    drop_count,
380                    ingress_drop_count,
381                    kMessage,
382                    0,
383                    0);
384 
385   // PopEntry prior to popping the previously peeked entry.
386   VerifyPopEntry(drains_[0], kMessageOther, 0, 0);
387   // Popping an entry already handled must not trigger errors.
388   ASSERT_EQ(drains_[0].PopEntry(second_peek_result.value()), OkStatus());
389   // Popping with an old peek context must not trigger errors.
390   ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
391 
392   // Multisink is empty, pops and peeks should trigger OUT_OF_RANGE.
393   VerifyPopEntry(drains_[0], std::nullopt, 0, 0);
394   auto empty_peek_result =
395       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
396   VerifyPeekResult(
397       empty_peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
398 
399   // // Slower readers must be unchanged.
400   auto peek_other_drain_unchanged =
401       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
402   VerifyPeekResult(peek_other_drain_unchanged,
403                    drop_count,
404                    ingress_drop_count,
405                    kMessage,
406                    0,
407                    0);
408 }
409 
// Ingress drops reported after an entry must surface on the peek that follows
// that entry, not on the peek of the entry itself.
TEST_F(MultiSinkTest, PeekReportsIngressDropCount) {
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // One entry followed by a batch of ingress drops.
  constexpr uint32_t kIngressDrops = 10;
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped(kIngressDrops);

  uint32_t drop_count = 0;
  uint32_t ingress_drop_count = 0;

  // No drops reported until the drain finds a gap in the sequence IDs.
  auto entry_peek =
      drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  VerifyPeekResult(entry_peek, drop_count, ingress_drop_count, kMessage, 0, 0);

  // Popping the peeked entry advances the drain, and a new peek will find the
  // gap in sequence IDs.
  ASSERT_EQ(drain.PopEntry(entry_peek.value()), OkStatus());
  auto gap_peek =
      drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  ASSERT_EQ(gap_peek.status(), Status::OutOfRange());
  EXPECT_EQ(drop_count, 0u);
  EXPECT_EQ(ingress_drop_count, kIngressDrops);
}
435 
TEST_F(MultiSinkTest,PeekReportsSlowDrainDropCount)436 TEST_F(MultiSinkTest, PeekReportsSlowDrainDropCount) {
437   multisink_.AttachDrain(drains_[0]);
438 
439   // Add entries until buffer is full and drain has to be advanced.
440   // The sequence ID takes 1 byte when less than 128.
441   const size_t max_multisink_messages = 128;
442   const size_t buffer_entry_size = kBufferSize / max_multisink_messages;
443   // Account for 1 byte of preamble (sequnce ID) and 1 byte of data size.
444   const size_t message_size = buffer_entry_size - 2;
445   std::array<std::byte, message_size> message;
446   std::memset(message.data(), 'a', message.size());
447   for (size_t i = 0; i < max_multisink_messages; ++i) {
448     message[0] = static_cast<std::byte>(i);
449     multisink_.HandleEntry(message);
450   }
451 
452   // At this point the buffer is full, but the sequence ID will take 1 more byte
453   // in the preamble, meaning that adding N new entries, drops N + 1 entries.
454   // Account for that offset.
455   const size_t expected_drops = 5;
456   for (size_t i = 1; i < expected_drops; ++i) {
457     message[0] = static_cast<std::byte>(200 + i);
458     multisink_.HandleEntry(message);
459   }
460 
461   uint32_t drop_count = 0;
462   uint32_t ingress_drop_count = 0;
463 
464   auto peek_result =
465       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
466   // The message peeked is the 6th message added.
467   message[0] = static_cast<std::byte>(5);
468   VerifyPeekResult(
469       peek_result, drop_count, ingress_drop_count, message, expected_drops, 0);
470 
471   // Add 3 more messages since we peeked the multisink, generating 2 more drops.
472   const size_t expected_drops2 = 2;
473   for (size_t i = 0; i < expected_drops2 + 1; ++i) {
474     message[0] = static_cast<std::byte>(220 + i);
475     multisink_.HandleEntry(message);
476   }
477 
478   // Pop the 6th message now, even though it was already dropped.
479   EXPECT_EQ(drains_[0].PopEntry(peek_result.value()), OkStatus());
480 
481   // A new peek would get the 9th message because two more messages were
482   // dropped. Given that PopEntry() was called with peek_result, all the dropped
483   // messages before peek_result should be considered handled and only the two
484   // new drops should be reported here.
485   auto peek_result2 =
486       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
487   message[0] = static_cast<std::byte>(8);  // 9th message
488   VerifyPeekResult(peek_result2,
489                    drop_count,
490                    ingress_drop_count,
491                    message,
492                    expected_drops2,
493                    0);
494 }
495 
// The ingress drop count reported to a drain must stay correct when the
// multisink's own 32-bit drop counter wraps around.
TEST_F(MultiSinkTest, IngressDropCountOverflow) {
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // Make drain's last handled drop larger than multisink drop count, which
  // overflowed.
  constexpr uint32_t kDropCountCloseToOverflow =
      std::numeric_limits<uint32_t>::max() - 3;
  multisink_.HandleDropped(kDropCountCloseToOverflow);
  multisink_.HandleEntry(kMessage);

  // Catch up drain's drop count.
  uint32_t drop_count = 0;
  uint32_t ingress_drop_count = 0;
  auto catch_up_peek =
      drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  VerifyPeekResult(catch_up_peek,
                   drop_count,
                   ingress_drop_count,
                   kMessage,
                   0,
                   kDropCountCloseToOverflow);
  // Popping the peeked entry advances the drain past it.
  ASSERT_EQ(drain.PopEntry(catch_up_peek.value()), OkStatus());

  // Overflow multisink's drop count.
  constexpr uint32_t kExpectedIngressDropCount = 10;
  multisink_.HandleDropped(kExpectedIngressDropCount);

  // No entry is available, but exactly the new drops must be reported.
  auto overflow_peek =
      drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  ASSERT_EQ(overflow_peek.status(), Status::OutOfRange());
  EXPECT_EQ(drop_count, 0u);
  EXPECT_EQ(ingress_drop_count, kExpectedIngressDropCount);

  // A later entry carries no drops of its own.
  multisink_.HandleEntry(kMessage);
  auto final_peek =
      drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  VerifyPeekResult(final_peek, drop_count, ingress_drop_count, kMessage, 0, 0);
}
537 
// A drain that is detached and attached again must report the same entry and
// the same ingress drops as it did the first time around.
TEST_F(MultiSinkTest, DetachedDrainReportsDropCount) {
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  constexpr uint32_t kIngressDrops = 10;
  multisink_.HandleDropped(kIngressDrops);
  multisink_.HandleEntry(kMessage);

  // First pass: one unread entry, popped along with the ingress drops.
  EXPECT_EQ(drain.GetUnreadEntriesCount(), 1u);
  VerifyPopEntry(drain, kMessage, 0, kIngressDrops);
  EXPECT_EQ(drain.GetUnreadEntriesCount(), 0u);

  // Detaching and attaching drain should report the same drops.
  multisink_.DetachDrain(drain);
  multisink_.AttachDrain(drain);
  EXPECT_EQ(drain.GetUnreadEntriesCount(), 1u);
  VerifyPopEntry(drain, kMessage, 0, kIngressDrops);
  EXPECT_EQ(drain.GetUnreadEntriesCount(), 0u);
}
555 
// Checks the per-drain unread size and unread count as entries are pushed,
// a second drain is attached, and the first drain is emptied.
TEST_F(MultiSinkTest, DrainUnreadEntriesSize) {
  Drain& early_drain = drains_[0];
  Drain& late_drain = drains_[1];
  multisink_.AttachDrain(early_drain);

  // Nothing has been pushed yet.
  EXPECT_EQ(early_drain.GetUnreadEntriesSize(), 0u);
  EXPECT_EQ(early_drain.GetUnreadEntriesCount(), 0u);

  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessage);
  const size_t size_with_two_unread = early_drain.GetUnreadEntriesSize();
  EXPECT_GT(size_with_two_unread, 0u);

  // Attach another drain, it sees the same unread entries size as the first
  // drain.
  multisink_.AttachDrain(late_drain);
  EXPECT_EQ(late_drain.GetUnreadEntriesSize(), size_with_two_unread);

  // Pop entries from the first drain.
  VerifyPopEntry(early_drain,
                 /*expected_message=*/kMessage,
                 /*expected_drop_count=*/0,
                 /*expected_ingress_drop_count=*/0);
  VerifyPopEntry(early_drain,
                 /*expected_message=*/kMessage,
                 /*expected_drop_count=*/0,
                 /*expected_ingress_drop_count=*/0);

  // Only the drain that popped sees its unread size and count go to zero.
  EXPECT_EQ(early_drain.GetUnreadEntriesSize(), 0u);
  EXPECT_EQ(late_drain.GetUnreadEntriesSize(), size_with_two_unread);
  EXPECT_EQ(early_drain.GetUnreadEntriesCount(), 0u);
  EXPECT_EQ(late_drain.GetUnreadEntriesCount(), 2u);
}
585 
// The size sampled with UnsafeGetUnreadEntriesSize() from inside a listener
// callback must agree with GetUnreadEntriesSize() read afterwards.
TEST(UnsafeGetUnreadEntriesSize, ReadFromListener) {
  std::array<std::byte, 32> storage;
  MultiSink multisink(storage);

  pw::multisink::MultiSink::Drain drain;
  multisink.AttachDrain(drain);

  EntriesSizeMonitorListener size_monitor(drain);
  multisink.AttachListener(size_monitor);

  // Nothing has been pushed yet, so the last size seen must be zero.
  ASSERT_EQ(size_monitor.GetLastSeenUnreadEntriesSize(), 0u);

  // Pushing an entry notifies the listener, which samples the drain.
  constexpr std::string_view kEntryToPush = "one";
  multisink.HandleEntry(as_bytes(span<const char>(kEntryToPush)));

  EXPECT_EQ(size_monitor.GetLastSeenUnreadEntriesSize(),
            drain.GetUnreadEntriesSize());
}
604 
// UnsafeForEachEntry() without an entry limit must visit every stored entry in
// insertion order.
TEST(UnsafeIteration, NoLimit) {
  constexpr std::array<std::string_view, 5> kExpectedEntries{
      "one", "two", "three", "four", "five"};
  std::array<std::byte, 32> buffer;
  MultiSink multisink(buffer);

  for (std::string_view entry : kExpectedEntries) {
    multisink.HandleEntry(as_bytes(span<const char>(entry)));
  }

  size_t entry_count = 0;
  // NOTE(review): the state is bundled into one struct, presumably so the
  // callback only has to capture a single reference - confirm.
  struct {
    size_t& entry_count;
    span<const std::string_view> expected_results;
  } ctx{entry_count, kExpectedEntries};
  auto cb = [&ctx](ConstByteSpan data) {
    // Record a failure instead of indexing out of bounds if the multisink
    // yields more entries than expected.
    const bool index_in_range = ctx.entry_count < ctx.expected_results.size();
    EXPECT_TRUE(index_in_range);
    if (index_in_range) {
      const std::string_view expected_entry =
          ctx.expected_results[ctx.entry_count];
      EXPECT_EQ(data.size(), expected_entry.size());
      // Only compare bytes when the lengths agree, so memcmp never reads past
      // the end of a shorter entry.
      if (data.size() == expected_entry.size()) {
        const int result = std::memcmp(
            data.data(), expected_entry.data(), expected_entry.size());
        EXPECT_EQ(0, result);
      }
    }
    ctx.entry_count++;
  };

  EXPECT_EQ(OkStatus(), multisink.UnsafeForEachEntry(cb));
  EXPECT_EQ(kExpectedEntries.size(), entry_count);
}
632 
// UnsafeForEachEntry() with an entry limit is expected to visit only the most
// recent entries: with five entries stored and a limit of two, the callback
// should see "four" and "five".
TEST(UnsafeIteration, Subset) {
  constexpr std::array<std::string_view, 5> kExpectedEntries{
      "one", "two", "three", "four", "five"};
  constexpr size_t kStartOffset = 3;
  constexpr size_t kExpectedEntriesMaxEntries =
      kExpectedEntries.size() - kStartOffset;
  std::array<std::byte, 32> buffer;
  MultiSink multisink(buffer);

  for (std::string_view entry : kExpectedEntries) {
    multisink.HandleEntry(as_bytes(span<const char>(entry)));
  }

  size_t entry_count = 0;
  // NOTE(review): the state is bundled into one struct, presumably so the
  // callback only has to capture a single reference - confirm.
  struct {
    size_t& entry_count;
    span<const std::string_view> expected_results;
  } ctx{entry_count, kExpectedEntries};
  auto cb = [&ctx](ConstByteSpan data) {
    // Visited entries are matched against the expectations shifted by
    // kStartOffset. Record a failure instead of indexing out of bounds if the
    // multisink yields more entries than the limit allows.
    const size_t expected_index = ctx.entry_count + kStartOffset;
    const bool index_in_range = expected_index < ctx.expected_results.size();
    EXPECT_TRUE(index_in_range);
    if (index_in_range) {
      const std::string_view expected_entry =
          ctx.expected_results[expected_index];
      EXPECT_EQ(data.size(), expected_entry.size());
      // Only compare bytes when the lengths agree, so memcmp never reads past
      // the end of a shorter entry.
      if (data.size() == expected_entry.size()) {
        const int result = std::memcmp(
            data.data(), expected_entry.data(), expected_entry.size());
        EXPECT_EQ(0, result);
      }
    }
    ctx.entry_count++;
  };

  EXPECT_EQ(OkStatus(),
            multisink.UnsafeForEachEntry(cb, kExpectedEntriesMaxEntries));
  EXPECT_EQ(kExpectedEntriesMaxEntries, entry_count);
}
666 
// UnsafeForEachEntryFromEnd() called with the full buffer size as its second
// argument must visit every stored entry in insertion order.
TEST(UnsafeIterationFromEnd, NoTruncation) {
  constexpr std::array<std::string_view, 5> kExpectedEntries{
      "one", "two", "three", "four", "five"};
  constexpr size_t buffer_size = 32;
  std::array<std::byte, buffer_size> buffer;
  MultiSink multisink(buffer);

  for (std::string_view entry : kExpectedEntries) {
    multisink.HandleEntry(as_bytes(span<const char>(entry)));
  }

  size_t entry_count = 0;
  // NOTE(review): the state is bundled into one struct, presumably so the
  // callback only has to capture a single reference - confirm.
  struct {
    size_t& entry_count;
    span<const std::string_view> expected_results;
  } ctx{entry_count, kExpectedEntries};
  auto cb = [&ctx](ConstByteSpan data) {
    // Record a failure instead of indexing out of bounds if the multisink
    // yields more entries than expected.
    const bool index_in_range = ctx.entry_count < ctx.expected_results.size();
    EXPECT_TRUE(index_in_range);
    if (index_in_range) {
      const std::string_view expected_entry =
          ctx.expected_results[ctx.entry_count];
      EXPECT_EQ(data.size(), expected_entry.size());
      // Only compare bytes when the lengths agree, so memcmp never reads past
      // the end of a shorter entry.
      if (data.size() == expected_entry.size()) {
        const int result = std::memcmp(
            data.data(), expected_entry.data(), expected_entry.size());
        EXPECT_EQ(0, result);
      }
    }
    ctx.entry_count++;
  };

  EXPECT_EQ(OkStatus(), multisink.UnsafeForEachEntryFromEnd(cb, buffer_size));
  EXPECT_EQ(kExpectedEntries.size(), entry_count);
}
695 
// UnsafeForEachEntryFromEnd() called with half the buffer size as its second
// argument is expected to visit only the last three of the five entries.
TEST(UnsafeIterationFromEnd, Truncation) {
  constexpr std::array<std::string_view, 5> kEntries{
      "one", "two", "three", "four", "five"};
  constexpr std::array<std::string_view, 3> kExpectedEntries{
      "three", "four", "five"};
  constexpr size_t buffer_size = 32;
  std::array<std::byte, buffer_size> buffer;
  MultiSink multisink(buffer);

  for (std::string_view entry : kEntries) {
    multisink.HandleEntry(as_bytes(span<const char>(entry)));
  }

  size_t entry_count = 0;
  // NOTE(review): the state is bundled into one struct, presumably so the
  // callback only has to capture a single reference - confirm.
  struct {
    size_t& entry_count;
    span<const std::string_view> expected_results;
  } ctx{entry_count, kExpectedEntries};
  auto cb = [&ctx](ConstByteSpan data) {
    // Record a failure instead of indexing out of bounds if truncation does
    // not happen and more than the expected entries are visited.
    const bool index_in_range = ctx.entry_count < ctx.expected_results.size();
    EXPECT_TRUE(index_in_range);
    if (index_in_range) {
      const std::string_view expected_entry =
          ctx.expected_results[ctx.entry_count];
      EXPECT_EQ(data.size(), expected_entry.size());
      // Only compare bytes when the lengths agree, so memcmp never reads past
      // the end of a shorter entry.
      if (data.size() == expected_entry.size()) {
        const int result = std::memcmp(
            data.data(), expected_entry.data(), expected_entry.size());
        EXPECT_EQ(0, result);
      }
    }
    ctx.entry_count++;
  };

  EXPECT_EQ(OkStatus(),
            multisink.UnsafeForEachEntryFromEnd(cb, buffer_size / 2));
  EXPECT_EQ(kExpectedEntries.size(), entry_count);
}
727 
728 }  // namespace
729 }  // namespace pw::multisink
730