1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_multisink/multisink.h"
16
17 #include <array>
18 #include <cstdint>
19 #include <cstring>
20 #include <optional>
21 #include <string_view>
22
23 #include "pw_function/function.h"
24 #include "pw_span/span.h"
25 #include "pw_status/status.h"
26 #include "pw_unit_test/framework.h"
27
28 namespace pw::multisink {
29 namespace {
30
31 using Drain = MultiSink::Drain;
32 using Listener = MultiSink::Listener;
33
34 class CountingListener : public Listener {
35 public:
OnNewEntryAvailable()36 void OnNewEntryAvailable() override { notification_count_++; }
37
GetNotificationCount()38 size_t GetNotificationCount() { return notification_count_; }
39
ResetNotificationCount()40 void ResetNotificationCount() { notification_count_ = 0; }
41
42 private:
43 size_t notification_count_ = 0;
44 };
45
46 class EntriesSizeMonitorListener : public Listener {
47 public:
EntriesSizeMonitorListener(pw::multisink::MultiSink::Drain & drain)48 EntriesSizeMonitorListener(pw::multisink::MultiSink::Drain& drain)
49 : last_seen_unread_size_(0u), drain_(drain) {}
OnNewEntryAvailable()50 void OnNewEntryAvailable() override {
51 last_seen_unread_size_ = drain_.UnsafeGetUnreadEntriesSize();
52 }
53
GetLastSeenUnreadEntriesSize() const54 size_t GetLastSeenUnreadEntriesSize() const { return last_seen_unread_size_; }
55
56 private:
57 size_t last_seen_unread_size_;
58 pw::multisink::MultiSink::Drain& drain_;
59 };
60
61 class MultiSinkTest : public ::testing::Test {
62 protected:
63 static constexpr std::byte kMessage[] = {
64 (std::byte)0xDE, (std::byte)0xAD, (std::byte)0xBE, (std::byte)0xEF};
65 static constexpr std::byte kMessageOther[] = {
66 (std::byte)0x12, (std::byte)0x34, (std::byte)0x56, (std::byte)0x78};
67 static constexpr size_t kMaxDrains = 3;
68 static constexpr size_t kMaxListeners = 3;
69 static constexpr size_t kEntryBufferSize = 1024;
70 static constexpr size_t kBufferSize = 5 * kEntryBufferSize;
71
MultiSinkTest()72 MultiSinkTest() : buffer_{}, multisink_(buffer_) {}
73
74 // Expects the peeked or popped message to equal the provided non-empty
75 // message, and the drop count to match. If `expected_message` is empty, the
76 // Pop call status expected is OUT_OF_RANGE.
ExpectMessageAndDropCounts(Result<ConstByteSpan> & result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)77 void ExpectMessageAndDropCounts(Result<ConstByteSpan>& result,
78 uint32_t result_drop_count,
79 uint32_t result_ingress_drop_count,
80 std::optional<ConstByteSpan> expected_message,
81 uint32_t expected_drop_count,
82 uint32_t expected_ingress_drop_count) {
83 if (!expected_message.has_value()) {
84 EXPECT_EQ(Status::OutOfRange(), result.status());
85 } else {
86 ASSERT_EQ(result.status(), OkStatus());
87 if (!expected_message.value().empty()) {
88 ASSERT_FALSE(result.value().empty());
89 ASSERT_EQ(result.value().size_bytes(),
90 expected_message.value().size_bytes());
91 EXPECT_EQ(memcmp(result.value().data(),
92 expected_message.value().data(),
93 expected_message.value().size_bytes()),
94 0);
95 }
96 }
97 EXPECT_EQ(result_drop_count, expected_drop_count);
98 EXPECT_EQ(result_ingress_drop_count, expected_ingress_drop_count);
99 }
100
VerifyPopEntry(Drain & drain,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)101 void VerifyPopEntry(Drain& drain,
102 std::optional<ConstByteSpan> expected_message,
103 uint32_t expected_drop_count,
104 uint32_t expected_ingress_drop_count) {
105 uint32_t drop_count = 0;
106 uint32_t ingress_drop_count = 0;
107 Result<ConstByteSpan> result =
108 drain.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
109 ExpectMessageAndDropCounts(result,
110 drop_count,
111 ingress_drop_count,
112 expected_message,
113 expected_drop_count,
114 expected_ingress_drop_count);
115 }
116
VerifyPeekResult(const Result<Drain::PeekedEntry> & peek_result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)117 void VerifyPeekResult(const Result<Drain::PeekedEntry>& peek_result,
118 uint32_t result_drop_count,
119 uint32_t result_ingress_drop_count,
120 std::optional<ConstByteSpan> expected_message,
121 uint32_t expected_drop_count,
122 uint32_t expected_ingress_drop_count) {
123 if (peek_result.ok()) {
124 ASSERT_FALSE(peek_result.value().entry().empty());
125 Result<ConstByteSpan> verify_result(peek_result.value().entry());
126 ExpectMessageAndDropCounts(verify_result,
127 result_drop_count,
128 result_ingress_drop_count,
129 expected_message,
130 expected_drop_count,
131 expected_ingress_drop_count);
132 return;
133 }
134 if (expected_message.has_value()) {
135 // Fail since we expected OkStatus.
136 ASSERT_EQ(peek_result.status(), OkStatus());
137 }
138 EXPECT_EQ(Status::OutOfRange(), peek_result.status());
139 }
140
ExpectNotificationCount(CountingListener & listener,size_t expected_notification_count)141 void ExpectNotificationCount(CountingListener& listener,
142 size_t expected_notification_count) {
143 EXPECT_EQ(listener.GetNotificationCount(), expected_notification_count);
144 listener.ResetNotificationCount();
145 }
146
147 std::byte buffer_[kBufferSize];
148 std::byte entry_buffer_[kEntryBufferSize];
149 CountingListener listeners_[kMaxListeners];
150 Drain drains_[kMaxDrains];
151 MultiSink multisink_;
152 };
153
TEST_F(MultiSinkTest,SingleDrain)154 TEST_F(MultiSinkTest, SingleDrain) {
155 multisink_.AttachDrain(drains_[0]);
156 multisink_.AttachListener(listeners_[0]);
157 ExpectNotificationCount(listeners_[0], 1u);
158 multisink_.HandleEntry(kMessage);
159
160 // Single entry push and pop.
161 ExpectNotificationCount(listeners_[0], 1u);
162 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
163 // Single empty entry push and pop.
164 multisink_.HandleEntry(ConstByteSpan());
165 ExpectNotificationCount(listeners_[0], 1u);
166 VerifyPopEntry(drains_[0], ConstByteSpan(), 0u, 0u);
167
168 // Multiple entries with intermittent drops.
169 multisink_.HandleEntry(kMessage);
170 multisink_.HandleDropped();
171 multisink_.HandleEntry(kMessage);
172 ExpectNotificationCount(listeners_[0], 3u);
173 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
174 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
175
176 // Send drops only.
177 multisink_.HandleDropped();
178 ExpectNotificationCount(listeners_[0], 1u);
179 VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
180
181 // Confirm out-of-range if no entries are expected.
182 ExpectNotificationCount(listeners_[0], 0u);
183 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
184 }
185
TEST_F(MultiSinkTest,MultipleDrain)186 TEST_F(MultiSinkTest, MultipleDrain) {
187 multisink_.AttachDrain(drains_[0]);
188 multisink_.AttachDrain(drains_[1]);
189 multisink_.AttachListener(listeners_[0]);
190 multisink_.AttachListener(listeners_[1]);
191 ExpectNotificationCount(listeners_[0], 1u);
192 ExpectNotificationCount(listeners_[1], 1u);
193
194 multisink_.HandleEntry(kMessage);
195 multisink_.HandleEntry(kMessage);
196 multisink_.HandleDropped();
197 multisink_.HandleEntry(kMessage);
198 multisink_.HandleDropped();
199
200 // Drain one drain entirely.
201 ExpectNotificationCount(listeners_[0], 5u);
202 ExpectNotificationCount(listeners_[1], 5u);
203 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
204 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
205 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
206 VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
207 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
208
209 // Confirm the other drain can be drained separately.
210 ExpectNotificationCount(listeners_[0], 0u);
211 ExpectNotificationCount(listeners_[1], 0u);
212 VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
213 VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
214 VerifyPopEntry(drains_[1], kMessage, 0u, 1u);
215 VerifyPopEntry(drains_[1], std::nullopt, 0u, 1u);
216 VerifyPopEntry(drains_[1], std::nullopt, 0u, 0u);
217 }
218
TEST_F(MultiSinkTest,LateDrainRegistration)219 TEST_F(MultiSinkTest, LateDrainRegistration) {
220 // Drains attached after entries are pushed should still observe those entries
221 // if they have not been evicted from the ring buffer.
222 multisink_.HandleEntry(kMessage);
223
224 multisink_.AttachDrain(drains_[0]);
225 multisink_.AttachListener(listeners_[0]);
226 ExpectNotificationCount(listeners_[0], 1u);
227 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
228 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
229
230 multisink_.HandleEntry(kMessage);
231 ExpectNotificationCount(listeners_[0], 1u);
232 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
233 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
234 }
235
TEST_F(MultiSinkTest,DynamicDrainRegistration)236 TEST_F(MultiSinkTest, DynamicDrainRegistration) {
237 multisink_.AttachDrain(drains_[0]);
238 multisink_.AttachListener(listeners_[0]);
239 ExpectNotificationCount(listeners_[0], 1u);
240
241 multisink_.HandleDropped();
242 multisink_.HandleEntry(kMessage);
243 multisink_.HandleDropped();
244 multisink_.HandleEntry(kMessage);
245
246 // Drain out one message and detach it.
247 ExpectNotificationCount(listeners_[0], 4u);
248 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
249 multisink_.DetachDrain(drains_[0]);
250 multisink_.DetachListener(listeners_[0]);
251
252 // Re-attaching the drain should reproduce the last observed message. Note
253 // that notifications are not expected, nor are drops observed before the
254 // first valid message in the buffer.
255 multisink_.AttachDrain(drains_[0]);
256 multisink_.AttachListener(listeners_[0]);
257 ExpectNotificationCount(listeners_[0], 1u);
258 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
259 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
260 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
261
262 multisink_.HandleEntry(kMessage);
263 ExpectNotificationCount(listeners_[0], 1u);
264 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
265 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
266 }
267
TEST_F(MultiSinkTest,TooSmallBuffer)268 TEST_F(MultiSinkTest, TooSmallBuffer) {
269 multisink_.AttachDrain(drains_[0]);
270
271 // Insert an entry and a drop, then try to read into an insufficient buffer.
272 uint32_t drop_count = 0;
273 uint32_t ingress_drop_count = 0;
274 multisink_.HandleDropped();
275 multisink_.HandleEntry(kMessage);
276
277 // Attempting to acquire an entry with a small buffer should result in
278 // RESOURCE_EXHAUSTED and remove it.
279 Result<ConstByteSpan> result = drains_[0].PopEntry(
280 span(entry_buffer_, 1), drop_count, ingress_drop_count);
281 EXPECT_EQ(result.status(), Status::ResourceExhausted());
282
283 VerifyPopEntry(drains_[0], std::nullopt, 1u, 1u);
284 }
285
TEST_F(MultiSinkTest,Iterator)286 TEST_F(MultiSinkTest, Iterator) {
287 multisink_.AttachDrain(drains_[0]);
288
289 // Insert entries and consume them all.
290 multisink_.HandleEntry(kMessage);
291 multisink_.HandleEntry(kMessage);
292 multisink_.HandleEntry(kMessage);
293
294 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
295 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
296 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
297
298 // Confirm that the iterator still observes the messages in the ring buffer.
299 size_t iterated_entries = 0;
300 for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
301 EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
302 iterated_entries++;
303 }
304 EXPECT_EQ(iterated_entries, 3u);
305 }
306
TEST_F(MultiSinkTest,IteratorNoDrains)307 TEST_F(MultiSinkTest, IteratorNoDrains) {
308 // Insert entries with no drains attached. Even though there are no consumers,
309 // iterators should still walk from the oldest entry.
310 multisink_.HandleEntry(kMessage);
311 multisink_.HandleEntry(kMessage);
312 multisink_.HandleEntry(kMessage);
313
314 // Confirm that the iterator still observes the messages in the ring buffer.
315 size_t iterated_entries = 0;
316 for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
317 EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
318 iterated_entries++;
319 }
320 EXPECT_EQ(iterated_entries, 3u);
321 }
322
TEST_F(MultiSinkTest,IteratorNoEntries)323 TEST_F(MultiSinkTest, IteratorNoEntries) {
324 // Attach a drain, but don't add any entries.
325 multisink_.AttachDrain(drains_[0]);
326 // Confirm that the iterator has no entries.
327 MultiSink::UnsafeIterationWrapper unsafe_iterator =
328 multisink_.UnsafeIteration();
329 EXPECT_EQ(unsafe_iterator.begin(), unsafe_iterator.end());
330 }
331
TEST_F(MultiSinkTest,PeekEntryNoEntries)332 TEST_F(MultiSinkTest, PeekEntryNoEntries) {
333 multisink_.AttachDrain(drains_[0]);
334
335 // Peek empty multisink.
336 uint32_t drop_count = 0;
337 uint32_t ingress_drop_count = 0;
338 auto peek_result =
339 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
340 VerifyPeekResult(peek_result, 0, drop_count, std::nullopt, 0, 0);
341 }
342
TEST_F(MultiSinkTest,PeekAndPop)343 TEST_F(MultiSinkTest, PeekAndPop) {
344 multisink_.AttachDrain(drains_[0]);
345 multisink_.AttachDrain(drains_[1]);
346
347 // Peek entry after multisink has some entries.
348 multisink_.HandleEntry(kMessage);
349 multisink_.HandleEntry(kMessageOther);
350 uint32_t drop_count = 0;
351 uint32_t ingress_drop_count = 0;
352 auto first_peek_result =
353 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
354 VerifyPeekResult(
355 first_peek_result, drop_count, ingress_drop_count, kMessage, 0, 0);
356
357 // Multiple peeks must return the front message.
358 auto peek_duplicate =
359 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
360 VerifyPeekResult(
361 peek_duplicate, drop_count, ingress_drop_count, kMessage, 0, 0);
362 // A second drain must peek the front message.
363 auto peek_other_drain =
364 drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
365 VerifyPeekResult(
366 peek_other_drain, drop_count, ingress_drop_count, kMessage, 0, 0);
367
368 // After a drain pops a peeked entry, the next peek call must return the next
369 // message.
370 ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
371 auto second_peek_result =
372 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
373 VerifyPeekResult(
374 second_peek_result, drop_count, ingress_drop_count, kMessageOther, 0, 0);
375 // Slower readers must be unchanged.
376 auto peek_other_drain_duplicate =
377 drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
378 VerifyPeekResult(peek_other_drain_duplicate,
379 drop_count,
380 ingress_drop_count,
381 kMessage,
382 0,
383 0);
384
385 // PopEntry prior to popping the previously peeked entry.
386 VerifyPopEntry(drains_[0], kMessageOther, 0, 0);
387 // Popping an entry already handled must not trigger errors.
388 ASSERT_EQ(drains_[0].PopEntry(second_peek_result.value()), OkStatus());
389 // Popping with an old peek context must not trigger errors.
390 ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
391
392 // Multisink is empty, pops and peeks should trigger OUT_OF_RANGE.
393 VerifyPopEntry(drains_[0], std::nullopt, 0, 0);
394 auto empty_peek_result =
395 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
396 VerifyPeekResult(
397 empty_peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
398
399 // // Slower readers must be unchanged.
400 auto peek_other_drain_unchanged =
401 drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
402 VerifyPeekResult(peek_other_drain_unchanged,
403 drop_count,
404 ingress_drop_count,
405 kMessage,
406 0,
407 0);
408 }
409
TEST_F(MultiSinkTest,PeekReportsIngressDropCount)410 TEST_F(MultiSinkTest, PeekReportsIngressDropCount) {
411 multisink_.AttachDrain(drains_[0]);
412
413 // Peek entry after multisink has some entries.
414 multisink_.HandleEntry(kMessage);
415 const uint32_t ingress_drops = 10;
416 multisink_.HandleDropped(ingress_drops);
417
418 uint32_t drop_count = 0;
419 uint32_t ingress_drop_count = 0;
420 auto peek_result1 =
421 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
422 // No drops reported until the drain finds a gap in the sequence IDs.
423 VerifyPeekResult(
424 peek_result1, drop_count, ingress_drop_count, kMessage, 0, 0);
425
426 // Popping the peeked entry advances the drain, and a new peek will find the
427 // gap in sequence IDs.
428 ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
429 auto peek_result2 =
430 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
431 ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
432 EXPECT_EQ(drop_count, 0u);
433 EXPECT_EQ(ingress_drop_count, ingress_drops);
434 }
435
TEST_F(MultiSinkTest,PeekReportsSlowDrainDropCount)436 TEST_F(MultiSinkTest, PeekReportsSlowDrainDropCount) {
437 multisink_.AttachDrain(drains_[0]);
438
439 // Add entries until buffer is full and drain has to be advanced.
440 // The sequence ID takes 1 byte when less than 128.
441 const size_t max_multisink_messages = 128;
442 const size_t buffer_entry_size = kBufferSize / max_multisink_messages;
443 // Account for 1 byte of preamble (sequnce ID) and 1 byte of data size.
444 const size_t message_size = buffer_entry_size - 2;
445 std::array<std::byte, message_size> message;
446 std::memset(message.data(), 'a', message.size());
447 for (size_t i = 0; i < max_multisink_messages; ++i) {
448 message[0] = static_cast<std::byte>(i);
449 multisink_.HandleEntry(message);
450 }
451
452 // At this point the buffer is full, but the sequence ID will take 1 more byte
453 // in the preamble, meaning that adding N new entries, drops N + 1 entries.
454 // Account for that offset.
455 const size_t expected_drops = 5;
456 for (size_t i = 1; i < expected_drops; ++i) {
457 message[0] = static_cast<std::byte>(200 + i);
458 multisink_.HandleEntry(message);
459 }
460
461 uint32_t drop_count = 0;
462 uint32_t ingress_drop_count = 0;
463
464 auto peek_result =
465 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
466 // The message peeked is the 6th message added.
467 message[0] = static_cast<std::byte>(5);
468 VerifyPeekResult(
469 peek_result, drop_count, ingress_drop_count, message, expected_drops, 0);
470
471 // Add 3 more messages since we peeked the multisink, generating 2 more drops.
472 const size_t expected_drops2 = 2;
473 for (size_t i = 0; i < expected_drops2 + 1; ++i) {
474 message[0] = static_cast<std::byte>(220 + i);
475 multisink_.HandleEntry(message);
476 }
477
478 // Pop the 6th message now, even though it was already dropped.
479 EXPECT_EQ(drains_[0].PopEntry(peek_result.value()), OkStatus());
480
481 // A new peek would get the 9th message because two more messages were
482 // dropped. Given that PopEntry() was called with peek_result, all the dropped
483 // messages before peek_result should be considered handled and only the two
484 // new drops should be reported here.
485 auto peek_result2 =
486 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
487 message[0] = static_cast<std::byte>(8); // 9th message
488 VerifyPeekResult(peek_result2,
489 drop_count,
490 ingress_drop_count,
491 message,
492 expected_drops2,
493 0);
494 }
495
TEST_F(MultiSinkTest,IngressDropCountOverflow)496 TEST_F(MultiSinkTest, IngressDropCountOverflow) {
497 multisink_.AttachDrain(drains_[0]);
498
499 // Make drain's last handled drop larger than multisink drop count, which
500 // overflowed.
501 const uint32_t drop_count_close_to_overflow =
502 std::numeric_limits<uint32_t>::max() - 3;
503 multisink_.HandleDropped(drop_count_close_to_overflow);
504 multisink_.HandleEntry(kMessage);
505
506 // Catch up drain's drop count.
507 uint32_t drop_count = 0;
508 uint32_t ingress_drop_count = 0;
509 auto peek_result1 =
510 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
511 VerifyPeekResult(peek_result1,
512 drop_count,
513 ingress_drop_count,
514 kMessage,
515 0,
516 drop_count_close_to_overflow);
517 // Popping the peeked entry advances the drain, and a new peek will find the
518 // gap in sequence IDs.
519 ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
520
521 // Overflow multisink's drop count.
522 const uint32_t expected_ingress_drop_count = 10;
523 multisink_.HandleDropped(expected_ingress_drop_count);
524
525 auto peek_result2 =
526 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
527 ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
528 EXPECT_EQ(drop_count, 0u);
529 EXPECT_EQ(ingress_drop_count, expected_ingress_drop_count);
530
531 multisink_.HandleEntry(kMessage);
532 auto peek_result3 =
533 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
534 VerifyPeekResult(
535 peek_result3, drop_count, ingress_drop_count, kMessage, 0, 0);
536 }
537
TEST_F(MultiSinkTest,DetachedDrainReportsDropCount)538 TEST_F(MultiSinkTest, DetachedDrainReportsDropCount) {
539 multisink_.AttachDrain(drains_[0]);
540
541 const uint32_t ingress_drops = 10;
542 multisink_.HandleDropped(ingress_drops);
543 multisink_.HandleEntry(kMessage);
544 EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 1u);
545 VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
546 EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 0u);
547
548 // Detaching and attaching drain should report the same drops.
549 multisink_.DetachDrain(drains_[0]);
550 multisink_.AttachDrain(drains_[0]);
551 EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 1u);
552 VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
553 EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 0u);
554 }
555
TEST_F(MultiSinkTest,DrainUnreadEntriesSize)556 TEST_F(MultiSinkTest, DrainUnreadEntriesSize) {
557 multisink_.AttachDrain(drains_[0]);
558
559 EXPECT_EQ(drains_[0].GetUnreadEntriesSize(), 0u);
560 EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 0u);
561 multisink_.HandleEntry(kMessage);
562 multisink_.HandleEntry(kMessage);
563 const size_t unread_entries_size = drains_[0].GetUnreadEntriesSize();
564 EXPECT_GT(unread_entries_size, 0u);
565
566 // Attach another drain, it sees the same unread entriess size as the first
567 // drain.
568 multisink_.AttachDrain(drains_[1]);
569 EXPECT_EQ(drains_[1].GetUnreadEntriesSize(), unread_entries_size);
570
571 // Pop entries from the first drain.
572 VerifyPopEntry(drains_[0],
573 /*expected_message=*/kMessage,
574 /*expected_drop_count=*/0,
575 /*expected_ingress_drop_count=*/0);
576 VerifyPopEntry(drains_[0],
577 /*expected_message=*/kMessage,
578 /*expected_drop_count=*/0,
579 /*expected_ingress_drop_count=*/0);
580 EXPECT_EQ(drains_[0].GetUnreadEntriesSize(), 0u);
581 EXPECT_EQ(drains_[1].GetUnreadEntriesSize(), unread_entries_size);
582 EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 0u);
583 EXPECT_EQ(drains_[1].GetUnreadEntriesCount(), 2u);
584 }
585
TEST(UnsafeGetUnreadEntriesSize,ReadFromListener)586 TEST(UnsafeGetUnreadEntriesSize, ReadFromListener) {
587 std::array<std::byte, 32> buffer;
588 MultiSink multisink(buffer);
589
590 pw::multisink::MultiSink::Drain drain;
591 multisink.AttachDrain(drain);
592
593 EntriesSizeMonitorListener listener(drain);
594 multisink.AttachListener(listener);
595
596 ASSERT_EQ(listener.GetLastSeenUnreadEntriesSize(), 0u);
597
598 constexpr std::string_view kEntryToPush = "one";
599 multisink.HandleEntry(as_bytes(span<const char>(kEntryToPush)));
600
601 EXPECT_EQ(listener.GetLastSeenUnreadEntriesSize(),
602 drain.GetUnreadEntriesSize());
603 }
604
TEST(UnsafeIteration,NoLimit)605 TEST(UnsafeIteration, NoLimit) {
606 constexpr std::array<std::string_view, 5> kExpectedEntries{
607 "one", "two", "three", "four", "five"};
608 std::array<std::byte, 32> buffer;
609 MultiSink multisink(buffer);
610
611 for (std::string_view entry : kExpectedEntries) {
612 multisink.HandleEntry(as_bytes(span<const char>(entry)));
613 }
614
615 size_t entry_count = 0;
616 struct {
617 size_t& entry_count;
618 span<const std::string_view> expected_results;
619 } ctx{entry_count, kExpectedEntries};
620 auto cb = [&ctx](ConstByteSpan data) {
621 std::string_view expected_entry = ctx.expected_results[ctx.entry_count];
622 EXPECT_EQ(data.size(), expected_entry.size());
623 const int result =
624 memcmp(data.data(), expected_entry.data(), expected_entry.size());
625 EXPECT_EQ(0, result);
626 ctx.entry_count++;
627 };
628
629 EXPECT_EQ(OkStatus(), multisink.UnsafeForEachEntry(cb));
630 EXPECT_EQ(kExpectedEntries.size(), entry_count);
631 }
632
TEST(UnsafeIteration,Subset)633 TEST(UnsafeIteration, Subset) {
634 constexpr std::array<std::string_view, 5> kExpectedEntries{
635 "one", "two", "three", "four", "five"};
636 constexpr size_t kStartOffset = 3;
637 constexpr size_t kExpectedEntriesMaxEntries =
638 kExpectedEntries.size() - kStartOffset;
639 std::array<std::byte, 32> buffer;
640 MultiSink multisink(buffer);
641
642 for (std::string_view entry : kExpectedEntries) {
643 multisink.HandleEntry(as_bytes(span<const char>(entry)));
644 }
645
646 size_t entry_count = 0;
647 struct {
648 size_t& entry_count;
649 span<const std::string_view> expected_results;
650 } ctx{entry_count, kExpectedEntries};
651 auto cb = [&ctx](ConstByteSpan data) {
652 std::string_view expected_entry =
653 ctx.expected_results[ctx.entry_count + kStartOffset];
654 EXPECT_EQ(data.size(), expected_entry.size());
655 const int result =
656 memcmp(data.data(), expected_entry.data(), expected_entry.size());
657 EXPECT_EQ(0, result);
658 ctx.entry_count++;
659 };
660
661 EXPECT_EQ(
662 OkStatus(),
663 multisink.UnsafeForEachEntry(cb, kExpectedEntries.size() - kStartOffset));
664 EXPECT_EQ(kExpectedEntriesMaxEntries, entry_count);
665 }
666
TEST(UnsafeIterationFromEnd,NoTruncation)667 TEST(UnsafeIterationFromEnd, NoTruncation) {
668 constexpr std::array<std::string_view, 5> kExpectedEntries{
669 "one", "two", "three", "four", "five"};
670 constexpr size_t buffer_size = 32;
671 std::array<std::byte, buffer_size> buffer;
672 MultiSink multisink(buffer);
673
674 for (std::string_view entry : kExpectedEntries) {
675 multisink.HandleEntry(as_bytes(span<const char>(entry)));
676 }
677
678 size_t entry_count = 0;
679 struct {
680 size_t& entry_count;
681 span<const std::string_view> expected_results;
682 } ctx{entry_count, kExpectedEntries};
683 auto cb = [&ctx](ConstByteSpan data) {
684 std::string_view expected_entry = ctx.expected_results[ctx.entry_count];
685 EXPECT_EQ(data.size(), expected_entry.size());
686 const int result =
687 memcmp(data.data(), expected_entry.data(), expected_entry.size());
688 EXPECT_EQ(0, result);
689 ctx.entry_count++;
690 };
691
692 EXPECT_EQ(OkStatus(), multisink.UnsafeForEachEntryFromEnd(cb, buffer_size));
693 EXPECT_EQ(kExpectedEntries.size(), entry_count);
694 }
695
TEST(UnsafeIterationFromEnd,Truncation)696 TEST(UnsafeIterationFromEnd, Truncation) {
697 constexpr std::array<std::string_view, 5> kEntries{
698 "one", "two", "three", "four", "five"};
699 constexpr std::array<std::string_view, 3> kExpectedEntries{
700 "three", "four", "five"};
701 constexpr size_t buffer_size = 32;
702 std::array<std::byte, buffer_size> buffer;
703 MultiSink multisink(buffer);
704
705 for (std::string_view entry : kEntries) {
706 multisink.HandleEntry(as_bytes(span<const char>(entry)));
707 }
708
709 size_t entry_count = 0;
710 struct {
711 size_t& entry_count;
712 span<const std::string_view> expected_results;
713 } ctx{entry_count, kExpectedEntries};
714 auto cb = [&ctx](ConstByteSpan data) {
715 std::string_view expected_entry = ctx.expected_results[ctx.entry_count];
716 EXPECT_EQ(data.size(), expected_entry.size());
717 const int result =
718 memcmp(data.data(), expected_entry.data(), expected_entry.size());
719 EXPECT_EQ(0, result);
720 ctx.entry_count++;
721 };
722
723 EXPECT_EQ(OkStatus(),
724 multisink.UnsafeForEachEntryFromEnd(cb, buffer_size / 2));
725 EXPECT_EQ(kExpectedEntries.size(), entry_count);
726 }
727
728 } // namespace
729 } // namespace pw::multisink
730