1*61c4878aSAndroid Build Coastguard Worker // Copyright 2021 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker // https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker
15*61c4878aSAndroid Build Coastguard Worker #include "pw_multisink/multisink.h"
16*61c4878aSAndroid Build Coastguard Worker
17*61c4878aSAndroid Build Coastguard Worker #include <array>
18*61c4878aSAndroid Build Coastguard Worker #include <cstdint>
19*61c4878aSAndroid Build Coastguard Worker #include <cstring>
20*61c4878aSAndroid Build Coastguard Worker #include <optional>
21*61c4878aSAndroid Build Coastguard Worker #include <string_view>
22*61c4878aSAndroid Build Coastguard Worker
23*61c4878aSAndroid Build Coastguard Worker #include "pw_function/function.h"
24*61c4878aSAndroid Build Coastguard Worker #include "pw_span/span.h"
25*61c4878aSAndroid Build Coastguard Worker #include "pw_status/status.h"
26*61c4878aSAndroid Build Coastguard Worker #include "pw_unit_test/framework.h"
27*61c4878aSAndroid Build Coastguard Worker
28*61c4878aSAndroid Build Coastguard Worker namespace pw::multisink {
29*61c4878aSAndroid Build Coastguard Worker namespace {
30*61c4878aSAndroid Build Coastguard Worker
31*61c4878aSAndroid Build Coastguard Worker using Drain = MultiSink::Drain;
32*61c4878aSAndroid Build Coastguard Worker using Listener = MultiSink::Listener;
33*61c4878aSAndroid Build Coastguard Worker
34*61c4878aSAndroid Build Coastguard Worker class CountingListener : public Listener {
35*61c4878aSAndroid Build Coastguard Worker public:
OnNewEntryAvailable()36*61c4878aSAndroid Build Coastguard Worker void OnNewEntryAvailable() override { notification_count_++; }
37*61c4878aSAndroid Build Coastguard Worker
GetNotificationCount()38*61c4878aSAndroid Build Coastguard Worker size_t GetNotificationCount() { return notification_count_; }
39*61c4878aSAndroid Build Coastguard Worker
ResetNotificationCount()40*61c4878aSAndroid Build Coastguard Worker void ResetNotificationCount() { notification_count_ = 0; }
41*61c4878aSAndroid Build Coastguard Worker
42*61c4878aSAndroid Build Coastguard Worker private:
43*61c4878aSAndroid Build Coastguard Worker size_t notification_count_ = 0;
44*61c4878aSAndroid Build Coastguard Worker };
45*61c4878aSAndroid Build Coastguard Worker
46*61c4878aSAndroid Build Coastguard Worker class EntriesSizeMonitorListener : public Listener {
47*61c4878aSAndroid Build Coastguard Worker public:
EntriesSizeMonitorListener(pw::multisink::MultiSink::Drain & drain)48*61c4878aSAndroid Build Coastguard Worker EntriesSizeMonitorListener(pw::multisink::MultiSink::Drain& drain)
49*61c4878aSAndroid Build Coastguard Worker : last_seen_unread_size_(0u), drain_(drain) {}
OnNewEntryAvailable()50*61c4878aSAndroid Build Coastguard Worker void OnNewEntryAvailable() override {
51*61c4878aSAndroid Build Coastguard Worker last_seen_unread_size_ = drain_.UnsafeGetUnreadEntriesSize();
52*61c4878aSAndroid Build Coastguard Worker }
53*61c4878aSAndroid Build Coastguard Worker
GetLastSeenUnreadEntriesSize() const54*61c4878aSAndroid Build Coastguard Worker size_t GetLastSeenUnreadEntriesSize() const { return last_seen_unread_size_; }
55*61c4878aSAndroid Build Coastguard Worker
56*61c4878aSAndroid Build Coastguard Worker private:
57*61c4878aSAndroid Build Coastguard Worker size_t last_seen_unread_size_;
58*61c4878aSAndroid Build Coastguard Worker pw::multisink::MultiSink::Drain& drain_;
59*61c4878aSAndroid Build Coastguard Worker };
60*61c4878aSAndroid Build Coastguard Worker
61*61c4878aSAndroid Build Coastguard Worker class MultiSinkTest : public ::testing::Test {
62*61c4878aSAndroid Build Coastguard Worker protected:
63*61c4878aSAndroid Build Coastguard Worker static constexpr std::byte kMessage[] = {
64*61c4878aSAndroid Build Coastguard Worker (std::byte)0xDE, (std::byte)0xAD, (std::byte)0xBE, (std::byte)0xEF};
65*61c4878aSAndroid Build Coastguard Worker static constexpr std::byte kMessageOther[] = {
66*61c4878aSAndroid Build Coastguard Worker (std::byte)0x12, (std::byte)0x34, (std::byte)0x56, (std::byte)0x78};
67*61c4878aSAndroid Build Coastguard Worker static constexpr size_t kMaxDrains = 3;
68*61c4878aSAndroid Build Coastguard Worker static constexpr size_t kMaxListeners = 3;
69*61c4878aSAndroid Build Coastguard Worker static constexpr size_t kEntryBufferSize = 1024;
70*61c4878aSAndroid Build Coastguard Worker static constexpr size_t kBufferSize = 5 * kEntryBufferSize;
71*61c4878aSAndroid Build Coastguard Worker
MultiSinkTest()72*61c4878aSAndroid Build Coastguard Worker MultiSinkTest() : buffer_{}, multisink_(buffer_) {}
73*61c4878aSAndroid Build Coastguard Worker
74*61c4878aSAndroid Build Coastguard Worker // Expects the peeked or popped message to equal the provided non-empty
75*61c4878aSAndroid Build Coastguard Worker // message, and the drop count to match. If `expected_message` is empty, the
76*61c4878aSAndroid Build Coastguard Worker // Pop call status expected is OUT_OF_RANGE.
ExpectMessageAndDropCounts(Result<ConstByteSpan> & result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)77*61c4878aSAndroid Build Coastguard Worker void ExpectMessageAndDropCounts(Result<ConstByteSpan>& result,
78*61c4878aSAndroid Build Coastguard Worker uint32_t result_drop_count,
79*61c4878aSAndroid Build Coastguard Worker uint32_t result_ingress_drop_count,
80*61c4878aSAndroid Build Coastguard Worker std::optional<ConstByteSpan> expected_message,
81*61c4878aSAndroid Build Coastguard Worker uint32_t expected_drop_count,
82*61c4878aSAndroid Build Coastguard Worker uint32_t expected_ingress_drop_count) {
83*61c4878aSAndroid Build Coastguard Worker if (!expected_message.has_value()) {
84*61c4878aSAndroid Build Coastguard Worker EXPECT_EQ(Status::OutOfRange(), result.status());
85*61c4878aSAndroid Build Coastguard Worker } else {
86*61c4878aSAndroid Build Coastguard Worker ASSERT_EQ(result.status(), OkStatus());
87*61c4878aSAndroid Build Coastguard Worker if (!expected_message.value().empty()) {
88*61c4878aSAndroid Build Coastguard Worker ASSERT_FALSE(result.value().empty());
89*61c4878aSAndroid Build Coastguard Worker ASSERT_EQ(result.value().size_bytes(),
90*61c4878aSAndroid Build Coastguard Worker expected_message.value().size_bytes());
91*61c4878aSAndroid Build Coastguard Worker EXPECT_EQ(memcmp(result.value().data(),
92*61c4878aSAndroid Build Coastguard Worker expected_message.value().data(),
93*61c4878aSAndroid Build Coastguard Worker expected_message.value().size_bytes()),
94*61c4878aSAndroid Build Coastguard Worker 0);
95*61c4878aSAndroid Build Coastguard Worker }
96*61c4878aSAndroid Build Coastguard Worker }
97*61c4878aSAndroid Build Coastguard Worker EXPECT_EQ(result_drop_count, expected_drop_count);
98*61c4878aSAndroid Build Coastguard Worker EXPECT_EQ(result_ingress_drop_count, expected_ingress_drop_count);
99*61c4878aSAndroid Build Coastguard Worker }
100*61c4878aSAndroid Build Coastguard Worker
VerifyPopEntry(Drain & drain,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)101*61c4878aSAndroid Build Coastguard Worker void VerifyPopEntry(Drain& drain,
102*61c4878aSAndroid Build Coastguard Worker std::optional<ConstByteSpan> expected_message,
103*61c4878aSAndroid Build Coastguard Worker uint32_t expected_drop_count,
104*61c4878aSAndroid Build Coastguard Worker uint32_t expected_ingress_drop_count) {
105*61c4878aSAndroid Build Coastguard Worker uint32_t drop_count = 0;
106*61c4878aSAndroid Build Coastguard Worker uint32_t ingress_drop_count = 0;
107*61c4878aSAndroid Build Coastguard Worker Result<ConstByteSpan> result =
108*61c4878aSAndroid Build Coastguard Worker drain.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
109*61c4878aSAndroid Build Coastguard Worker ExpectMessageAndDropCounts(result,
110*61c4878aSAndroid Build Coastguard Worker drop_count,
111*61c4878aSAndroid Build Coastguard Worker ingress_drop_count,
112*61c4878aSAndroid Build Coastguard Worker expected_message,
113*61c4878aSAndroid Build Coastguard Worker expected_drop_count,
114*61c4878aSAndroid Build Coastguard Worker expected_ingress_drop_count);
115*61c4878aSAndroid Build Coastguard Worker }
116*61c4878aSAndroid Build Coastguard Worker
VerifyPeekResult(const Result<Drain::PeekedEntry> & peek_result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)117*61c4878aSAndroid Build Coastguard Worker void VerifyPeekResult(const Result<Drain::PeekedEntry>& peek_result,
118*61c4878aSAndroid Build Coastguard Worker uint32_t result_drop_count,
119*61c4878aSAndroid Build Coastguard Worker uint32_t result_ingress_drop_count,
120*61c4878aSAndroid Build Coastguard Worker std::optional<ConstByteSpan> expected_message,
121*61c4878aSAndroid Build Coastguard Worker uint32_t expected_drop_count,
122*61c4878aSAndroid Build Coastguard Worker uint32_t expected_ingress_drop_count) {
123*61c4878aSAndroid Build Coastguard Worker if (peek_result.ok()) {
124*61c4878aSAndroid Build Coastguard Worker ASSERT_FALSE(peek_result.value().entry().empty());
125*61c4878aSAndroid Build Coastguard Worker Result<ConstByteSpan> verify_result(peek_result.value().entry());
126*61c4878aSAndroid Build Coastguard Worker ExpectMessageAndDropCounts(verify_result,
127*61c4878aSAndroid Build Coastguard Worker result_drop_count,
128*61c4878aSAndroid Build Coastguard Worker result_ingress_drop_count,
129*61c4878aSAndroid Build Coastguard Worker expected_message,
130*61c4878aSAndroid Build Coastguard Worker expected_drop_count,
131*61c4878aSAndroid Build Coastguard Worker expected_ingress_drop_count);
132*61c4878aSAndroid Build Coastguard Worker return;
133*61c4878aSAndroid Build Coastguard Worker }
134*61c4878aSAndroid Build Coastguard Worker if (expected_message.has_value()) {
135*61c4878aSAndroid Build Coastguard Worker // Fail since we expected OkStatus.
136*61c4878aSAndroid Build Coastguard Worker ASSERT_EQ(peek_result.status(), OkStatus());
137*61c4878aSAndroid Build Coastguard Worker }
138*61c4878aSAndroid Build Coastguard Worker EXPECT_EQ(Status::OutOfRange(), peek_result.status());
139*61c4878aSAndroid Build Coastguard Worker }
140*61c4878aSAndroid Build Coastguard Worker
ExpectNotificationCount(CountingListener & listener,size_t expected_notification_count)141*61c4878aSAndroid Build Coastguard Worker void ExpectNotificationCount(CountingListener& listener,
142*61c4878aSAndroid Build Coastguard Worker size_t expected_notification_count) {
143*61c4878aSAndroid Build Coastguard Worker EXPECT_EQ(listener.GetNotificationCount(), expected_notification_count);
144*61c4878aSAndroid Build Coastguard Worker listener.ResetNotificationCount();
145*61c4878aSAndroid Build Coastguard Worker }
146*61c4878aSAndroid Build Coastguard Worker
147*61c4878aSAndroid Build Coastguard Worker std::byte buffer_[kBufferSize];
148*61c4878aSAndroid Build Coastguard Worker std::byte entry_buffer_[kEntryBufferSize];
149*61c4878aSAndroid Build Coastguard Worker CountingListener listeners_[kMaxListeners];
150*61c4878aSAndroid Build Coastguard Worker Drain drains_[kMaxDrains];
151*61c4878aSAndroid Build Coastguard Worker MultiSink multisink_;
152*61c4878aSAndroid Build Coastguard Worker };
153*61c4878aSAndroid Build Coastguard Worker
// Exercises one drain and one listener through a single entry, an empty
// entry, entries interleaved with a drop, and drop-only traffic.
TEST_F(MultiSinkTest, SingleDrain) {
  Drain& drain = drains_[0];
  CountingListener& listener = listeners_[0];

  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  // Attaching the listener is expected to notify it once.
  ExpectNotificationCount(listener, 1u);
  multisink_.HandleEntry(kMessage);

  // One pushed entry: one notification, popped back with no drops.
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  // An empty entry is pushed and popped like any other entry.
  multisink_.HandleEntry(ConstByteSpan());
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, ConstByteSpan(), 0u, 0u);

  // Two entries with a drop in between: three notifications, and the drop is
  // reported as an ingress drop on the entry that follows it.
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);
  ExpectNotificationCount(listener, 3u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);

  // A drop with no entry: the pop is out-of-range but still reports the drop.
  multisink_.HandleDropped();
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, std::nullopt, 0u, 1u);

  // Nothing new was pushed: no notification, out-of-range, no drops.
  ExpectNotificationCount(listener, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);
}
185*61c4878aSAndroid Build Coastguard Worker
// Two drains and two listeners attached to the same multisink must each see
// the full sequence of entries and drops, independently of one another.
TEST_F(MultiSinkTest, MultipleDrain) {
  Drain& first_drain = drains_[0];
  Drain& second_drain = drains_[1];
  CountingListener& first_listener = listeners_[0];
  CountingListener& second_listener = listeners_[1];

  multisink_.AttachDrain(first_drain);
  multisink_.AttachDrain(second_drain);
  multisink_.AttachListener(first_listener);
  multisink_.AttachListener(second_listener);
  ExpectNotificationCount(first_listener, 1u);
  ExpectNotificationCount(second_listener, 1u);

  // Three entries and two drops: five events in total.
  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();

  // Both listeners saw all five events; empty the first drain completely.
  ExpectNotificationCount(first_listener, 5u);
  ExpectNotificationCount(second_listener, 5u);
  VerifyPopEntry(first_drain, kMessage, 0u, 0u);
  VerifyPopEntry(first_drain, kMessage, 0u, 0u);
  VerifyPopEntry(first_drain, kMessage, 0u, 1u);
  VerifyPopEntry(first_drain, std::nullopt, 0u, 1u);
  VerifyPopEntry(first_drain, std::nullopt, 0u, 0u);

  // Popping produced no notifications, and the second drain yields the same
  // sequence as the first.
  ExpectNotificationCount(first_listener, 0u);
  ExpectNotificationCount(second_listener, 0u);
  VerifyPopEntry(second_drain, kMessage, 0u, 0u);
  VerifyPopEntry(second_drain, kMessage, 0u, 0u);
  VerifyPopEntry(second_drain, kMessage, 0u, 1u);
  VerifyPopEntry(second_drain, std::nullopt, 0u, 1u);
  VerifyPopEntry(second_drain, std::nullopt, 0u, 0u);
}
218*61c4878aSAndroid Build Coastguard Worker
// A drain attached after an entry was pushed must still be able to read that
// entry, and must keep receiving entries pushed afterwards.
TEST_F(MultiSinkTest, LateDrainRegistration) {
  Drain& drain = drains_[0];
  CountingListener& listener = listeners_[0];

  // Push before anything is attached.
  multisink_.HandleEntry(kMessage);

  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  ExpectNotificationCount(listener, 1u);
  // The entry pushed before attachment is read back, then the drain is empty.
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);

  // Entries pushed after attachment behave as usual.
  multisink_.HandleEntry(kMessage);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);
}
235*61c4878aSAndroid Build Coastguard Worker
// Detaching and re-attaching a drain and its listener part-way through
// reading must let the drain read the buffered entries again from the start.
TEST_F(MultiSinkTest, DynamicDrainRegistration) {
  Drain& drain = drains_[0];
  CountingListener& listener = listeners_[0];

  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  ExpectNotificationCount(listener, 1u);

  // Two entries, each preceded by a drop: four events in total.
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);

  // Read only the first entry, then detach both the drain and the listener.
  ExpectNotificationCount(listener, 4u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);
  multisink_.DetachDrain(drain);
  multisink_.DetachListener(listener);

  // Re-attaching notifies the listener once. The drain then reads both
  // buffered entries, including the one it had already popped before
  // detaching, each reported with an ingress drop count of one.
  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);

  // New entries keep flowing to the re-attached drain.
  multisink_.HandleEntry(kMessage);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);
}
267*61c4878aSAndroid Build Coastguard Worker
// Popping into a buffer that cannot hold the entry must fail with
// RESOURCE_EXHAUSTED, and the next pop must report the entry as dropped.
TEST_F(MultiSinkTest, TooSmallBuffer) {
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // Queue one drop followed by one four-byte entry.
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);

  // Offer only a single byte of the entry buffer for the pop.
  uint32_t drop_count = 0;
  uint32_t ingress_drop_count = 0;
  Result<ConstByteSpan> result =
      drain.PopEntry(span(entry_buffer_, 1), drop_count, ingress_drop_count);
  EXPECT_EQ(result.status(), Status::ResourceExhausted());

  // The drain is now empty; the follow-up pop reports one drop (the entry that
  // did not fit) and one ingress drop.
  VerifyPopEntry(drain, std::nullopt, 1u, 1u);
}
285*61c4878aSAndroid Build Coastguard Worker
// Entries already consumed by a drain must still be visible through the
// multisink's unsafe iteration.
TEST_F(MultiSinkTest, Iterator) {
  constexpr size_t kEntryCount = 3;
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // Push the entries, then pop every one of them through the drain.
  for (size_t i = 0; i < kEntryCount; ++i) {
    multisink_.HandleEntry(kMessage);
  }
  for (size_t i = 0; i < kEntryCount; ++i) {
    VerifyPopEntry(drain, kMessage, 0u, 0u);
  }

  // Iteration walks the ring buffer itself, so all entries are still found.
  size_t seen_entries = 0;
  for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
    EXPECT_EQ(std::memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
    ++seen_entries;
  }
  EXPECT_EQ(seen_entries, kEntryCount);
}
306*61c4878aSAndroid Build Coastguard Worker
// Unsafe iteration must find every pushed entry even when no drain has ever
// been attached to the multisink.
TEST_F(MultiSinkTest, IteratorNoDrains) {
  constexpr size_t kEntryCount = 3;

  // Push entries with no consumers attached.
  for (size_t i = 0; i < kEntryCount; ++i) {
    multisink_.HandleEntry(kMessage);
  }

  // Every pushed entry is visited and carries the expected payload.
  size_t seen_entries = 0;
  for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
    EXPECT_EQ(std::memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
    ++seen_entries;
  }
  EXPECT_EQ(seen_entries, kEntryCount);
}
322*61c4878aSAndroid Build Coastguard Worker
// With a drain attached but nothing pushed, unsafe iteration must be empty.
TEST_F(MultiSinkTest, IteratorNoEntries) {
  multisink_.AttachDrain(drains_[0]);

  // An empty range is one whose begin and end iterators compare equal.
  MultiSink::UnsafeIterationWrapper iteration = multisink_.UnsafeIteration();
  EXPECT_EQ(iteration.begin(), iteration.end());
}
331*61c4878aSAndroid Build Coastguard Worker
// Peeking a drain attached to an empty multisink must fail with OUT_OF_RANGE.
TEST_F(MultiSinkTest, PeekEntryNoEntries) {
  multisink_.AttachDrain(drains_[0]);

  // Peek while nothing has been pushed.
  uint32_t drop_count = 0;
  uint32_t ingress_drop_count = 0;
  auto peek_result =
      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  // Forward both counts reported by PeekEntry, in the order the helper
  // expects (drop count first, ingress drop count second).
  VerifyPeekResult(
      peek_result, drop_count, ingress_drop_count, std::nullopt, 0u, 0u);
}
342*61c4878aSAndroid Build Coastguard Worker
// Peeking must not consume entries, popping a peeked entry must advance only
// the drain that popped it, and popping with stale peek results must still
// succeed.
TEST_F(MultiSinkTest, PeekAndPop) {
  Drain& fast_drain = drains_[0];
  Drain& slow_drain = drains_[1];
  multisink_.AttachDrain(fast_drain);
  multisink_.AttachDrain(slow_drain);

  // Queue two distinct entries, then peek the front one.
  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessageOther);
  uint32_t drop_count = 0;
  uint32_t ingress_drop_count = 0;
  auto first_peek_result =
      fast_drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  VerifyPeekResult(
      first_peek_result, drop_count, ingress_drop_count, kMessage, 0u, 0u);

  // Peeking again on the same drain returns the same front entry.
  auto peek_duplicate =
      fast_drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  VerifyPeekResult(
      peek_duplicate, drop_count, ingress_drop_count, kMessage, 0u, 0u);
  // The other drain also sees the front entry.
  auto peek_other_drain =
      slow_drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  VerifyPeekResult(
      peek_other_drain, drop_count, ingress_drop_count, kMessage, 0u, 0u);

  // Once the peeked entry is popped, the same drain peeks the next entry.
  ASSERT_EQ(fast_drain.PopEntry(first_peek_result.value()), OkStatus());
  auto second_peek_result =
      fast_drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  VerifyPeekResult(second_peek_result,
                   drop_count,
                   ingress_drop_count,
                   kMessageOther,
                   0u,
                   0u);
  // The drain that has not popped anything still sees the first entry.
  auto peek_other_drain_duplicate =
      slow_drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  VerifyPeekResult(peek_other_drain_duplicate,
                   drop_count,
                   ingress_drop_count,
                   kMessage,
                   0u,
                   0u);

  // Pop the second entry directly, without using its peek result.
  VerifyPopEntry(fast_drain, kMessageOther, 0u, 0u);
  // Popping with the peek result of that already-popped entry still succeeds.
  ASSERT_EQ(fast_drain.PopEntry(second_peek_result.value()), OkStatus());
  // Popping with the even older first peek result succeeds as well.
  ASSERT_EQ(fast_drain.PopEntry(first_peek_result.value()), OkStatus());

  // The drain is now empty: pops and peeks both yield OUT_OF_RANGE.
  VerifyPopEntry(fast_drain, std::nullopt, 0u, 0u);
  auto empty_peek_result =
      fast_drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  VerifyPeekResult(
      empty_peek_result, drop_count, ingress_drop_count, std::nullopt, 0u, 0u);

  // The other drain is unaffected and still sees the first entry.
  auto peek_other_drain_unchanged =
      slow_drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  VerifyPeekResult(peek_other_drain_unchanged,
                   drop_count,
                   ingress_drop_count,
                   kMessage,
                   0u,
                   0u);
}
409*61c4878aSAndroid Build Coastguard Worker
// Verifies how PeekEntry() reports drops passed to HandleDropped(): nothing is
// reported alongside the entry queued before them, and the full count shows up
// on the next peek once that entry has been popped.
TEST_F(MultiSinkTest, PeekReportsIngressDropCount) {
  constexpr uint32_t kReportedIngressDrops = 10;
  uint32_t peeked_drops = 0;
  uint32_t peeked_ingress_drops = 0;
  auto& drain = drains_[0];

  multisink_.AttachDrain(drain);

  // Queue one entry, then report ingress drops behind it.
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped(kReportedIngressDrops);

  // The drain has not yet found a gap in the sequence IDs, so the first peek
  // yields the entry with both drop counters still at zero.
  auto entry_peek =
      drain.PeekEntry(entry_buffer_, peeked_drops, peeked_ingress_drops);
  VerifyPeekResult(
      entry_peek, peeked_drops, peeked_ingress_drops, kMessage, 0, 0);

  // Consuming the peeked entry moves the drain forward; the following peek
  // runs into the sequence ID gap and reports the ingress drops.
  ASSERT_EQ(drain.PopEntry(entry_peek.value()), OkStatus());
  auto gap_peek =
      drain.PeekEntry(entry_buffer_, peeked_drops, peeked_ingress_drops);
  ASSERT_EQ(gap_peek.status(), Status::OutOfRange());
  EXPECT_EQ(peeked_drops, 0u);
  EXPECT_EQ(peeked_ingress_drops, kReportedIngressDrops);
}
435*61c4878aSAndroid Build Coastguard Worker
// Verifies the drop count PeekEntry() reports to a drain that fell behind
// because newer entries overwrote the ones it had not read yet, including the
// case where more entries are dropped between a peek and its matching pop.
TEST_F(MultiSinkTest, PeekReportsSlowDrainDropCount) {
  auto& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // Size the payload so that kInitialEntryCount entries fill the buffer. Each
  // entry spends 1 byte on the preamble (a sequence ID below 128 takes a
  // single byte) and 1 byte on the data size.
  constexpr size_t kInitialEntryCount = 128;
  constexpr size_t kEntryFootprint = kBufferSize / kInitialEntryCount;
  constexpr size_t kPayloadSize = kEntryFootprint - 2;
  std::array<std::byte, kPayloadSize> payload;
  payload.fill(static_cast<std::byte>('a'));

  // Fill the buffer; the first payload byte tags each entry with its index.
  for (size_t index = 0; index < kInitialEntryCount; ++index) {
    payload[0] = static_cast<std::byte>(index);
    multisink_.HandleEntry(payload);
  }

  // The buffer is now full and the sequence ID needs one more preamble byte,
  // so pushing N further entries drops N + 1 old ones. Push one entry fewer
  // than the number of drops expected.
  constexpr size_t kFirstExpectedDrops = 5;
  for (size_t tag = 201; tag < 200 + kFirstExpectedDrops; ++tag) {
    payload[0] = static_cast<std::byte>(tag);
    multisink_.HandleEntry(payload);
  }

  uint32_t peeked_drops = 0;
  uint32_t peeked_ingress_drops = 0;

  // The oldest surviving entry is the 6th one pushed (tag 5).
  auto first_peek =
      drain.PeekEntry(entry_buffer_, peeked_drops, peeked_ingress_drops);
  payload[0] = static_cast<std::byte>(5);
  VerifyPeekResult(first_peek,
                   peeked_drops,
                   peeked_ingress_drops,
                   payload,
                   kFirstExpectedDrops,
                   0);

  // Push 3 more entries after the peek, which drops 2 additional entries.
  constexpr size_t kSecondExpectedDrops = 2;
  for (size_t extra = 0; extra <= kSecondExpectedDrops; ++extra) {
    payload[0] = static_cast<std::byte>(220 + extra);
    multisink_.HandleEntry(payload);
  }

  // Popping the 6th entry still succeeds although it has been dropped by now.
  EXPECT_EQ(drain.PopEntry(first_peek.value()), OkStatus());

  // The next peek lands on the 9th entry (tag 8). PopEntry() was called with
  // first_peek, so every drop before it counts as handled and only the two
  // new drops are reported.
  auto second_peek =
      drain.PeekEntry(entry_buffer_, peeked_drops, peeked_ingress_drops);
  payload[0] = static_cast<std::byte>(8);
  VerifyPeekResult(second_peek,
                   peeked_drops,
                   peeked_ingress_drops,
                   payload,
                   kSecondExpectedDrops,
                   0);
}
495*61c4878aSAndroid Build Coastguard Worker
// Verifies that ingress drops are still reported correctly when the
// multisink's drop count overflows past the uint32_t maximum after the drain
// has already caught up with a value close to that maximum.
TEST_F(MultiSinkTest, IngressDropCountOverflow) {
  constexpr uint32_t kNearOverflowDrops =
      std::numeric_limits<uint32_t>::max() - 3;
  constexpr uint32_t kOverflowingDrops = 10;
  uint32_t peeked_drops = 0;
  uint32_t peeked_ingress_drops = 0;
  auto& drain = drains_[0];

  multisink_.AttachDrain(drain);

  // Bring the multisink's drop count close to the overflow point, so that the
  // drain's last handled drop count ends up larger than the multisink's count
  // once the latter overflows.
  multisink_.HandleDropped(kNearOverflowDrops);
  multisink_.HandleEntry(kMessage);

  // Let the drain catch up with the drop count reported so far.
  auto catch_up_peek =
      drain.PeekEntry(entry_buffer_, peeked_drops, peeked_ingress_drops);
  VerifyPeekResult(catch_up_peek,
                   peeked_drops,
                   peeked_ingress_drops,
                   kMessage,
                   0,
                   kNearOverflowDrops);
  // Consume the peeked entry so the drain advances past it.
  ASSERT_EQ(drain.PopEntry(catch_up_peek.value()), OkStatus());

  // Push the multisink's drop count over the overflow point.
  multisink_.HandleDropped(kOverflowingDrops);

  // No entry is available, yet the new ingress drops must be reported as-is.
  auto overflow_peek =
      drain.PeekEntry(entry_buffer_, peeked_drops, peeked_ingress_drops);
  ASSERT_EQ(overflow_peek.status(), Status::OutOfRange());
  EXPECT_EQ(peeked_drops, 0u);
  EXPECT_EQ(peeked_ingress_drops, kOverflowingDrops);

  // A fresh entry is then peeked without any drops attached to it.
  multisink_.HandleEntry(kMessage);
  auto final_peek =
      drain.PeekEntry(entry_buffer_, peeked_drops, peeked_ingress_drops);
  VerifyPeekResult(
      final_peek, peeked_drops, peeked_ingress_drops, kMessage, 0, 0);
}
537*61c4878aSAndroid Build Coastguard Worker
// Verifies that a drain which is detached and attached again sees the same
// entry and the same ingress drop count that it saw before detaching.
TEST_F(MultiSinkTest, DetachedDrainReportsDropCount) {
  constexpr uint32_t kReportedIngressDrops = 10;
  auto& drain = drains_[0];

  multisink_.AttachDrain(drain);

  // Report drops first, then queue a single entry.
  multisink_.HandleDropped(kReportedIngressDrops);
  multisink_.HandleEntry(kMessage);

  // First pass: one unread entry, popped together with the ingress drops.
  EXPECT_EQ(drain.GetUnreadEntriesCount(), 1u);
  VerifyPopEntry(drain, kMessage, 0, kReportedIngressDrops);
  EXPECT_EQ(drain.GetUnreadEntriesCount(), 0u);

  // Re-attaching the drain must reproduce the same entry and the same drops.
  multisink_.DetachDrain(drain);
  multisink_.AttachDrain(drain);

  EXPECT_EQ(drain.GetUnreadEntriesCount(), 1u);
  VerifyPopEntry(drain, kMessage, 0, kReportedIngressDrops);
  EXPECT_EQ(drain.GetUnreadEntriesCount(), 0u);
}
555*61c4878aSAndroid Build Coastguard Worker
// Verifies that GetUnreadEntriesSize() and GetUnreadEntriesCount() are tracked
// per drain: popping entries from one drain leaves the other drain's unread
// size and count untouched.
TEST_F(MultiSinkTest, DrainUnreadEntriesSize) {
  auto& first_drain = drains_[0];
  auto& second_drain = drains_[1];

  multisink_.AttachDrain(first_drain);

  // Nothing has been pushed yet.
  EXPECT_EQ(first_drain.GetUnreadEntriesSize(), 0u);
  EXPECT_EQ(first_drain.GetUnreadEntriesCount(), 0u);

  // Push two entries and remember the unread size they produce.
  for (int pushed = 0; pushed < 2; ++pushed) {
    multisink_.HandleEntry(kMessage);
  }
  const size_t size_with_two_entries = first_drain.GetUnreadEntriesSize();
  EXPECT_GT(size_with_two_entries, 0u);

  // A drain attached afterwards sees the same unread entries size as the
  // first drain.
  multisink_.AttachDrain(second_drain);
  EXPECT_EQ(second_drain.GetUnreadEntriesSize(), size_with_two_entries);

  // Drain both entries through the first drain only.
  for (int popped = 0; popped < 2; ++popped) {
    VerifyPopEntry(first_drain,
                   /*expected_message=*/kMessage,
                   /*expected_drop_count=*/0,
                   /*expected_ingress_drop_count=*/0);
  }

  // The first drain is empty; the second one still has everything unread.
  EXPECT_EQ(first_drain.GetUnreadEntriesSize(), 0u);
  EXPECT_EQ(second_drain.GetUnreadEntriesSize(), size_with_two_entries);
  EXPECT_EQ(first_drain.GetUnreadEntriesCount(), 0u);
  EXPECT_EQ(second_drain.GetUnreadEntriesCount(), 2u);
}
585*61c4878aSAndroid Build Coastguard Worker
// Verifies that the unread entries size recorded by an
// EntriesSizeMonitorListener matches what the drain itself reports after an
// entry has been pushed.
TEST(UnsafeGetUnreadEntriesSize, ReadFromListener) {
  constexpr std::string_view kEntryToPush = "one";

  std::array<std::byte, 32> storage;
  MultiSink multisink(storage);

  MultiSink::Drain drain;
  multisink.AttachDrain(drain);

  EntriesSizeMonitorListener listener(drain);
  multisink.AttachListener(listener);

  // Before any entry is pushed the listener must report a size of zero.
  ASSERT_EQ(listener.GetLastSeenUnreadEntriesSize(), 0u);

  const span<const char> entry_chars(kEntryToPush);
  multisink.HandleEntry(as_bytes(entry_chars));

  // After the push, the listener's value must agree with the drain's own.
  EXPECT_EQ(listener.GetLastSeenUnreadEntriesSize(),
            drain.GetUnreadEntriesSize());
}
604*61c4878aSAndroid Build Coastguard Worker
// Verifies that UnsafeForEachEntry(), called without an entry limit, passes
// every pushed entry to the callback in the order the entries were pushed.
TEST(UnsafeIteration, NoLimit) {
  constexpr std::array<std::string_view, 5> kExpectedEntries{
      "one", "two", "three", "four", "five"};
  std::array<std::byte, 32> buffer;
  MultiSink multisink(buffer);

  for (std::string_view entry : kExpectedEntries) {
    multisink.HandleEntry(as_bytes(span<const char>(entry)));
  }

  size_t entry_count = 0;
  // Callback state is bundled in one struct so the lambda captures a single
  // reference.
  struct {
    size_t& entry_count;
    span<const std::string_view> expected_results;
  } ctx{entry_count, kExpectedEntries};
  auto cb = [&ctx](ConstByteSpan data) {
    // Count the entry up front so the final count check stays accurate even
    // when one of the fatal assertions below leaves the callback early.
    const size_t index = ctx.entry_count++;
    // Fatal: an unexpected extra entry must not index past expected_results.
    ASSERT_LT(index, ctx.expected_results.size());
    const std::string_view expected_entry = ctx.expected_results[index];
    // Fatal: comparing expected_entry.size() bytes is only valid when data
    // holds exactly that many bytes.
    ASSERT_EQ(data.size(), expected_entry.size());
    const int result =
        std::memcmp(data.data(), expected_entry.data(), expected_entry.size());
    EXPECT_EQ(0, result);
  };

  EXPECT_EQ(OkStatus(), multisink.UnsafeForEachEntry(cb));
  EXPECT_EQ(kExpectedEntries.size(), entry_count);
}
632*61c4878aSAndroid Build Coastguard Worker
// Verifies that UnsafeForEachEntry(), limited to kExpectedEntriesMaxEntries
// entries, passes only the entries from index kStartOffset onward to the
// callback, in push order.
TEST(UnsafeIteration, Subset) {
  constexpr std::array<std::string_view, 5> kExpectedEntries{
      "one", "two", "three", "four", "five"};
  constexpr size_t kStartOffset = 3;
  constexpr size_t kExpectedEntriesMaxEntries =
      kExpectedEntries.size() - kStartOffset;
  std::array<std::byte, 32> buffer;
  MultiSink multisink(buffer);

  for (std::string_view entry : kExpectedEntries) {
    multisink.HandleEntry(as_bytes(span<const char>(entry)));
  }

  size_t entry_count = 0;
  // Callback state is bundled in one struct so the lambda captures a single
  // reference.
  struct {
    size_t& entry_count;
    span<const std::string_view> expected_results;
  } ctx{entry_count, kExpectedEntries};
  auto cb = [&ctx](ConstByteSpan data) {
    // Count the entry up front so the final count check stays accurate even
    // when one of the fatal assertions below leaves the callback early.
    const size_t index = ctx.entry_count++ + kStartOffset;
    // Fatal: an unexpected extra entry must not index past expected_results.
    ASSERT_LT(index, ctx.expected_results.size());
    const std::string_view expected_entry = ctx.expected_results[index];
    // Fatal: comparing expected_entry.size() bytes is only valid when data
    // holds exactly that many bytes.
    ASSERT_EQ(data.size(), expected_entry.size());
    const int result =
        std::memcmp(data.data(), expected_entry.data(), expected_entry.size());
    EXPECT_EQ(0, result);
  };

  EXPECT_EQ(OkStatus(),
            multisink.UnsafeForEachEntry(cb, kExpectedEntriesMaxEntries));
  EXPECT_EQ(kExpectedEntriesMaxEntries, entry_count);
}
666*61c4878aSAndroid Build Coastguard Worker
// Verifies that UnsafeForEachEntryFromEnd(), given a size limit equal to the
// whole buffer size, passes every pushed entry to the callback in push order.
TEST(UnsafeIterationFromEnd, NoTruncation) {
  constexpr std::array<std::string_view, 5> kExpectedEntries{
      "one", "two", "three", "four", "five"};
  constexpr size_t buffer_size = 32;
  std::array<std::byte, buffer_size> buffer;
  MultiSink multisink(buffer);

  for (std::string_view entry : kExpectedEntries) {
    multisink.HandleEntry(as_bytes(span<const char>(entry)));
  }

  size_t entry_count = 0;
  // Callback state is bundled in one struct so the lambda captures a single
  // reference.
  struct {
    size_t& entry_count;
    span<const std::string_view> expected_results;
  } ctx{entry_count, kExpectedEntries};
  auto cb = [&ctx](ConstByteSpan data) {
    // Count the entry up front so the final count check stays accurate even
    // when one of the fatal assertions below leaves the callback early.
    const size_t index = ctx.entry_count++;
    // Fatal: an unexpected extra entry must not index past expected_results.
    ASSERT_LT(index, ctx.expected_results.size());
    const std::string_view expected_entry = ctx.expected_results[index];
    // Fatal: comparing expected_entry.size() bytes is only valid when data
    // holds exactly that many bytes.
    ASSERT_EQ(data.size(), expected_entry.size());
    const int result =
        std::memcmp(data.data(), expected_entry.data(), expected_entry.size());
    EXPECT_EQ(0, result);
  };

  EXPECT_EQ(OkStatus(), multisink.UnsafeForEachEntryFromEnd(cb, buffer_size));
  EXPECT_EQ(kExpectedEntries.size(), entry_count);
}
695*61c4878aSAndroid Build Coastguard Worker
// Verifies that UnsafeForEachEntryFromEnd(), given a size limit of half the
// buffer size, passes only the three most recently pushed entries to the
// callback, in push order.
TEST(UnsafeIterationFromEnd, Truncation) {
  constexpr std::array<std::string_view, 5> kEntries{
      "one", "two", "three", "four", "five"};
  constexpr std::array<std::string_view, 3> kExpectedEntries{
      "three", "four", "five"};
  constexpr size_t buffer_size = 32;
  std::array<std::byte, buffer_size> buffer;
  MultiSink multisink(buffer);

  for (std::string_view entry : kEntries) {
    multisink.HandleEntry(as_bytes(span<const char>(entry)));
  }

  size_t entry_count = 0;
  // Callback state is bundled in one struct so the lambda captures a single
  // reference.
  struct {
    size_t& entry_count;
    span<const std::string_view> expected_results;
  } ctx{entry_count, kExpectedEntries};
  auto cb = [&ctx](ConstByteSpan data) {
    // Count the entry up front so the final count check stays accurate even
    // when one of the fatal assertions below leaves the callback early.
    const size_t index = ctx.entry_count++;
    // Fatal: an entry that should have been truncated away must not index
    // past expected_results.
    ASSERT_LT(index, ctx.expected_results.size());
    const std::string_view expected_entry = ctx.expected_results[index];
    // Fatal: comparing expected_entry.size() bytes is only valid when data
    // holds exactly that many bytes.
    ASSERT_EQ(data.size(), expected_entry.size());
    const int result =
        std::memcmp(data.data(), expected_entry.data(), expected_entry.size());
    EXPECT_EQ(0, result);
  };

  EXPECT_EQ(OkStatus(),
            multisink.UnsafeForEachEntryFromEnd(cb, buffer_size / 2));
  EXPECT_EQ(kExpectedEntries.size(), entry_count);
}
727*61c4878aSAndroid Build Coastguard Worker
728*61c4878aSAndroid Build Coastguard Worker } // namespace
729*61c4878aSAndroid Build Coastguard Worker } // namespace pw::multisink
730